redis keys导入导出

#!/usr/bin/python2
import sys
import os
import redis
import time
import datetime
import threading

string_keys=[]
hash_keys=[]
list_keys=[]
set_keys=[]
zset_keys=[]
ttl_dict={}
hash_mutex = threading.Lock()
list_mutex = threading.Lock()
set_mutex = threading.Lock()
zset_mutex = threading.Lock()
string_mutex = threading.Lock()
index=0

def import_string(source, dest):
    keys_count = len(string_keys)
    pipeSrc = source.pipeline(transaction=False)
    pipeDst = dest.pipeline(transaction=False)
    global index
    global string_mutex
    pipe_size=1000

    while 1 :
        string_mutex.acquire()
        if index >= keys_count :
            string_mutex.release()
            break
        old_index = index
        if index + pipe_size < keys_count :
            index += pipe_size
        else :
            index = keys_count
        max_index = index
        string_mutex.release()

        cur_index = old_index
        while (cur_index < max_index):
            pipeSrc.get(string_keys[cur_index])
            cur_index +=1
        results=pipeSrc.execute()
        for value in results:
            pipeDst.set(string_keys[old_index], value)
            if string_keys[old_index] in ttl_dict.keys() :
                pipeDst.expire(string_keys[old_index],ttl_dict[string_keys[old_index]]);
            old_index +=1
        pipeDst.execute()

def import_hash(source, dest):
    keys_count = len(hash_keys)
    pipeSrc = source.pipeline(transaction=False)
    pipeDst = dest.pipeline(transaction=False)
    while 1:
        hash_mutex.acquire()
        if len(hash_keys) == 0 :
            hash_mutex.release()
            break;
        key=hash_keys.pop(0)
        hash_mutex.release()
        hkeys=source.hkeys(key)
        keys_count = len(hkeys)
        index=0
        pipe_size=1000
        while index < keys_count:
            old_index=index
            num=0
            while (index < keys_count) and (num < pipe_size):
                pipeSrc.hget(key, hkeys[index])
                index +=1
                num +=1
            results=pipeSrc.execute()
            for value in results:
                pipeDst.hset(key, hkeys[old_index], value)
                old_index +=1
            pipeDst.execute()
        if key in ttl_dict.keys() :
            dest.expire(key,ttl_dict[key]);

def import_set(source, dest):
    keys_count = len(set_keys)
    pipeDst = dest.pipeline(transaction=False)
    while 1:
        set_mutex.acquire()
        if len(set_keys) == 0 :
            set_mutex.release()
            break;
        key=set_keys.pop(0)
        set_mutex.release()
        sValues=source.smembers(key)
        value_count = len(sValues)
        index=0
        pipe_size=1000
        while index < value_count:
            old_index=index
            num=0
            while (index < value_count) and (num < pipe_size):
                pipeDst.sadd(key, sValues.pop())
                index +=1
                num +=1
            pipeDst.execute()
        if key in ttl_dict.keys() :
            dest.expire(key,ttl_dict[key]);

def import_zset(source, dest):
    keys_count = len(zset_keys)
    pipeSrc = source.pipeline(transaction=False)
    pipeDst = dest.pipeline(transaction=False)
    while 1:
        zset_mutex.acquire()
        if len(zset_keys) == 0 :
            zset_mutex.release()
            break;
        key=zset_keys.pop(0)
        zset_mutex.release()
        zset_size = source.zcard(key)
        index=0
        pipe_size=1000
        while index < zset_size:
            members = source.zrange(key, index, index+pipe_size)
            index += len(members)
            for member in members:
                pipeSrc.zscore(key, member)
            scores = pipeSrc.execute()
            i=0
            for member in members:
                pipeDst.zadd(key, member, scores[i])
                i+=1
            pipeDst.execute()
        if key in ttl_dict.keys() :
            dest.expire(key,ttl_dict[key]);

def import_list(source, dest):
    keys_count = len(list_keys)
    pipeDst = dest.pipeline(transaction=False)
    while 1:
        list_mutex.acquire()
        if len(list_keys) == 0 :
            list_mutex.release()
            break;
        key=list_keys.pop(0)
        list_mutex.release()
        list_size = source.llen(key)
        index=0
        pipe_size=1000
        while index < list_size:
            results = source.lrange(key, index, index+pipe_size)
            index += len(results)
            for value in results:
                pipeDst.rpush(key, value)
            pipeDst.execute()
        if key in ttl_dict.keys() :
            dest.expire(key,ttl_dict[key]);

def read_type_keys(source):
    keys=source.keys()
    keys_count = len(keys)
    pipe = source.pipeline(transaction=False)
    #for key in keys:
    index=0
    pipe_size=5000
    while index < keys_count:
        old_index=index
        num=0
        while (index < keys_count) and (num < pipe_size):
            pipe.type(keys[index])
            index +=1
            num +=1
        results=pipe.execute()
        for type in results:
            if type == "string":
                string_keys.append(keys[old_index])
            elif type == "list":
                list_keys.append(keys[old_index])
            elif type == "hash":
                hash_keys.append(keys[old_index])
            elif type == "set":
                set_keys.append(keys[old_index])
            elif type == "zset":
                zset_keys.append(keys[old_index])
            else :
                print keys[old_index]," is not find when TYPE"
            old_index +=1

    index=0
    while index < keys_count:
        old_index=index
        num=0
        while (index < keys_count) and (num < pipe_size):
            pipe.ttl(keys[index])
            index +=1
            num +=1
        results=pipe.execute()
        for ttl_val in results:
            if ttl_val != None :
                ttl_dict[keys[old_index]]=ttl_val
            old_index +=1

def import_data(source):
    dest=redis.Redis(host=DstIP,port=DstPort)
    import_string(source, dest)
    import_hash(source, dest)
    import_list(source, dest)
    import_set(source, dest)
    import_zset(source, dest)

if __name__==‘__main__‘:
    argc = len(sys.argv)
    if argc != 6:
        print "usage: %s sourceIP sourcePort destIP destPort connectionNum" % (sys.argv[0])
        exit(1)
    SrcIP = sys.argv[1]
    SrcPort = int(sys.argv[2])
    DstIP = sys.argv[3]
    DstPort = int(sys.argv[4])
    ConnNum = int(sys.argv[5])

    source=redis.Redis(host=SrcIP,port=SrcPort)

    print "Begin Read Keys"
    read_type_keys(source)
    print "String Key Count is:",len(string_keys)
    print "Set Key Count is:",len(set_keys)
    print "ZSet Key Count is:",len(zset_keys)
    print "List Key Count is:",len(list_keys)
    print "Hash Key Count is:",len(hash_keys)

    start=datetime.datetime.now()
    threads = []
    for i in xrange(ConnNum):
        t = threading.Thread(target=import_data,args=(source,))
        threads.append(t)

    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()
    stop=datetime.datetime.now()
    diff=stop-start
    print "Finish, token time:",str(diff)

使用方法如下
./import_data.py sourceIP sourcePort destIP destPort connectionNum
sourceIP:导出库ip(数据源)
sourcePort:导出库port(数据源)
destIP:导入库ip
destPort:导入库Port
connectionNum: 数据导入时使用的连接数

时间: 2024-10-10 23:46:48

redis keys导入导出的相关文章

redis数据导入导出

在2017年,项目上的redis集群需要从2.8版本更换为3.2版本,此时涉及到redis数据key的迁移,经百度.×××,发现有三种方法可以实现1.第三方工具redis-dump.redis-load实现2.通过aof机制导入导出,需要开启aof功能3.通过rdb存储机制迁移数据,此方法最为简单,直接dump $key.restore $key即可 此篇主要介绍第一种方式,redis-dump.redis-load实现redis的数据导入导出,直接进入主题 redis-dump,要求ruby版

Redis 数据导入导出,redis-dump命令

安装redis-dump 工具 yum install ruby rubygems ruby-devel gem install redis-dump -V 使用方法 没有密码: # 导出 redis-dump -u 127.0.0.1:6379 -d 1 > test.json # 导入 < test.json redis-load -u 10.137.195.25 有密码: # 导出 redis-dump -u :123456@127.0.0.1:6379 -O > test.jso

redis的导入导出需要特别注意的地方

今天下暴雨,不想出去跑业务和拜访客户了,准备好好休息一下,可是,不到八点电话响了,说一个redis迁移不成功,赶紧起来干活,悲催啊! 问清情况,大致是这样的:从一个开发服务器的redis客户端执行了save指令,得到一个dump .rdb文件,然后把这个文件复制到新redis系统的数据目录(数据目录由redis配置文件指定):但重启redis-server以后,数据为空. 看样子对方搞不定,只好连vpn登录系统,查看配置文件,进程等.在日志中发现一些warning,担心是这些问题引起的,顺手做了

MySQL5.7.18 备份、Mysqldump,mysqlpump,xtrabackup,innobackupex 全量,增量备份,数据导入导出

粗略介绍冷备,热备,温暖,及Mysqldump,mysqlpump,xtrabackup,innobackupex 全量,增量备份 --备份的目的 灾难恢复:意外情况下(如服务器宕机.磁盘损坏等)对损坏的数据进行恢复和还原保证数据不丢失,最小程度地丢失需求改变:因需求改变而需要把数据还原到改变以前测试:测试新功能是否可用 --备份与恢复概述 根据备份的方法可以分为: 1.Hot Backup(热备) 2.Cold Backup(冷备) 3.Warm Backup(温备) Hot Backup是指

mysql表的导入导出

⒈select ... into outfile导出数据/load data infile导入数据⑴创建新表,定义表结构  ⅰ方法一:在同一数据库中      ①复制表结构及数据到新表      CREATE TABLE newtable  SELECT * FROM oldtable      ② 只复制表结构到新表       CREATE TABLE newtable  SELECT * FROM oldtable WHERE 1=2  ⅱ方法二:适应库与库之间,或者单库      ①只复

实现excel导入导出功能,excel导入数据到页面中,页面数据导出生成excel文件

今天接到项目中的一个功能,要实现excel的导入,导出功能.这个看起来思路比较清楚,但是做起了就遇到了不少问题. 不过核心的问题,大家也不会遇到了.每个项目前台页面,以及数据填充方式都不一样,不过大多都是以json数据填充的.在导入excel填充json数据到页面时,真的让我差点吐血了.在做这个导入导出的时候,那一个礼拜都是黑暗的. 好了,废话不多说了,我今天就给大家展示这个两个功能的核心点,excel生成json数据和json数据生成excel文件. 一:从上传文件到服务器,后台java解析,

【源】从零自学Hadoop(17):Hive数据导入导出,集群数据迁移下

阅读目录 序 将查询的结果写入文件系统 集群数据迁移一 集群数据迁移二 系列索引 本文版权归mephisto和博客园共有,欢迎转载,但须保留此段声明,并给出原文链接,谢谢合作. 文章是哥(mephisto)写的,SourceLink 序 上一篇,我们介绍了Hive的数据多种方式导入,这样我们的Hive就有了数据来源了,但有时候我们可能需要纯粹的导出,或者集群Hive数据的迁移(不同集群,不同版本),我们就可以通过这两章的知识来实现.   下面我们开始介绍hive的数据导出,以及集群Hive数据的

客户关系管理系统中对客户及相关数据的导入导出操作

在很多系统,我们都知道,Excel数据的导入导出操作是必不可少的一个功能,这种功能能够给使用者和外部进行数据交换,也能批量迅速的录入数据到系统中:但在一些系统中,为了方便,可能把很多个基础表或者相关的数据综合到一个Excel表格文件里面,然后希望通过接口进行导入,这种需求处理就显得比较复杂一点了.本文探讨在我的客户关系管理系统中,对于单个Excel表格中,集合了客户基础数据及相关数据的导入和导出操作的处理. 1.导入导出的需求分析 本随笔主要介绍如何在系统中,导入单一文件中的数据到系统中,这个文

MySQL存储引擎 SQL数据导入/导出 操作表记录 查询及匹配条件

MySQL存储引擎的配置 SQL数据导入/导出 操作表记录 查询及匹配条件 1 MySQL存储引擎的配置1.1 问题 本案例要求MySQL数据存储引擎的使用,完成以下任务操作: 可用的存储引擎类型 查看默认存储类型 更改表的存储引擎 1.2 步骤 实现此案例需要按照如下步骤进行. 步骤一:查看存储引擎信息 登入MySQL服务器,查看当前支持哪些存储引擎. 使用mysql命令连接,以root用户登入: [[email protected] ~]# mysql -u root –p Enter pa