I. Install Elasticsearch
https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html
II. Create the database and tables in MySQL (skip if they already exist)
The creation script is as follows:
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for contacts
-- ----------------------------
DROP TABLE IF EXISTS `contacts`;
CREATE TABLE `contacts` (
`uid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`email` varchar(80) NOT NULL,
`first_name` varchar(80) NOT NULL,
`last_name` varchar(80) NOT NULL,
UNIQUE KEY `uid` (`uid`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of contacts
-- ----------------------------
INSERT INTO `contacts` VALUES ('1', '[email protected]', 'Jim', 'Smith');
INSERT INTO `contacts` VALUES ('2', '', 'John', 'Smith');
INSERT INTO `contacts` VALUES ('3', '[email protected]', 'Carole', 'Smith');
INSERT INTO `contacts` VALUES ('4', '[email protected]', 'Sam', 'Smith');
INSERT INTO `contacts` VALUES ('5', '[email protected]', 'Rick', 'Roses');
INSERT INTO `contacts` VALUES ('6', '[email protected]', 'Sum', 'OKFord');
-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`USERID` bigint(20) NOT NULL,
`USERNAME` varchar(20) DEFAULT NULL,
`AGE` int(3) unsigned DEFAULT NULL,
`SEX` tinyint(1) DEFAULT NULL,
PRIMARY KEY (`USERID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES ('201609210935231', 'Quejs', '24', '1');
INSERT INTO `user` VALUES ('201609210936233', 'Smith', '24', '1');
INSERT INTO `user` VALUES ('201609210938234', 'Mends', '26', '0');
INSERT INTO `user` VALUES ('201609210947235', 'wisd', '38', '0');
INSERT INTO `user` VALUES ('201609210948236', 'Losed', '35', '0');
SET FOREIGN_KEY_CHECKS=1;
III. Install Logstash
IV. Install the logstash-input-jdbc and logstash-output-elasticsearch plugins
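No commands are given for this step. The following is a minimal sketch, assuming Logstash 2.3 or later, where the plugin manager is bin/logstash-plugin (earlier releases shipped it as bin/plugin). The PLUGINS variable and the existence check are illustrative additions, not part of the original setup.

```shell
# Run from the Logstash bin directory
# (/home/manager/app/logstash-2.4.0/bin in this walkthrough).
PLUGINS="logstash-input-jdbc logstash-output-elasticsearch"

if [ -x ./logstash-plugin ]; then
  # Install each plugin in turn; this needs network access to rubygems.org.
  for plugin in $PLUGINS; do
    ./logstash-plugin install "$plugin"
  done
else
  echo "logstash-plugin not found; cd into the Logstash bin directory first" >&2
fi
```

Afterwards, ./logstash-plugin list should show both plugin names.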
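V. Create the configuration files
1. Copy the MySQL JDBC driver jar into the Logstash installation directory; in this example it is placed in /home/manager/app/logstash-2.4.0.
2. Create a config folder under /home/manager/app/logstash-2.4.0/bin.
Command: change to the /home/manager/app/logstash-2.4.0/bin directory and run mkdir config
3. Create a conf directory under /home/manager/app/logstash-2.4.0/bin/config/. Command, run from inside that directory: mkdir conf
4. Create two configuration files in the conf directory, contacts.conf and user.conf.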
Contents of user.conf:
input {
  stdin {
  }
  jdbc {
    # MySQL JDBC connection string to our backup database
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mytest"
    # the user we wish to execute our statement as
    jdbc_user => "manager"
    jdbc_password => "[email protected]"
    # the path to our downloaded JDBC driver
    jdbc_driver_library => "/home/manager/app/logstash-2.4.0/mysql-connector-java-5.1.22.jar"
    # the name of the driver class for MySQL
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # file containing the SQL statement to run
    statement_filepath => "/home/manager/app/logstash-2.4.0/bin/config/sql/user.sql"
    # cron-style schedule: run once every minute
    schedule => "* * * * *"
    type => "user"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}
output {
  if [type] == "user" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "mytest"
      document_type => "user"
      document_id => "%{userid}"
      workers => 1
      flush_size => 20000
      idle_flush_time => 10
      template_overwrite => true
    }
    stdout {
      codec => json_lines
    }
  }
}
Contents of contacts.conf:
input {
  stdin {
  }
  jdbc {
    # MySQL JDBC connection string to our backup database
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mytest"
    # the user we wish to execute our statement as
    jdbc_user => "manager"
    jdbc_password => "[email protected]"
    # the path to our downloaded JDBC driver
    jdbc_driver_library => "/home/manager/app/logstash-2.4.0/mysql-connector-java-5.1.22.jar"
    # the name of the driver class for MySQL
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # query file for the contacts table (file name assumed)
    statement_filepath => "/home/manager/app/logstash-2.4.0/bin/config/sql/contacts.sql"
    # cron-style schedule: run once every minute
    schedule => "* * * * *"
    type => "contacts"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}
output {
  if [type] == "contacts" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "mytest"
      document_type => "contacts"
      # uid is the unique key of the contacts table
      document_id => "%{uid}"
      workers => 1
      flush_size => 20000
      idle_flush_time => 10
      template_overwrite => true
    }
    stdout {
      codec => json_lines
    }
  }
}
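The statement_filepath setting points into bin/config/sql, a directory that the steps above never create, and the query files themselves are not shown. Below is a minimal sketch of creating them from the Logstash bin directory. It assumes each file simply selects every row of its table; the file name contacts.sql and both SELECT statements are assumptions, not taken from the original.

```shell
# Run from the Logstash bin directory, like the mkdir steps above.
mkdir -p config/sql

# Assumed query for the user table (full sync: every row on every run).
cat > config/sql/user.sql <<'SQL'
SELECT * FROM `user`
SQL

# Assumed query for the contacts table; the file name is hypothetical.
cat > config/sql/contacts.sql <<'SQL'
SELECT * FROM `contacts`
SQL
```

The jdbc input lowercases column names by default, which is why the USERID column can be referenced as %{userid} in document_id.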
VI. Start and verify
1. From the Logstash bin directory, start Logstash: nohup ./logstash -f ./config/conf/ >/dev/null 2>&1 &
2. Verify that the data has been synced: curl -XGET 'http://localhost:9200/mytest/contacts/_search?pretty'
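The same check can be run for both document types once the first scheduled run has completed (the schedule fires every minute). This is a minimal sketch; the ES_HOST variable and the search_url helper are hypothetical conveniences, and the index and type names come from the configuration files above.

```shell
# Elasticsearch address; override ES_HOST if it is not running locally.
ES_HOST="${ES_HOST:-localhost:9200}"

# Build the search URL for one document type in the mytest index.
search_url() {
  printf 'http://%s/mytest/%s/_search?pretty\n' "$ES_HOST" "$1"
}

# Query each type; report instead of aborting if the node is unreachable.
for doc_type in contacts user; do
  curl -s --max-time 5 -XGET "$(search_url "$doc_type")" \
    || echo "could not reach Elasticsearch at $ES_HOST" >&2
done
```

A hits.total greater than zero in each response indicates that rows from the corresponding table were indexed.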
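VII. Notes
1. Only logical (soft) deletes can be synchronized; rows that are physically deleted from MySQL are not removed from Elasticsearch.
2. The example above is a full sync. Incremental sync requires adding a timestamp column to each table.
3. Performance still needs to be load tested; this setup is only a first step.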
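As a sketch of note 2, an incremental query can filter on the timestamp column using the :sql_last_value placeholder, which recent versions of logstash-input-jdbc replace with the time of the previous run. The column name update_time and the file name user_incremental.sql are hypothetical; the table would first need such a column, updated on every change.

```shell
# Run from the Logstash bin directory.
mkdir -p config/sql

# Hypothetical incremental statement: assumes `user` has an `update_time`
# TIMESTAMP column that MySQL refreshes whenever a row changes.
cat > config/sql/user_incremental.sql <<'SQL'
SELECT * FROM `user` WHERE update_time > :sql_last_value
SQL
```

Pointing statement_filepath at this file would make each scheduled run fetch only rows changed since the last one. For note 1, a soft-delete flag column (also hypothetical, for example is_deleted) that is updated instead of deleting the row would be picked up the same way.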