kafka中生产者和消费者API

使用idea实现相关API操作,先要再pom.xml重添加Kafka依赖:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

Kafka生产者API:

 1 package cn.itcast.storm.kafka.simple;
 2
 3 import kafka.javaapi.producer.Producer;
 4 import kafka.producer.KeyedMessage;
 5 import kafka.producer.ProducerConfig;
 6
 7 import java.util.Properties;
 8 import java.util.UUID;
 9
10 /**
11  * 这是一个简单的Kafka producer代码
12  * 包含两个功能:
13  * 1、数据发送
14  * 2、数据按照自定义的partition策略进行发送
15  *
16  *
17  * KafkaSpout的类
18  */
19 public class KafkaProducerSimple {
20     public static void main(String[] args) {
21         /**
22          * 1、指定当前kafka producer生产的数据的目的地
23          *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
24          *  bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
25          */
26         String TOPIC = "orderMq";
27         /**
28          * 2、读取配置文件
29          */
30         Properties props = new Properties();
31         /*
32          * key.serializer.class默认为serializer.class
33          */
34         props.put("serializer.class", "kafka.serializer.StringEncoder");
35         /*
36          * kafka broker对应的主机,格式为host1:port1,host2:port2
37          */
38         props.put("metadata.broker.list", "kafka01:9092,kafka02:9092,kafka03:9092");
39         /*
40          * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
41          * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
42          * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
43          * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
44          * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
45          * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
46          * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
47          * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
48          */
49         props.put("request.required.acks", "1");
50         /*
51          * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
52          * 默认值:kafka.producer.DefaultPartitioner
53          * 用来把消息分到各个partition中,默认行为是对key进行hash。
54          */
55         props.put("partitioner.class", "cn.itcast.storm.kafka.MyLogPartitioner");
56 //        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
57         /**
58          * 3、通过配置文件,创建生产者
59          */
60         Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
61         /**
62          * 4、通过for循环生产数据
63          */
64         for (int messageNo = 1; messageNo < 100000; messageNo++) {
65 //            String messageStr = new String(messageNo + "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey," +
66 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
67 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
68 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
69 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
70 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
71 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
72 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
73 //                    "用来配合自定义的MyLogPartitioner进行数据分发");
74
75             /**
76              * 5、调用producer的send方法发送数据
77              * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
78              */
79             producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
80         }
81     }
82 }

Kafka消费者API:

 1 package cn.itcast.storm.kafka.simple;
 2
 3 import kafka.consumer.Consumer;
 4 import kafka.consumer.ConsumerConfig;
 5 import kafka.consumer.ConsumerIterator;
 6 import kafka.consumer.KafkaStream;
 7 import kafka.javaapi.consumer.ConsumerConnector;
 8 import kafka.message.MessageAndMetadata;
 9
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Properties;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16
17 public class KafkaConsumerSimple implements Runnable {
18     public String title;
19     public KafkaStream<byte[], byte[]> stream;
20     public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
21         this.title = title;
22         this.stream = stream;
23     }
24     @Override
25     public void run() {
26         System.out.println("开始运行 " + title);
27         ConsumerIterator<byte[], byte[]> it = stream.iterator();
28         /**
29          * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
30          * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
31          * */
32         while (it.hasNext()) {
33             MessageAndMetadata<byte[], byte[]> data = it.next();
34             String topic = data.topic();
35             int partition = data.partition();
36             long offset = data.offset();
37             String msg = new String(data.message());
38             System.out.println(String.format(
39                     "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
40                     title, topic, partition, offset, msg));
41         }
42         System.out.println(String.format("Consumer: [%s] exiting ...", title));
43     }
44
45     public static void main(String[] args) throws Exception{
46         Properties props = new Properties();
47         props.put("group.id", "dashujujiagoushi");
48         props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181");
49         props.put("auto.offset.reset", "largest");
50         props.put("auto.commit.interval.ms", "1000");
51         props.put("partition.assignment.strategy", "roundrobin");
52         ConsumerConfig config = new ConsumerConfig(props);
53         String topic1 = "orderMq";
54         String topic2 = "paymentMq";
55         //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
56         ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
57         //定义一个map
58         Map<String, Integer> topicCountMap = new HashMap<>();
59         topicCountMap.put(topic1, 3);
60         //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
61         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
62         //取出 `kafkaTest` 对应的 streams
63         List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
64         //创建一个容量为4的线程池
65         ExecutorService executor = Executors.newFixedThreadPool(3);
66         //创建20个consumer threads
67         for (int i = 0; i < streams.size(); i++)
68             executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));
69     }
70 }

kafka自定义patition:

 1 package cn.itcast.storm.kafka;
 2
 3 import kafka.producer.Partitioner;
 4 import kafka.utils.VerifiableProperties;
 5 import org.apache.log4j.Logger;
 6
 7
 8 public class MyLogPartitioner implements Partitioner {
 9     private static Logger logger = Logger.getLogger(MyLogPartitioner.class);
10
11     public MyLogPartitioner(VerifiableProperties props) {
12     }
13
14     public int partition(Object obj, int numPartitions) {
15         return Integer.parseInt(obj.toString())%numPartitions;
16 //        return 1;
17     }
18
19 }
时间: 2024-12-12 10:16:02

kafka中生产者和消费者API的相关文章

kafka中partition和消费者对应关系

1个partition只能被同组的一个consumer消费,同组的consumer则起到均衡效果 消费者多于partition topic: test 只有一个partition创建一个topic——test, bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 在g2组中启动两个consumer, 1. bin/kafka-conso

Kafka(八)Python生产者和消费者API使用

单线程生产者 #!/usr/bin/env python # -*- coding: utf-8 -*- import random import sys from kafka import KafkaProducer from kafka.client import log import time import json __metaclass__ = type class Producer:     def __init__(self, KafkaServer='127.0.0.1', Ka

使用java创建kafka的生产者和消费者

创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111        [[email protected] kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3    查看Kafka的主题详情        [[email protected] kafka]# bin/kafka-topics.sh

使用JUC并发工具包的Lock和Condition,实现生产者和消费者问题中的有界缓存

JDK5.0之前,用java实现生产者和消费者的唯一方式就是使用synchronized内置锁和wait/notify条件通知机制.JDK5.0之后提供了显示锁Lock和条件队列Condition,与内置锁和内置条件队列相对应,但是显示的锁和条件队列,功能更强大,更灵活.此外JDK5.0之后还提供了大量很有用的并发工具类,如BlockingQueue等,基于这些数据结构,能够方便.快速.高效的构建自己应用需要的效果.这里我们简单使用下显示锁和条件队列,来模拟有界缓存的实现,功能类似于JDK内置的

LMAX Disrutpor—多生产者多消费者中,消息复制分发的高性能实现

解决的问题 当我们有多个消息的生产者线程,一个消费者线程时,他们之间如何进行高并发.线程安全的协调? 很简单,用一个队列. 当我们有多个消息的生产者线程,多个消费者线程,并且每一条消息需要被所有的消费者都消费一次(这就不是一般队列,只消费一次的语义了),该怎么做? 这时仍然需要一个队列.但是: 1. 每个消费者需要自己维护一个指针,知道自己消费了队列中多少数据.这样同一条消息,可以被多个人独立消费. 2. 队列需要一个全局指针,指向最后一条被所有生产者加入的消息.消费者在消费数据时,不能消费到这

Kafka之生产者消费者示例

本例以kafka2.10_0.10.0.0为例,不同版本的kafka Java api有些区别! 增加maven依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.0.0</version> </dependency> 生产者 package com.zns.k

Kafka 生产者、消费者与分区的关系

kafka 生产者.消费者与分区的关系 背景 最近和海康整数据对接, 需要将海康产生的结构化数据拿过来做二次识别. 基本的流程: 海康大数据 --> kafka server --> 平台 Kafka 的 topic 正常过车 topic: BAYONET_VEHICLEPASS 违法过车 topic: BAYONET_VEHICLEALARM 前言 首先我们需要对kafka中的一些名词有一定的了解, 有过一些使用经验, 一般来说, 生产者发送消息到主题, 而消费者从主题消费数据 ( 我初次接

Java中的生产者、消费者问题

Java中的生产者.消费者问题描述: 生产者-消费者(producer-consumer)问题, 也称作有界缓冲区(bounded-buffer)问题, 两个进程共享一个公共的固定大小的缓冲区(仓库). 其中一个是生产者, 用于将产品放入仓库: 另外一个是消费者, 用于从仓库中取出产品消费. 问题出现在当仓库已经满了, 而此时生产者还想向其中放入一个新的产品的情形, 其解决方法是让生产者此时进行等待, 等待消费者从仓库中取走了一个或者多个产品后再去唤醒它. 同样地, 当仓库已经空了, 而消费者还

多线程生产者、消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

生产者.消费者模式是多线程中的经典问题.通过中间的缓冲队列,使得生产者和消费者的速度可以相互调节. 对于比较常见的单生产者.多消费者的情况,主要有以下两种策略: 通过volatile boolean producerDone =false 来标示是否完成.生产者结束后标示为true, 消费者轮询这个变量来决定自己是否退出. 这种方式对producerDone产生比较大的争用,实现起来也有诸多问题需要考虑. 比较经典的"毒丸"策略,生产者结束后,把一个特别的对象:"毒丸&quo