[同步脚本]mysql-elasticsearch同步

公司项目搜索部分用的elasticsearch,那么这两个之间的数据同步就是一个问题。

网上找了几个包,但都有各自的缺点,最后决定还是自己写一个脚本,大致思路如下:

1.在死循环中不断的select指定的表

2.读取表中更新时间晚于某个时间点的所有行 (初始化时候为"1970-01-01 00:00:00")

3.把需要的字段更新到elasticsearch

注意:1.中间要考虑到脚本中断,或者重启所以把最后的更新时间记录到了固定的txt文件

2.为了让脚本更加通用,不至于为了一个表就大幅度更改脚本,考虑动态生成变量,使用了locals和globals

代码如下:

#!/usr/bin/env python
# coding=utf-8
import sys
sys.path.append(‘/Users/cangyufu/work_jbkj/elabels-flask‘)
from modules.utils.commons import app, redispool, db_master, db_slave
from sqlalchemy import text
import os
import datetime
import time
from service.myelasticsearch.index import es
from modules.utils.mysqldb import db_obj_dict
import datetime

CONST_SLEEP = 3

WORK_INDEX = ‘test‘

#https://stackoverflow.com/questions/136168/get-last-n-lines-of-a-file-with-python-similar-to-tail
def tail(f, lines=1):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven‘t yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count(‘\n‘)
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ‘‘.join(reversed(blocks))
    return ‘\n‘.join(all_read_text.splitlines()[-total_lines_wanted:])

def is_file_exists(filename):
    if not os.path.isfile(filename):
        file = open(filename, ‘wb‘)
        file.write("1970-01-01 00:00:00\n")
        file.close()

#传入要监控的表名
def sync_main(*args):
    for table in args:
        try:
            callable(globals()[‘monitor_‘+table])
        except Exception:
            raise Exception(‘lack function monitor_{}‘.format(table))
    for table in args:
        filename = ‘‘.join([‘monitor_‘, table, ‘.txt‘])
        locals()[table+‘path‘] = os.path.join(os.path.dirname(__file__), filename)
        is_file_exists(locals()[table+‘path‘])
        locals()[table+‘file‘] = open(locals()[table+‘path‘], ‘rb+‘)
    try:
        print "begin"
        while True:
            count = 0
            for table in args:
                print ‘handleing ‘+table
                last_time = tail(locals()[table+‘file‘], 1)
                update_time = globals()[‘monitor_‘+table](last_time)
                print update_time
                if update_time == last_time:
                    count += 1
                    continue
                locals()[table + ‘file‘].write(update_time+‘\n‘)
                locals()[table + ‘file‘].flush()
            if count == len(args):
                time.sleep(CONST_SLEEP)
    except Exception, e:
        print e
        raise e
    finally:
        for table in args:
            locals()[table + ‘file‘].close()

########################################################################################################################
#
# 如果要监控哪个表,必须要实现 函数 monitor_table_name,比如要监控table1表,就必须要实现monitor_table1函数,
#   传入参数为开始更新的起始时间,初始化时候为1970-01-01 00:00:00,返回更新到的最新的时间
#
########################################################################################################################
def monitor_table1(last_time):
    pass
    return last_time
def monitor_table2(last_time):
    pass
    return last_time
def trans_date_time(dt):    return datetime.datetime.strptime(dt, "%Y-%m-%d %H:%M:%S")

sync_main(‘table1‘,‘table2‘)
时间: 2024-10-12 20:56:47

[同步脚本]mysql-elasticsearch同步的相关文章

搭建Mysql主从同步服务

Mysql主从同步搭建 mysql主从同步利用binlog日志中记录的sql语句实现数据同步,进而实现自动备份数据的目的. 在搭建mysql主从同步时,要求一台服务器做master即主服务器,一台服务器做slave即从服务器,slave服务器从master服务器上同步binlog日志中记录的sql语句,在本地数据库中执行这些语句来实现数据库同步的目的. 注意,在搭建mysql主从同步服务时,slave上的数据库必须与master服务器上的数据库中的库和表完全相同,即有同样的库,表且表结构完全相同

MySQL 半同步复制模式说明及配置示例 - 运维小结

MySQL主从复制包括异步模式.半同步模式.GTID模式以及多源复制模式,默认是异步模式 (如之前详细介绍的mysql主从复制).所谓异步模式指的是MySQL 主服务器上I/O thread 线程将二进制日志写入binlog文件之后就返回客户端结果,不会考虑二进制日志是否完整传输到从服务器以及是否完整存放到从服务器上的relay日志中,这种模式一旦主服务(器)宕机,数据就可能会发生丢失. 异步模式是一种基于偏移量的主从复制,实现原理是: 主库开启binlog功能并授权从库连接主库,从库通过cha

Elasticsearch PHP MYSQL的同步使用

简介与用途 Elasticsearch是一个分布式,RESTful模式的高速搜索引擎,它使用标准的RESTful APIs和JSON,同时提供支持如java,python,php等的多种语言.下文将Elasticsearch简称ES. 一个简单的curl查询数据的示例如下: curl -XGET 'localhost:9200/sedoctorfeedback/feedback/_search?pretty&q=119' ES使用诸如XPUT,XDELETE,XPOST,XGET等RESTful

mysql准实时同步数据到Elasticsearch

4. 安装JDK8.MySQL5.6驱动以及Logstash -6.0.0 ECS中分别安装JDK8.MySQL5.6驱动以及Logstash -6.0.0.如下图: 安装Logstash input.output插件,此案例数据输入是MySQL,输出是ES,so相应的插件应该是logstash-input-jdbc和logstash-output-elasticsearch. 安装插件的命令分别是(在Logstash主目录下运行):https://blog.51cto.com/433266/b

mysql主从同步监控脚本

mysql主从同步监控脚本,利用mysql从库中的IO和SQL进程以及延迟时间来监控主从同步是否正常,详细shell脚本如下: #!/bin/bash #author wangning #date 2017-7-17 #qq 1198143315 #Email [email protected] ################################## define variable############################# define_variable(){ ip

利用脚本实现mysql主库到备库数据同步(每五分钟同步一次增量)

目标: 将主库数据(IP1)每五分钟一次同步到备库(IP2) 服务器备库上,只同步增加量 思路: 利用crontab 每五分钟一次定时执行脚本进行数据同步 在脚本中编译实现查询五分钟内的数据增加量,并将增加量导入到备库 实现过程及代码(以record 表为例): bash脚本中内容 vi transaction.sh----建立transaction脚本实现同步 #!/bin/bash source.bash_profile (由于定时执行的功能使用crontab实现,需要在bash中添加使环境

【ElasticSearch】---ElasticSearch同步Mysql

ElasticSearch同步Mysql 先讲项目需求:对于资讯模块添加搜索功能 这个搜索功能我就是采用ElasticSearch实现的,功能刚实现完,所以写这篇博客做个记录,让自己在记录下整个步骤和过程中的一些注意事项. 一.安装elasticsearch和可视化工具 有关整个教程参考:mac安装elasticsearch和可视化工具 1.安装elasticsearch 网址地址:官网 2.安装elasticsearch-head(可视化界面) 安装地址:https://github.com/

Elasticsearch学习-----第二章 windows环境下Elasticsearch同步mysql数据库

在上一章中,我们已经能够通过spring boot来使用Elasticsearch,但是由于我们习惯性的将数据写入mysql,所以为了解决这个问题,Elasticsearch为我们提供了一个插件logstash来同步我们的数据库.本文所有的安装环境和使用环境都是在windows系统下进行的. 一.logstash的安装 首先在官网上下载logstash: logstash下载地址:https://www.elastic.co/downloads/logstash 需要注意的是logstash的版

elasticsearch -- Logstash实现mysql同步数据到elasticsearch

配置 安装插件由于这里是从mysql同步数据到elasticsearch,所以需要安装jdbc的入插件和elasticsearch的出插件:logstash-input-jdbc.logstash-output-elasticsearch安装效果图如下所示: 下载mysql连接库由于logstash是ruby开发的,所以这里要下载mysql的连接库jar包,从官网下载,我这里下载的是:mysql-connector-java-5.1.46.jar将下载好的mysql-connector-java

检查mysql主从同步结构中的从数据库服务器的状态-脚本shell

检查mysql主从同步结构(一主一从)中的从数据库服务器的状态          (ip授权.从服务器和IO是否正常.从mysql进程是否正常) 主mysql: 192.168.1.10 从mysql: 192.168.1.20 [[email protected] ~]# vi check_slave.sh #!/bin/bash master=192.168.1.10 i=1 service mysqld status &>/dev/null while [ true ] do echo