kafka 生产者消费者 api接口

生产者

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import kafka.serializer.StringEncoder;

public class KafkaProducer  extends Thread{

private String topic;

//构造方法

public KafkaProducer(String topic) {

this.topic = topic;

}

@Override

public void run() {

Producer<Integer, String> producer = createProducer();

for(int i =1 ;i<=10;i++){

String message = "message"+i;

producer.send(new KeyedMessage<Integer, String>(topic, message));

System.out.println("生产者生成---------->"+message);

try {

//睡眠 1S

sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

public Producer createProducer() {

Properties properties = new Properties();

properties.setProperty("zookeeper.connect", "192.168.80.20:2181,192.168.80.21:2181,192.168.80.22:2181");

// properties.setProperty("zookeeper.connect", "cloud1:2181,cloud2:2181,cloud3:2181");

properties.setProperty("serializer.class", StringEncoder.class.getName());

//properties.setProperty("serializer.class", "kafka.serializer.StringEncoder");

properties.setProperty("metadata.broker.list", "192.168.80.20:9092");

return new Producer<Integer, String>(new ProducerConfig(properties));

}

public static void main(String[] args) {

new KafkaProducer("test").start();

}

}

消费者

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaComsumer extends Thread {

private String topic;

public KafkaComsumer(String topic) {

this.topic = topic;

}

@Override

public void run() {

ConsumerConnector consumer = createConsumer();

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put(topic, 1);

Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap );

KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);

ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();

while(iterator.hasNext()){

String message = new String(iterator.next().message());

System.out.println("接受到的信息------------》"+message);

}

}

public ConsumerConnector createConsumer() {

Properties properties = new Properties();

properties.setProperty("zookeeper.connect", "192.168.80.20:2181,192.168.80.21:2181,192.168.80.22:2181");

properties.setProperty("group.id", "group1");

return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

}

public static void main(String[] args) {

new KafkaComsumer("test").start();

}

}

时间: 2024-10-24 07:10:20

kafka 生产者消费者 api接口的相关文章

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

kafka中生产者和消费者API

使用idea实现相关API操作,先要再pom.xml重添加Kafka依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version> <exclusions> <exclusion> <artifactId>jmxtools&

使用Win32 API实现生产者消费者线程同步

使用win32 API创建线程,创建信号量用于线程的同步 创建信号量 语法例如以下 HANDLE semophore; semophore = CreateSemaphore(lpSemaphoreAttributes, lInitialCount, lMaximumCount, lpName); CreateSemophore函数的原型例如以下: HANDLE WINAPI CreateSemaphore( _In_opt_ LPSECURITY_ATTRIBUTES lpSemaphoreA

Java总结(十一)——通过Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件

一.通过Callable接口实现多线程 1.Callable接口介绍: (1)java.util.concurrent.Callable是一个泛型接口,只有一个call()方法 (2)call()方法抛出异常Exception异常,且返回一个指定的泛型类对象 2.Callable接口实现多线程的应用场景 (1)当父线程想要获取子线程的运行结果时 3.使用Callable接口实现多线程的步骤 (1)第一步:创建Callable子类的实例化对象 (2)第二步:创建FutureTask对象,并将Cal

Kafka 生产者、消费者与分区的关系

kafka 生产者.消费者与分区的关系 背景 最近和海康整数据对接, 需要将海康产生的结构化数据拿过来做二次识别. 基本的流程: 海康大数据 --> kafka server --> 平台 Kafka 的 topic 正常过车 topic: BAYONET_VEHICLEPASS 违法过车 topic: BAYONET_VEHICLEALARM 前言 首先我们需要对kafka中的一些名词有一定的了解, 有过一些使用经验, 一般来说, 生产者发送消息到主题, 而消费者从主题消费数据 ( 我初次接

Kafka消费者——API开发

目录 消费者客户端 订阅主题 订阅分区 取消订阅 订阅总结 消息消费 poll ConsumerRecord 位移提交 自动提交 手动提交 控制和关闭消费 指定位移消费 再均衡 消费者拦截器 消费者客户端 消费步骤: 1.配置消费者客户端参数并创建相应的消费者实例. 2.订阅主题. 3.拉取消息并消费 4.提交消费位移 5.关闭消费者实例 Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_C

基于java callable及future接口解决生产者消费者问题

这两天复习java线程时,把java里面的线程基本知识点与jdk1.5以后新添加的一些类的使用都了解了一下,借用生产者消费者的问题来将他们实践一下. 题目:(题目在csdn一大牛的空间找的) 生产者-消费者模式,这个食堂中只有1张桌子,同时最多放10个盘子,现在有4个厨师做菜,每做好一盘就往桌子上放(生产者将产品往仓库中放),而有6个食客不停地吃(消费者消费产品,为了说明问题,他们的食量是无的).一般而言,厨师200-400ms做出一盘菜,而食客要400-600ms吃完一盘.当桌子上放满了10个

生产者消费者模式(转)

本文转载自博文系列架构设计:生产者/消费者模式.文中对原文格式进行了稍加整理. 概述 今天打算来介绍一下“生产者/消费者模式”,这玩意儿在很多开发领域都能派上用场.由于该模式很重要,打算分几个帖子来介绍.今天这个帖子先来扫盲一把.如果你对这个模式已经比较了解,请跳过本扫盲帖,直接看下一个帖子(关于该模式的具体应用) . 看到这里,可能有同学心中犯嘀咕了:在四人帮(GOF)的23种模式里面似乎没听说过这种嘛!其实GOF那经典的23种模式主要是基于OO的(从书名<Design Patterns: E

生产者消费者问题、Java实现

来,今天尝试把这个问题搞定.还是这种节奏,看一个问题要先从历史看.全局看,这样我们才能真正掌握其全貌,最终各个击破,了然于胸! 我们先来温习下如下概念: 1. 基础概念 基本的 程序 - Program 程序是静态的源代码或目标程序,是一个没有生命的实体. 进程 - Process 当CPU赋予程序生命时也即操作系统执行它时,程序成为了一个活动的实体(但不是可执行的实体),称为进程 - 进行中的程序. 进程是程序的一个实例: 是计算机分配资源的基本单位: 是线程的容器: 线程 - Thread