说明:
1、本次仅实现了两个topic的数据同步,后续优化会持续更新。。。。。
2、自建集群CDH5.8,kafka2.1.0;阿里云集群标准版kafka0.10.x
踩坑:
1、cdh添加kafka角色实例CMM,应该是不支持SSL连接
2、VPC网络接入,不知道购买的阿里云实例有VPC网络,这个是没有SSL加密的连接
3、kafka0.10.2的mirrormaker不能连接自建集群
4、阿里云控制提示是SSl接入点,实际验证方式需要SASL_SSL
5、不懂java,不知道这个是加在什么位置export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
6、ssl.truststore.location=kafka.client.truststore.jks
ssl.truststore.password=KafkaOnsClient 这个证书需要指定路径,还有这个密码就是固定的,我使用了另外的密码
准备:
1、下载kafka_2.12-2.2.1.tgz,比阿里云推荐的高了一个小版本
2、下载kafka.client.truststore.jks,需要跟阿里云要,或者 阿里云提供的文档里有下载链接
3、手动创建kafka_client_jaas.conf文件,下面会贴出内容
部署:
1、服务器确保可以访问自建集群的9092和阿里云集群的9093
2、上传,解压kafka_2.12-2.2.1.tgz(这里不要配置zookeepr,不需要启动kafka)
3、config目录新建kafka_client_jaas.conf文件(kafka的解压目录)
4、新建目录cert,并上传kafka.client.truststore.jks证书(kafka的解压目录)
5、vim /erc/profile最底部加入export KAFKA_OPTS="-Djava.security.auth.login.config=xxxxxx/kafka_client_jaas.conf"(这里需要实际的目录)
6、编辑kafka_client_jaas.conf、consumer.properties和producer.properties
7、启动nohup bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist AIS_11_AisMySql,AIS_99_FWP &(后台运行)
8、目标topic查看是否有消息
配置文件内容
1、kafka_client_jaas.conf
#这里的用户和密码从阿里云控制台获取
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxxxxx"
password="xxxxxx";
};
2、consumer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster
#format: host1:port1,host2:port2 ...
bootstrap.servers=自建集群ip:9092
#consumer group id
group.id=test-consumer-group
#消费者分区分配策略
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
#What to do when there is no initial offset in Kafka or if the current
#offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=
3、producer.properties
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=阿里云集群ip:9093
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
ssl.truststore.location=/application/kafka/cert/kafka.client.truststore.jks
ssl.truststore.password=KafkaOnsClient
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.endpoint.identification.algorithm=
#最后这一行是kafka的版本高于2.x.x才需要
原文地址:https://blog.51cto.com/19840202/2443515