Kafka Connect in practice (2): distributed mode, MySQL binlog -> Kafka -> Hive

In the previous post, Kafka connect in practice(1): standalone, I introduced the basics of Kafka Connect configuration and demonstrated a local standalone demo. In this post we cover distributed-mode data pull and sink. Before starting, make sure the Kafka broker and ZooKeeper are running!

1. configuration

vim $KAFKA_HOME/config/connect-distributed.properties

Set the contents of this file as follows:

# These are defaults. This file just demonstrates how to override some settings.
#A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
#The client will make use of all servers irrespective of which servers are specified here for bootstrapping—
#this list only impacts the initial hosts used to discover the full set of servers.
#This list should be in the form host1:port1,host2:port2,....
#Since these servers are just used for the initial connection to discover the full cluster membership
#(which may change dynamically), this list need not contain the full set of servers (you may want more than one, though,
#in case a server is down).
#notes: this configuration is required.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group.
#Note that this must not conflict with consumer group IDs.
#notes: this configuration is required.
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
#Converter class for key Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors.
#Popular formats include Avro and JSON.
#notes: this configuration is required.
key.converter=org.apache.kafka.connect.json.JsonConverter

#Converter class for value Connect data. This controls the format of the data that will be written to
#Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.
#notes: this configuration is required.
value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the converter's setting with the converter we want to apply
# it to.
# If we want to see the schema of each message, turn on *.converter.schemas.enable;
# if we don't want to see the schema, turn it off.
# Generally speaking, in dev and testing environments these can be turned on for easier debugging,
# and turned off in production to save network bandwidth and disk capacity.
key.converter.schemas.enable=false
value.converter.schemas.enable=false

#The name of the topic where connector and task configuration offsets are stored. This must be the same for all workers with
#the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with multiple partitions and
#a compacted cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to
#create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with a large number of
#partitions (e.g., 25 or 50, just like Kafka's built-in __consumer_offsets topic) to support large Kafka Connect clusters.
offset.storage.topic=connect-offsets
#The replication factor used when Connect creates the topic used to store connector offsets. This should always be at least
# 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
offset.storage.replication.factor=1
#The number of partitions used when Connect creates the topic used to store connector offsets. A large value (e.g., 25 or 50,
#just like Kafka's built-in __consumer_offsets topic) is necessary to support large Kafka Connect clusters.
offset.storage.partitions=50

#The name of the topic where connector and task configuration data are stored. This must be the same for all workers with
#the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with a single-partition and
#compacted cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to create
#this topic manually, always create it as a compacted topic with a single partition and a high replication factor (3x or more).
config.storage.topic=connect-configs
#The replication factor used when Kafka Connect creates the topic used to store connector and task configuration data.
#This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
config.storage.replication.factor=1
config.storage.partitions=1

#The name of the topic where connector and task configuration status updates are stored. This must be the same for all workers with
#the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with multiple partitions and a compacted
#cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to create this topic manually,
#always create it as a compacted, highly replicated (3x or more) topic with multiple partitions.
status.storage.topic=connect-status
#The replication factor used when Connect creates the topic used to store connector and task status updates. This should always be at least 3
#for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
status.storage.replication.factor=1
#The number of partitions used when Connect creates the topic used to store connector and task status updates.
status.storage.partitions=10

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# These are provided to inform the user about the presence of the REST host and port configs.
# Hostname for the REST API to listen on. If this is set, it will only bind to this interface.
#notes: this configuration is optional
rest.host.name=localhost
#Port for the REST API to listen on.
#notes: this configuration is optional
rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
rest.advertised.host.name=127.0.0.1
rest.advertised.port=8083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
plugin.path=/home/lenmom/workspace/software/kafka_2.11-2.1.0/connect

#Converter class for internal value Connect data that implements the Converter interface. Used for converting data like offsets and configs.
#notes: this configuration is optional
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
#Converter class for internal key Connect data that implements the Converter interface. Used for converting data like offsets and configs.
#notes: this configuration is optional
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
#notes: this configuration is optional
task.shutdown.graceful.timeout.ms=10000
#notes: this configuration is optional
offset.flush.timeout.ms=5000
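
If you prefer to create the three internal topics manually rather than letting Connect auto-create them, a sketch like the following should work; the topic names, partition counts and the replication factor of 1 simply mirror the values above, so adjust them for your own cluster:

sh $KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect-offsets --partitions 50 --replication-factor 1 --config cleanup.policy=compact
sh $KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect-configs --partitions 1 --replication-factor 1 --config cleanup.policy=compact
sh $KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect-status --partitions 10 --replication-factor 1 --config cleanup.policy=compact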

2. download the debezium-connector-mysql plugin tarball and unzip it into the plugin.path folder defined at the end of connect-distributed.properties
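
For example, the plugin tarball can be downloaded from Maven Central and unpacked into the plugin.path directory roughly like this; the 0.9.4.Final version matches the records shown later in this post, but the exact URL and version may differ for your setup:

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/0.9.4.Final/debezium-connector-mysql-0.9.4.Final-plugin.tar.gz
tar -xzf debezium-connector-mysql-0.9.4.Final-plugin.tar.gz -C /home/lenmom/workspace/software/kafka_2.11-2.1.0/connect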

3. start mysql server with binlog enabled

For details, please refer to my previous blog post mysql 5.7 enable binlog.
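
As a quick reminder, the relevant my.cnf settings look roughly like the following; the server-id value is arbitrary but must be unique in the replication topology, and Debezium requires row-based binlog format:

[mysqld]
server-id        = 223344
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL
expire_logs_days = 10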

4. start kafka connect in distributed mode

sh $KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties
#or start in background
#sh $KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

The worker starts and prints its startup log to the console.

Look for the log line INFO Added aliases 'MySqlConnector' and 'MySql' to plugin 'io.debezium.connector.mysql.MySqlConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:390), which indicates our target plugin has been loaded by the distributed Kafka Connect worker.
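
You can also confirm the plugin is visible to the worker through the REST API; the connector-plugins endpoint lists every connector class the worker is able to load:

curl -H "Accept:application/json" localhost:8083/connector-plugins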

5. create the demo database in mysql

# In production you would almost certainly limit the replication user to the follower (slave) machine,
# to prevent other clients from accessing the binlog from other machines. For example, 'replicator'@'follower.acme.com'.
#
# However, this grant is equivalent to specifying *any* host, which makes this easier since the docker host
# is not easily known to the Docker container. But don't do this in production.
#
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator' IDENTIFIED BY 'replpass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

# Create the database that we'll use to populate data and watch the effect in the binlog
DROP DATABASE if exists inventory;
CREATE DATABASE if not exists inventory;
GRANT ALL PRIVILEGES ON inventory.* TO 'root'@'%';

# Switch to this database
USE inventory;

# Create and populate our products using a single insert with many rows
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
       (default,"car battery","12V car battery",8.1),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
       (default,"hammer","12oz carpenter‘s hammer",0.75),
       (default,"hammer","14oz carpenter‘s hammer",0.875),
       (default,"hammer","16oz carpenter‘s hammer",1.0),
       (default,"rocks","box of assorted rocks",5.3),
       (default,"jacket","water resistent black wind breaker",0.1),
       (default,"spare tire","24 inch spare tire",22.2);

# Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
  product_id INTEGER NOT NULL PRIMARY KEY,
  quantity INTEGER NOT NULL,
  FOREIGN KEY (product_id) REFERENCES products(id)
);

INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);

# Create some customers ...
CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

INSERT INTO customers
VALUES (default,"Sally","Thomas","[email protected]"),
       (default,"George","Bailey","[email protected]"),
       (default,"Edward","Walker","[email protected]"),
       (default,"Anne","Kretchmar","[email protected]");

# Create some very simple orders
CREATE TABLE orders (
  order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATE NOT NULL,
  purchaser INTEGER NOT NULL,
  quantity INTEGER NOT NULL,
  product_id INTEGER NOT NULL,
  FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
  FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2016-01-16', 1001, 1, 102),
       (default, '2016-01-17', 1002, 2, 105),
       (default, '2016-02-19', 1002, 2, 106),
       (default, '2016-02-21', 1003, 1, 107);
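
Optionally, you can verify on the mysql side that these statements are being recorded in the binlog, for example:

mysql> SHOW BINARY LOGS;
mysql> SHOW MASTER STATUS;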

6. Register a mysql connector using the kafka connect rest api

6.1 check the connect worker version

curl -H "Accept:application/json" localhost:8083/

output

{"version":"2.1.0","commit":"809be928f1ae004e","kafka_cluster_id":"NGQRxNZMSY6Q53ktQABHsQ"}

6.2 get the current connector list

$ curl -H "Accept:application/json" localhost:8083/connectors/
[]

The output indicates there are no connectors registered in the distributed Connect cluster yet.

6.3 register a mysql connector instance in the distributed connect cluster

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "127.0.0.1", "database.port": "3306", "database.user": "root", "database.password": "root", "database.server.id": "184054", "database.server.name": "127.0.0.1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

The request payload is formatted below for readability:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "127.0.0.1",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "184054",
    "database.server.name": "127.0.0.1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
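
If the one-line curl above is hard to read or edit, the same registration can be done by saving this JSON to a file (the file name register-mysql-inventory.json here is just an example) and posting it with curl's @file syntax:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-mysql-inventory.json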

The response to the POST request is shown below:

HTTP/1.1 201 Created
Date: Tue, 07 Feb 2017 20:49:34 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 471
Server: Jetty(9.2.15.v20160210)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "184054",
    "database.server.name": "127.0.0.1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "name": "inventory-connector"
  },
  "tasks": []
}

6.4 list the connectors again using curl; there should now be one connector since we have just registered it.

curl -H "Accept:application/json" localhost:8083/connectors/

output

["inventory-connector"]

6.5 get the connector detail

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

output

HTTP/1.1 200 OK
Date: Wed, 24 Apr 2019 10:15:16 GMT
Content-Type: application/json
Content-Length: 536
Server: Jetty(9.4.12.v20180830)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "root",
    "database.server.id": "184054",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "database.server.name": "127.0.0.1",
    "database.port": "3306",
    "database.hostname": "127.0.0.1",
    "database.password": "root",
    "name": "inventory-connector",
    "database.whitelist": "inventory"
  },
  "tasks": [
    {
      "connector": "inventory-connector",
      "task": 0
    }
  ],
  "type": "source"
}

the "task":0 indicate there is one task with id 0.

6.6 list the kafka topics

sh $KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --list 

output

127.0.0.1
127.0.0.1.inventory.customers
127.0.0.1.inventory.orders
127.0.0.1.inventory.products
127.0.0.1.inventory.products_on_hand
__consumer_offsets
connect-configs
connect-offsets
connect-status
connect-test
dbhistory.inventory

This indicates the mysql connector has started watching the mysql binlog and has begun pushing the changed data to the kafka broker. Debezium creates one topic per captured table, named <database.server.name>.<database>.<table>, which is why the topics here are prefixed with 127.0.0.1.

6.7 observe the data in the kafka topic

sh $KAFKA_HOME/bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic 127.0.0.1.inventory.customers --from-beginning   # all the data changes for  table customers in database inventory would be listed

output:

{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}

Let's take one record, say the first one, and format it for readability:

{
  "before": null,
  "after": {
    "id": 1001,
    "first_name": "Sally",
    "last_name": "Thomas",
    "email": "[email protected]"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "mysql",
    "name": "127.0.0.1",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000012",
    "pos": 154,
    "row": 0,
    "snapshot": true,
    "thread": null,
    "db": "inventory",
    "table": "customers",
    "query": null
  },
  "op": "c",
  "ts_ms": 1556100441651
}

The op field indicates the data change type:

  • c: a record was inserted into the database. For c, the before element is null.
  • d: a record was deleted from the database. For d, the after element is null.
  • u: a record was updated in the database. The before element holds the row as it was when the update took place, and the after element holds the row after the update.

Let's do some update and delete operations in the mysql database to see the changed data captured in kafka.

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | [email protected] |
| 1002 | George     | Bailey    | [email protected]    |
| 1003 | Edward     | Walker    | [email protected]         |
| 1004 | Anne       | Kretchmar | [email protected]    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

mysql> update customers set first_name='1234' where id=1004;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> delete from customers where id=1004;
Query OK, 1 row affected (0.01 sec)

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | [email protected] |
| 1002 | George     | Bailey    | [email protected]    |
| 1003 | Edward     | Walker    | [email protected]         |
+------+------------+-----------+-----------------------+
3 rows in set (0.00 sec)

As shown above, we first updated the first_name field of the record with id=1004 to 1234, and then deleted the record.

Kafka records (the whole topic consumed from the beginning):

{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"},"after":{"id":1004,"first_name":"1234","last_name":"Kretchmar","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":223344,"ts_sec":1556102881,"gtid":null,"file":"mysql-bin.000012","pos":364,"row":0,"snapshot":false,"thread":2,"db":"inventory","table":"customers","query":null},"op":"u","ts_ms":1556102881385}
{"before":{"id":1004,"first_name":"1234","last_name":"Kretchmar","email":"[email protected]"},"after":null,"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":223344,"ts_sec":1556102900,"gtid":null,"file":"mysql-bin.000012","pos":725,"row":0,"snapshot":false,"thread":2,"db":"inventory","table":"customers","query":null},"op":"d","ts_ms":1556102900269}
null

There are two more records in the related kafka topic; the trailing null is the tombstone record Debezium emits after a delete, so that log compaction can eventually remove all events for that key.

update:

{
  "before": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "[email protected]"
  },
  "after": {
    "id": 1004,
    "first_name": "1234",
    "last_name": "Kretchmar",
    "email": "[email protected]"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "mysql",
    "name": "127.0.0.1",
    "server_id": 223344,
    "ts_sec": 1556102881,
    "gtid": null,
    "file": "mysql-bin.000012",
    "pos": 364,
    "row": 0,
    "snapshot": false,
    "thread": 2,
    "db": "inventory",
    "table": "customers",
    "query": null
  },
  "op": "u",
  "ts_ms": 1556102881385
}

delete:

{
  "before": {
    "id": 1004,
    "first_name": "1234",
    "last_name": "Kretchmar",
    "email": "[email protected]"
  },
  "after": null,
  "source": {
    "version": "0.9.4.Final",
    "connector": "mysql",
    "name": "127.0.0.1",
    "server_id": 223344,
    "ts_sec": 1556102900,
    "gtid": null,
    "file": "mysql-bin.000012",
    "pos": 725,
    "row": 0,
    "snapshot": false,
    "thread": 2,
    "db": "inventory",
    "table": "customers",
    "query": null
  },
  "op": "d",
  "ts_ms": 1556102900269
}

If we stop the mysql connector or restart the kafka broker, the data is still there, since Connect persists message offsets, connector configs, and status in the dedicated kafka topics we configured in connect-distributed.properties.
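
To see this for yourself, you can consume the internal offsets topic directly; printing the keys makes it clear which connector each offset belongs to (this is for inspection only, never write to these topics):

sh $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning --property print.key=true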

7. register an hdfs connector plugin in the distributed connect cluster
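
This step mirrors what we did for the mysql source connector: unpack an HDFS sink connector plugin (for example Confluent's kafka-connect-hdfs) into the plugin.path directory, restart the worker, and register a sink connector through the same REST API. The following is only a rough sketch of such a registration payload, assuming kafka-connect-hdfs with Hive integration and the customers topic produced above; the exact config keys and values depend on the connector version and on your HDFS/Hive setup, so check the connector documentation before using it:

{
  "name": "hdfs-sink-inventory-customers",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "127.0.0.1.inventory.customers",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "3",
    "hive.integration": "true",
    "hive.metastore.uris": "thrift://localhost:9083",
    "hive.database": "inventory",
    "schema.compatibility": "BACKWARD"
  }
}

Note that Hive integration generally requires records with schemas (for example Avro with a Schema Registry, or JSON with schemas.enable=true), so the converter settings in connect-distributed.properties above may need to be adjusted for this sink.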
