logstash同步mysql到elasticsearch

logstash 获取

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.2.zip
unzip logstash-6.5.2.zip && cd logstash-6.5.2

安装 jdbc 和 elasticsearch 插件

bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch

获取 jdbc mysql 驱动

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar
unzip mysql-connector-java-5.1.47.zip

编写配置文件

logstash-input-jdbc

使用 logstash-input-jdbc 插件读取 mysql 的数据,这个插件的工作原理比较简单,就是定时执行一个 sql,然后将 sql 执行的结果写入到流中,增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段

  • jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已经下载
  • jdbc_driver_class: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了
  • jdbc_connection_string: mysql 地址
  • jdbc_user: mysql 用户
  • jdbc_password: mysql 密码
  • schedule: 执行 sql 时机,类似 crontab 的调度
  • statement: 要执行的 sql,以 ":" 开头是定义的变量,可以通过 parameters 来设置变量,这里的 sql_last_value是内置的变量,表示上一次 sql 执行中 update_time 的值,这里 update_time 条件是 >= 因为时间有可能相等,没有等号可能会漏掉一些增量
  • use_column_value: 使用递增列的值
  • tracking_column_type: 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
  • tracking_column: 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
  • last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
input {
  jdbc {
    jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<mysql_host>:3306/rta"
    jdbc_user => "<username>"
    jdbc_password => "<password>"
    schedule => "* * * * *"
    statement => "SELECT * FROM table WHERE update_time >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "syncpoint_table"
  }
}

logstash-output-elasticsearch

  • hosts: es 集群地址
  • user: es 用户名
  • password: es 密码
  • index: 导入到 es 中的 index 名,这里我直接设置成了 mysql 表的名字
  • document_id: 导入到 es 中的文档 id,这个需要设置成主键,否则同一条记录更新后在 es 中会出现两条记录,%{id} 表示引用 mysql 表中 id 字段的值
output {
  elasticsearch {
    hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"]
    user => "<user>"
    password => "<password>"
    index => "table"
    document_id => "%{id}"
  }
}

运行

把上面的代码保存到一个配置文件里面 sync_table.cfg,执行下面命令即可

bin/logstash -f config/sync_table.cfg
#logstash的配置文件需要自己创建,一下是我自己的配置文件
cat  mysql_conf/mysql.conf

input {
     stdin {
    }
    jdbc {
      #数据库地址
      jdbc_connection_string => "jdbc:mysql://localhost:3601/user"
      jdbc_user => "admin"
      jdbc_password => "passwd"
      #数据库驱动路径
      jdbc_driver_library => "/soft/logstash-5.6.9/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      #sql路径,也可以是直接指定sql语句,字段需要换成 statement =>
      statement_filepath => "/soft/logstash-5.6.9/pay_test.sql"
      #是否开启记录追踪
      record_last_run => "true"
      #是否需要追踪字段,如果为true,则需要指定tracking_column,默认是timestamp
      use_column_value => "true"
      #指定追踪的字段,这里需要注意的是,建议选择主键字段,如果选择日期需要为实时表
      tracking_column => "id"
      #追踪字段的类型,目前只有数字和时间类型,默认是数字类型
      #tracking_column_type => "number"
      #设置时区
      jdbc_default_timezone =>"Asia/Shanghai"
      #是否每次清除last_run_metadata_path的内容
      clean_run => "false"
      #这里可以手动设置:sql_last_value的值,默认时间是1970-01-01,默认数字是0
      last_run_metadata_path => "/soft/logstash-5.6.9/logstash_jdbc_last_run"
      #多久同步一次
      schedule => "*/5 * * * *"
      #是否分页
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
    }
}

filter {
    #jdbc默认json,暂时没找到修改方法
    json {
        source => "message"
        remove_field => ["message"]
    }
    mutate {  #需要移除的字段
        remove_field => "@timestamp"
        remove_field => "type"
        remove_field => "@version"

    }
}

output {
        elasticsearch {
            hosts => "localhost:9200" #elasticsearch地址
            index => "user" #elasticsearch索引
            document_id => "%{id}" #elasticsearch的id,该值需要唯一,如果不唯一就不要加这个字段,默认生成
            document_type => "log" #elasticsearch的type
        }
}

原文地址:https://www.cnblogs.com/FlyBlueSky/p/10097101.html

时间: 2024-08-03 01:06:40

logstash同步mysql到elasticsearch的相关文章

logstash同步mysql数据到Elasticsearch

安装logstash查看我的另一篇文章  Docker 部署 logstash 同步数据我们首先需要安装好对应的插件,然后下载对应的数据库链接jar包,下面是具体的步骤 1.进入容器中 docker exec it logstash bash 2.进入到bin 目录下,我这里是/usr/share/logstash/bin,可以看到logstash-plugin文件,然后安装插件 logstash-plugin install logstash-input-jdbc 3.看到如下输出,则表示安装

【记录】ELK之logstash同步mysql数据到Elasticsearch ,配置文件详解

本文出处:https://my.oschina.net/xiaowangqiongyou/blog/1812708#comments 截取部分内容以便学习 input { jdbc { # mysql相关jdbc配置 jdbc_connection_string => "jdbc:mysql://10.112.76.30:3306/jack_test?useUnicode=true&characterEncoding=utf-8&useSSL=false" jdb

elasticsearch5.2.1使用logstash同步mysql

centos 本人亲测可以  首先安装好mysql,elasticsearch   不懂的请参考另一篇文章 安装logstash官方:https://www.elastic.co/guide/en/logstash/current/installing-logstash.html1.下载公共密钥rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch2.添加yum源vim  /etc/yum.repos.d/logstash.

使用Logstash同步数据至Elasticsearch,Spring Boot中集成Elasticsearch实现搜索

安装logstash.同步数据至ElasticSearch 为什么使用logstash来同步,CSDN上有一篇文章简要的分析了以下几种同步工具的优缺点:https://blog.csdn.net/laoyang360/article/details/51694519. 下面开始实践: 1. 下载Logstash安装包,需要注意版本与elasticsearch保持一致,windows系统下直接解压即可. 2.添加同步mysql数据库的配置,并将mysql连接驱动jar包放在指定的配置目录 注: 目

logstash增量同步mysql数据到es

本篇本章地址:https://www.cnblogs.com/Thehorse/p/11601013.html 今天我们来讲一下logstash同步mysql数据到es 我认为呢,logstash是众多同步mysql数据到es的插件中,最稳定并且最容易配置的一个. input { stdin { } jdbc { type => "xxx" jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/yinhela

elasticsearch -- Logstash实现mysql同步数据到elasticsearch

配置 安装插件由于这里是从mysql同步数据到elasticsearch,所以需要安装jdbc的入插件和elasticsearch的出插件:logstash-input-jdbc.logstash-output-elasticsearch安装效果图如下所示: 下载mysql连接库由于logstash是ruby开发的,所以这里要下载mysql的连接库jar包,从官网下载,我这里下载的是:mysql-connector-java-5.1.46.jar将下载好的mysql-connector-java

【ElasticSearch】---ElasticSearch同步Mysql

ElasticSearch同步Mysql 先讲项目需求:对于资讯模块添加搜索功能 这个搜索功能我就是采用ElasticSearch实现的,功能刚实现完,所以写这篇博客做个记录,让自己在记录下整个步骤和过程中的一些注意事项. 一.安装elasticsearch和可视化工具 有关整个教程参考:mac安装elasticsearch和可视化工具 1.安装elasticsearch 网址地址:官网 2.安装elasticsearch-head(可视化界面) 安装地址:https://github.com/

Elasticsearch学习-----第二章 windows环境下Elasticsearch同步mysql数据库

在上一章中,我们已经能够通过spring boot来使用Elasticsearch,但是由于我们习惯性的将数据写入mysql,所以为了解决这个问题,Elasticsearch为我们提供了一个插件logstash来同步我们的数据库.本文所有的安装环境和使用环境都是在windows系统下进行的. 一.logstash的安装 首先在官网上下载logstash: logstash下载地址:https://www.elastic.co/downloads/logstash 需要注意的是logstash的版

Logstash配置同步mysql到es配置

关于logstash安装:https://www.cnblogs.com/toov5/p/10301727.html Logstash是一个开源数据收集引擎,具有实时管道功能.Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地 下面进一步详细说配置: jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已经下载 jdbc_driver_class: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver