Mysql 流增量写入 Hdfs(一) --从 mysql 到 kafka

一. 概述

在大数据的静态数据处理中,目前普遍采用的是用 Spark + Hdfs (Hive / Hbase) 的技术架构来对数据进行处理。

但有时候有其他的需求,需要从其他不同数据源不间断得采集数据,然后存储到 Hdfs 中进行处理。而追加(append)这种操作在 Hdfs 里面明显是比较麻烦的一件事。所幸有了 Storm 这么个流数据处理这样的东西问世,可以帮我们解决这些问题。

不过光有 Storm 还不够,我们还需要其他中间件来协助我们,让所有其他数据源都归于一个通道。这样就能实现不同数据源以及 Hhdfs 之间的解耦。而这个中间件 Kafka 无疑是一个很好的选择。

这样我们就可以让 Mysql 的增量数据不停得抛出到 Kafka ,而后再让 storm 不停得从 Kafka 对应的 Topic 读取数据并写入到 Hdfs 中。

二. 基本知识

2.1 Mysql binlog 介绍

binlog 即 Mysql 的二进制日志。它可以说是 Mysql 最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。

上面所说的提到了 DDL 和 DML ,可能有些同学不了解,这里顺便说一下:

  • DDL:数据定义语言DDL用来创建数据库中的各种对象-----表、视图、索引、同义词、聚簇等如:CREATE TABLE/VIEW/INDEX/SYN/CLUSTER...
  • DML:数据操纵语言DML主要有三种形式:插入(INSERT), 更新(UPDATE),以及删除(DELETE)。

在 Mysql 中,binlog 默认是不开启的,因为有大约 1% (官方说法)的性能损耗,如果要手动开启,流程如下:

  1. vi编辑打开mysql配置文件:
vi /usr/local/mysql/etc/my.cnf

在[mysqld] 区块设置/添加如下,

log-bin=mysql-bin 

注意一定要在 [mysqld] 下。

  1. 重启 Mysql
pkill mysqld
/usr/local/mysql/bin/mysqld_safe --user=mysql &

2.2 kafka

这里只对 Kafka 做一个基本的介绍,更多的内容可以度娘一波。

上面的图片是 kafka 官方的一个图片,我们目前只需要关注 Producers 和 Consumers 就行了。

Kafka 是一个分布式发布-订阅消息系统。分布式方面由 Zookeeper 进行协同处理。消息订阅其实说白了吧,就是一个队列,分为消费者和生产者,就像上图中的内容,有数据源充当 Producer 生产数据到 kafka 中,而有数据充当 Consumers ,消费 kafka 中的数据。

上图中的 offset 指的是数据的写入以及消费的位置的信息,这是由 Zookeeper 管理的。也就是说,当 Consumers 重启或是怎样,需要重新从 kafka 读取消息时,总不能让它从头开始消费数据吧,这时候就需要有个记录能告诉你从哪里开始重新读取。这就是 offset 。

kafka 中还有一个至关重要的概念,那就是 topic 。不过这个其实还是很好理解的,比如你要订阅一些消息,你肯定是不会订阅所有消息的吧,你只需要订阅你感兴趣的主题,比如摄影,编程,搞笑这些主题。而这里主题的概念其实和 topic 是一样的。总之,可以将 topic 归结为通道,kafka 中有很多个通道,不同的 Producer 向其中一个通道生产数据,也就是抛数据进去这个通道,Comsumers 不停得消费通道中的数据。

而我们要做的就是将 Mysql binlog 产生的数据抛到 kafka 中充当作生产者,然后由 storm 充当消费者,不停得消费数据并写入到 Hdfs 中。

至于怎么将 binlog 的数据抛到 kafka ,别急,下面我们就来介绍。

2.3 maxwell

maxwell 这个工具可以很方便得监听 Mysql 的 binlog ,然后每当 binlog 发生变化时,就会以 json 格式抛出对应的变化数据到 Kafka 中。比如当向 mysql 一张表中插入一条语句的时候,maxwell 就会立刻监听到 binlog 中有对应的记录增加,然后将一些信息包括插入的数据都转化成 json 格式,然后抛到 kafka 指定的 topic 中。

下载地址在这里可以找到。

除了 Kafka 外,其实 maxwell 还支持写入到其他各种中间件,比如 redis。

同时 maxwell 是比较轻量级的工具,只需要在 mysql 中新建一个数据库供它记录一些信息,然后就可以直接运行。

三. 使用 maxwell 监听 binlog

接下来我们将的是如果使用 maxwell ,让它监听 mysql 的 binlog 并抛到 kafka 中。maxwell 主要有两种运行方式。一种是使用配置文件,另一种则是在命令行中添加参数的方式运行。这里追求方便,只使用命令行的方式进行演示。

这里介绍一下简单的将数据抛到 kafka 的命令行脚本吧。

bin/maxwell --user=‘maxwell‘ --password=‘XXXXXX‘ --host=‘127.0.0.1‘    --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --port=3306

各项参数说明如下

  • user:mysql 用户名
  • password:mysql 密码
  • host:Mysql 地址
  • producer:指定写入的中间件类型,比如还有 redies
  • kafka.bootstrap.servers:kafka 的地址
  • kafka_topic:指明写入到 kafka 哪个 topic
  • port:mysql 端口

启动之后,maxwell 便开始工作了,当然如果你想要让这条命令可以在后台运行的话,可以使用 Linux 的 nohup 命令,这里就不多赘述,有需要百度即可。

这样配置的话通常会将整个数据库的增删改都给抛到 kafka ,但这样的需求显然不常见,更常见的应该是具体监听对某个库的操作,或是某个表的操作。

在升级到 1.9.2(最新版本)后,maxwell 为我们提供这样一个参数,让我们可以轻松实现上述需求:--filter

这个参数通常包含两个配置项,exclude 和 include。意思就是让你指定排除哪些和包含哪些。比如我只想监听 Adatabase 库下的 Atable 表的变化。我可以这样。

--filter=‘exclude: *.*, include: Adatabase.Atable‘

这样我们就可以轻松实现监听 mysql binlog 的变化,并可以定制自己的需求。

OK,这一章我们介绍了 mysql binlog ,kafka 以及 maxwell 的一些内容,下一篇我们将会看到 storm 如何写入 hdfs 以及定制一些策略。see you~~

原文地址:https://www.cnblogs.com/listenfwind/p/10089082.html

时间: 2024-11-10 08:49:28

Mysql 流增量写入 Hdfs(一) --从 mysql 到 kafka的相关文章

MySQL通用批量写入工具(Python)

背景 平台目前的分析任务主要以Hive为主,分析后的结果存储在HDFS,用户通过REST API或者Rsync的方式获取分析结果,这样的方式带来以下几个问题: (1)任务执行结束时间未知,用户必须自行编写代码不断地通过REST API请求分析结果,直至获取到分析结果为止,其中还需要处理分析结果过大,转而通过Rsync方式获取: (2)受限于Hive SQL的表达能力,用户的计算逻辑无法完全表述,获取分析结果后需要再计算,然后入库: (3)基于(1).(2)的原因,用户编写大量复杂且冗余的代码处理

MySQL 数据库增量数据恢复案例

一.场景概述 MySQL数据库每日零点自动全备 某天上午10点,小明莫名其妙地drop了一个数据库 我们需要通过全备的数据文件,以及增量的binlog文件进行数据恢复 二.主要思想 利用全备的sql文件中记录的CHANGE MASTER语句,binlog文件及其位置点信息,找出binlog文件增量的部分 用mysqlbinlog命令将上述的binlog文件导出为sql文件,并剔除其中的drop语句 通过全备文件和增量binlog文件的导出sql文件,就可以恢复到完整的数据 三.过程示意图 四.操

mysql数据库增量恢复多实例实战演示

mysql主从复制原理要点 1.异步方式同步 2.逻辑同步模式,多种模式,默认是通过sql语句执行 3.主库通过记录binlog实现对从库的同步,binlog记录数据库的更新语句 4.主库1个IO线程,从库由1个IO线程和一个sql线程来完成的 5.从库关键文件master.info,relay-log,relay-info功能 6.如果从库还想级联从库,需要打开bin-log和log-slave-updates参数 [[email protected] ~]# mysql -uroot -po

用mapreduce实现将mysql数据导出到HDFS上

因为业务需要,需要将一批mysql数据导入到HBASE,现在先将数据从Mysql导出到HDFS. 版本:hadoop CDH4.5,Hbase-0.946 1.实体类 YqBean 是我的实体类,请根据自己需要修改,实体类需要 implements Writable, DBWritable. 2.MR实现 import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configurati

sqoop从hdfs导出到mysql

create database logs; use logs create table weblogs_from_hdfs( md5 varchar(32), url varchar(64), request_date date, request_time time, ip varchar(15)); sqoop export -m 1 --connect jdbc:mysql://hadoop:3306/logs --username root --password root --table

Mysql DBA 高级运维学习笔记-Mysql增量备份及数据恢复实战

大家好本文讲了mysql数据库的增量恢复.老男孩老师讲的很好很细,我自己呢整理了老师讲的部分内容,请大家将就看:文章内容比较多.比较乱,请大家认真总结知识,发现问题请及时批评指点~ 1.1 MySQL数据库的备份与恢复 1.1.1备份单个数据库练习多种参数使用 Mysql数据库自带了一个很好用的备份命令,就是mysqldump, 它的基本使用如下: 语法:mysqldump –u用户名 –p密码 数据库名>备份的文件名 范烈1:备份名字为cuizhong的库 a.查看备份前的数据 [[email

mysql实时增量备份

采用binlog日志的好处 掌控所有更改操作,必要时可用于恢复数据 数据库主从复制的必要条件 [[email protected]~]# vim /etc/my.cnf [mysqld] .. .. log-bin=mysql-bin //启用二进制日志,并指定前缀 .. ..[[email protected] ~]# service mysqld restart 确认binlog日志文件新启用binlog后,每次启动MySQl服务都会新生成一份日志文件: [[email protected]

zookeeper,kafka,jstorm,memcached,mysql流式数据处理平台部署

一.平台环境介绍: 1.系统信息: 项目 信息 系统版本: Ubuntu14.04.2 LTS \n \l 用户: ***** 密码: ****** Java环境: openjdk-7-jre 语言: en_US.UTF-8,en_US:en 磁盘: 每台vda为系统盘(50G),vdb为数据盘(200G)挂载于/storage目录 hcloud15最为DB,第二块磁盘为2000G 主机范围: 192.168.21.7~192.168.21.15,192.168.21.17,192.168.21

mysql备份-增量备份级数据恢复

mysqldump -u 用户名 -p密码 mysqldump -h 127.0.0.1 -u root -P3003 --events -p mysql > /tmp/mysql.bak.sql                                   ###因为mysqldump默认是不备份事件表的,只有加了--events 才会解决加上 egrep -v "#|\*|--|^$" /tmp/mysql.bak.sql