In the previous post, Kafka connect in practice(1): standalone, I introduced the basics of Kafka Connect configuration and demonstrated a local standalone setup. This post covers distributed mode: pulling data from a source and sinking it. Before starting, make sure the Kafka broker and ZooKeeper are running!
1. configuration
vim $KAFKA_HOME/config/connect-distributed.properties
Set the contents of this file to:
# These are defaults. This file just demonstrates how to override some settings.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
# The client will make use of all servers irrespective of which servers are specified here for bootstrapping;
# this list only impacts the initial hosts used to discover the full set of servers.
# This list should be in the form host1:port1,host2:port2,....
# Since these servers are just used for the initial connection to discover the full cluster membership
# (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though,
# in case a server is down).
# notes: this configuration is required.
bootstrap.servers=localhost:9092

# Unique name for the cluster, used in forming the Connect cluster group.
# Note that this must not conflict with consumer group IDs.
# notes: this configuration is required.
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka.

# Converter class for key Connect data. This controls the format of the data that will be written to Kafka for source
# connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.
# notes: this configuration is required.
key.converter=org.apache.kafka.connect.json.JsonConverter

# Converter class for value Connect data. This controls the format of the data that will be written to
# Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.
# notes: this configuration is required.
value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to.
# If we want to see the schema of the message, we can turn on *.converter.schemas.enable;
# conversely, if we don't want to see the schema of the message, we should turn off *.converter.schemas.enable.
# Generally speaking, in dev and testing environments we can turn the following attributes on for tracking purposes,
# and turn them off in production to save network and disk capacity.
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The name of the topic where connector offsets are stored. This must be the same for all workers with
# the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with multiple partitions and
# a compacted cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to
# create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with a large number of
# partitions (e.g., 25 or 50, just like Kafka's built-in __consumer_offsets topic) to support large Kafka Connect clusters.
offset.storage.topic=connect-offsets

# The replication factor used when Connect creates the topic used to store connector offsets. This should always be at least
# 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
offset.storage.replication.factor=1

# The number of partitions used when Connect creates the topic used to store connector offsets. A large value (e.g., 25 or 50,
# just like Kafka's built-in __consumer_offsets topic) is necessary to support large Kafka Connect clusters.
offset.storage.partitions=50

# The name of the topic where connector and task configuration data are stored. This must be the same for all workers with
# the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with a single partition and
# a compacted cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to create
# this topic manually, always create it as a compacted topic with a single partition and a high replication factor (3x or more).
config.storage.topic=connect-configs

# The replication factor used when Kafka Connect creates the topic used to store connector and task configuration data.
# This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
config.storage.replication.factor=1
config.storage.partitions=1

# The name of the topic where connector and task status updates are stored. This must be the same for all workers with
# the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with multiple partitions and a compacted
# cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to create this topic manually,
# always create it as a compacted, highly replicated (3x or more) topic with multiple partitions.
status.storage.topic=connect-status

# The replication factor used when Connect creates the topic used to store connector and task status updates. This should always be at least 3
# for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
status.storage.replication.factor=1

# The number of partitions used when Connect creates the topic used to store connector and task status updates.
status.storage.partitions=10

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.

# Hostname for the REST API. If this is set, it will only bind to this interface.
# notes: this configuration is optional
rest.host.name=localhost

# Port for the REST API to listen on.
# notes: this configuration is optional
rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to, i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
rest.advertised.host.name=127.0.0.1
rest.advertised.port=8083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
plugin.path=/home/lenmom/workspace/software/kafka_2.11-2.1.0/connect

# Converter class for internal value Connect data that implements the Converter interface. Used for converting data like offsets and configs.
# notes: this configuration is optional
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter class for internal key Connect data that implements the Converter interface. Used for converting data like offsets and configs.
# notes: this configuration is optional
internal.key.converter=org.apache.kafka.connect.json.JsonConverter

# notes: this configuration is optional
task.shutdown.graceful.timeout.ms=10000

# notes: this configuration is optional
offset.flush.timeout.ms=5000
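Before starting the worker, it can help to check that the file really contains the settings a distributed worker needs. The sketch below is only an illustration in Python: the helper names parse_properties and missing_keys are hypothetical, and the list of keys is my assumption (the four settings marked as required in the comments above, plus the three storage topics).

```python
# Keys assumed to be mandatory for a distributed worker (an assumption
# based on the comments in the file above, not an official list).
REQUIRED_KEYS = (
    "bootstrap.servers",
    "group.id",
    "key.converter",
    "value.converter",
    "offset.storage.topic",
    "config.storage.topic",
    "status.storage.topic",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines that have no '=' at all
            props[key.strip()] = value.strip()
    return props


def missing_keys(props, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty, in order."""
    return [key for key in required if not props.get(key)]
```

To use it, read connect-distributed.properties into a string, pass it to parse_properties, and confirm that missing_keys returns an empty list.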
2. download the debezium-connector-mysql plugin tarball and extract it into the folder set by plugin.path in connect-distributed.properties
2. start mysql server with bin-log enabled
For details, please refer to my previous blog post, mysql 5.7 enable binlog.
3. start Kafka Connect in distributed mode
sh $KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties
# or start in the background
# sh $KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
When the worker starts, check its log output. The line
INFO Added aliases 'MySqlConnector' and 'MySql' to plugin 'io.debezium.connector.mysql.MySqlConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:390)
confirms that our target plugin has been loaded by the distributed Kafka Connect worker.
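Besides reading the log, the worker can be asked which plugins it loaded through the Kafka Connect REST endpoint GET /connector-plugins. The following Python sketch is not part of the original setup; the function names are hypothetical, and the base URL assumes the rest.host.name and rest.port values configured above.

```python
import json
from urllib.request import urlopen


def fetch_plugins(base_url="http://localhost:8083"):
    """Fetch the list of installed connector plugins from a Connect worker."""
    with urlopen(base_url + "/connector-plugins") as resp:
        return json.load(resp)


def has_plugin(plugins, class_name):
    """Return True if a plugin entry with the given class name is present."""
    return any(plugin.get("class") == class_name for plugin in plugins)
```

With the worker running, has_plugin(fetch_plugins(), "io.debezium.connector.mysql.MySqlConnector") should be true if the plugin directory was picked up.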
4. create the demo database in mysql
# In production you would almost certainly limit the replication user to the follower (slave) machine,
# to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
#
# However, this grant is equivalent to specifying *any* hosts, which makes this easier since the docker host
# is not easily known to the Docker container. But don't do this in production.
#
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator' IDENTIFIED BY 'replpass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

# Create the database that we'll use to populate data and watch the effect in the binlog
DROP DATABASE if exists inventory;
CREATE DATABASE if not exists inventory;
GRANT ALL PRIVILEGES ON inventory.* TO 'root'@'%';

# Switch to this database
USE inventory;

# Create and populate our products using a single insert with many rows
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
       (default,"car battery","12V car battery",8.1),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
       (default,"hammer","12oz carpenter's hammer",0.75),
       (default,"hammer","14oz carpenter's hammer",0.875),
       (default,"hammer","16oz carpenter's hammer",1.0),
       (default,"rocks","box of assorted rocks",5.3),
       (default,"jacket","water resistent black wind breaker",0.1),
       (default,"spare tire","24 inch spare tire",22.2);

# Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
  product_id INTEGER NOT NULL PRIMARY KEY,
  quantity INTEGER NOT NULL,
  FOREIGN KEY (product_id) REFERENCES products(id)
);

INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);

# Create some customers ...
CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

INSERT INTO customers
VALUES (default,"Sally","Thomas","[email protected]"),
       (default,"George","Bailey","[email protected]"),
       (default,"Edward","Walker","[email protected]"),
       (default,"Anne","Kretchmar","[email protected]");

# Create some very simple orders
CREATE TABLE orders (
  order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATE NOT NULL,
  purchaser INTEGER NOT NULL,
  quantity INTEGER NOT NULL,
  product_id INTEGER NOT NULL,
  FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
  FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2016-01-16', 1001, 1, 102),
       (default, '2016-01-17', 1002, 2, 105),
       (default, '2016-02-19', 1002, 2, 106),
       (default, '2016-02-21', 1003, 1, 107);
5. Register a mysql connector using the Kafka Connect REST API
5.1 check the Kafka Connect version
curl -H "Accept:application/json" localhost:8083/
output
{"version":"2.1.0","commit":"809be928f1ae004e","kafka_cluster_id":"NGQRxNZMSY6Q53ktQABHsQ"}
5.2 get current connector list
$ curl -H "Accept:application/json" localhost:8083/connectors/
[]
The output indicates that no connector is registered in the distributed worker yet.
5.3 register a mysql connector instance in the distributed worker
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "127.0.0.1",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "184054",
    "database.server.name": "127.0.0.1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}'
For readability, here is the request body formatted:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "127.0.0.1",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "184054",
    "database.server.name": "127.0.0.1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
The response to the POST request is as follows:
HTTP/1.1 201 Created
Date: Tue, 07 Feb 2017 20:49:34 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 471
Server: Jetty(9.2.15.v20160210)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "184054",
    "database.server.name": "127.0.0.1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "name": "inventory-connector"
  },
  "tasks": []
}
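The same registration can be scripted instead of typed as a curl command. The Python sketch below is a rough equivalent using only the standard library; build_connector_request and register are hypothetical helper names, and the base URL assumes the worker listens on localhost:8083 as configured earlier. Values are converted to strings because the request body above sends every setting as a string.

```python
import json
from urllib.request import Request, urlopen


def build_connector_request(name, config):
    """Build the JSON body for POST /connectors, with all values as strings."""
    return {"name": name, "config": {key: str(value) for key, value in config.items()}}


def register(payload, base_url="http://localhost:8083"):
    """POST the payload to the Connect worker; return (HTTP status, parsed body)."""
    request = Request(
        base_url + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(request) as resp:
        return resp.status, json.load(resp)
```

Calling register(build_connector_request("inventory-connector", {...})) with the settings shown above should return status 201 on success; urlopen raises an HTTPError if the worker rejects the request, for example when a connector with that name already exists.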
5.4 list the connectors with curl again; there should now be one, since we have just registered it.
curl -H "Accept:application/json" localhost:8083/connectors/
output
["inventory-connector"]
5.5 get connector detail
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
output
HTTP/1.1 200 OK
Date: Wed, 24 Apr 2019 10:15:16 GMT
Content-Type: application/json
Content-Length: 536
Server: Jetty(9.4.12.v20180830)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "root",
    "database.server.id": "184054",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "database.server.name": "127.0.0.1",
    "database.port": "3306",
    "database.hostname": "127.0.0.1",
    "database.password": "root",
    "name": "inventory-connector",
    "database.whitelist": "inventory"
  },
  "tasks": [
    {
      "connector": "inventory-connector",
      "task": 0
    }
  ],
  "type": "source"
}
The "task": 0 entry indicates that there is one task, with id 0.
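The detail response lists the tasks but not whether they are healthy. Kafka Connect also exposes GET /connectors/{name}/status, which this post does not otherwise use. The sketch below is a small Python helper around it; the function names are hypothetical, and the response shape (a "connector" object and a "tasks" list, each with a "state" field) is how I understand the Connect REST API, so treat it as an assumption to verify against your version.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch the status document for one connector from the Connect worker."""
    with urlopen(f"{base_url}/connectors/{quote(name)}/status") as resp:
        return json.load(resp)


def not_running(status):
    """Return labels of the connector/tasks whose state is not RUNNING."""
    parts = [("connector", status["connector"]["state"])]
    parts += [(f"task {task['id']}", task["state"]) for task in status.get("tasks", [])]
    return [label for label, state in parts if state != "RUNNING"]
```

An empty list from not_running(fetch_status("inventory-connector")) would mean the connector and its task are both running.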
5.6 list the kafka topics
sh $KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --list
output
127.0.0.1
127.0.0.1.inventory.customers
127.0.0.1.inventory.orders
127.0.0.1.inventory.products
127.0.0.1.inventory.products_on_hand
__consumer_offsets
connect-configs
connect-offsets
connect-status
connect-test
dbhistory.inventory
This indicates that the mysql connector has started watching for data changes in mysql and has begun pushing the changed data to the Kafka broker.
5.7 observe the data in the kafka topic
# list all the data changes for the customers table in the inventory database
sh $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 127.0.0.1.inventory.customers --from-beginning
output:
{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
Let's take one record, say the first, and format it as an example.
{
  "before": null,
  "after": {
    "id": 1001,
    "first_name": "Sally",
    "last_name": "Thomas",
    "email": "[email protected]"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "mysql",
    "name": "127.0.0.1",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000012",
    "pos": 154,
    "row": 0,
    "snapshot": true,
    "thread": null,
    "db": "inventory",
    "table": "customers",
    "query": null
  },
  "op": "c",
  "ts_ms": 1556100441651
}
The op field indicates the type of data change:
- c: a record was inserted into the database. For c, the before element is null.
- d: a record was deleted from the database. For d, the after element is null.
- u: a record was updated in the database. The before element holds the row as it was before the update, and the after element holds the row after the update.
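The rules in the list above can be sketched as a small Python function that a consumer might apply to each record value. This is only an illustration: describe_change and OP_NAMES are hypothetical names, and it assumes the flat JSON layout shown above (that is, value.converter.schemas.enable=false). A value of null, which the console consumer can also print, is reported separately.

```python
import json

# Operation codes as described in the list above.
OP_NAMES = {"c": "insert", "u": "update", "d": "delete"}


def describe_change(raw_value):
    """Return (kind, before, after) for one record value read from the topic."""
    event = json.loads(raw_value)
    if event is None:
        # A null value carries no row data at all.
        return ("tombstone", None, None)
    kind = OP_NAMES.get(event.get("op"), "unknown")
    return (kind, event.get("before"), event.get("after"))
```

For the first record shown above, describe_change would report an insert whose before part is None.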
Let's run some update and delete operations in the mysql database and see the changes captured in Kafka.
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | [email protected]     |
| 1002 | George     | Bailey    | [email protected]     |
| 1003 | Edward     | Walker    | [email protected]     |
| 1004 | Anne       | Kretchmar | [email protected]     |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

mysql> update customers set first_name='1234' where id=1004;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> delete from customers where id=1004;
Query OK, 1 row affected (0.01 sec)

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | [email protected]     |
| 1002 | George     | Bailey    | [email protected]     |
| 1003 | Edward     | Walker    | [email protected]     |
+------+------------+-----------+-----------------------+
3 rows in set (0.00 sec)
As shown, we first updated the first_name field of the record with id=1004 to 1234, and then deleted the record.
kafka record:
{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"[email protected]"},"after":{"id":1004,"first_name":"1234","last_name":"Kretchmar","email":"[email protected]"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":223344,"ts_sec":1556102881,"gtid":null,"file":"mysql-bin.000012","pos":364,"row":0,"snapshot":false,"thread":2,"db":"inventory","table":"customers","query":null},"op":"u","ts_ms":1556102881385}
{"before":{"id":1004,"first_name":"1234","last_name":"Kretchmar","email":"[email protected]"},"after":null,"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":223344,"ts_sec":1556102900,"gtid":null,"file":"mysql-bin.000012","pos":725,"row":0,"snapshot":false,"thread":2,"db":"inventory","table":"customers","query":null},"op":"d","ts_ms":1556102900269}
null
There are two more records in the corresponding Kafka topic.
update:
{
  "before": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "[email protected]"
  },
  "after": {
    "id": 1004,
    "first_name": "1234",
    "last_name": "Kretchmar",
    "email": "[email protected]"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "mysql",
    "name": "127.0.0.1",
    "server_id": 223344,
    "ts_sec": 1556102881,
    "gtid": null,
    "file": "mysql-bin.000012",
    "pos": 364,
    "row": 0,
    "snapshot": false,
    "thread": 2,
    "db": "inventory",
    "table": "customers",
    "query": null
  },
  "op": "u",
  "ts_ms": 1556102881385
}
delete:
{
  "before": {
    "id": 1004,
    "first_name": "1234",
    "last_name": "Kretchmar",
    "email": "[email protected]"
  },
  "after": null,
  "source": {
    "version": "0.9.4.Final",
    "connector": "mysql",
    "name": "127.0.0.1",
    "server_id": 223344,
    "ts_sec": 1556102900,
    "gtid": null,
    "file": "mysql-bin.000012",
    "pos": 725,
    "row": 0,
    "snapshot": false,
    "thread": 2,
    "db": "inventory",
    "table": "customers",
    "query": null
  },
  "op": "d",
  "ts_ms": 1556102900269
}
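To see how a downstream consumer could use these records, here is a minimal Python sketch that replays a sequence of change events into an in-memory copy of the table. The names apply_event and replay are hypothetical, the primary key column is assumed to be id as in the customers table, and the null value that follows the delete record in the consumer output is assumed to be a tombstone that carries no row data, so it is skipped.

```python
def apply_event(table, event, key="id"):
    """Apply one decoded change event to a dict keyed by primary key."""
    if event is None:
        # Null value (assumed tombstone): nothing to apply.
        return table
    op = event.get("op")
    if op in ("c", "u"):
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        table.pop(event["before"][key], None)
    return table


def replay(events, key="id"):
    """Rebuild the table state by applying events in topic order."""
    table = {}
    for event in events:
        apply_event(table, event, key)
    return table
```

Replaying the seven values shown above in order would leave rows 1001, 1002 and 1003, which matches the final select in mysql.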
If we stop the mysql connector or restart the Kafka broker, the data should still exist in Kafka, since the message offsets and data are persisted in the Kafka topics we configured in connect-distributed.properties.
6. register hdfs connector plugin in distributed connect
Original article: https://www.cnblogs.com/lenmom/p/10763589.html