kafka中常用API的简单JAVA代码

  通过之前《kafka分布式消息队列介绍以及集群安装》的介绍,对kafka有了初步的了解。本文主要讲述java代码中常用的操作。

准备:增加kafka依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

一、kafka中对topic的操作

package org.kafka;

import kafka.admin.DeleteTopicCommand;
import kafka.admin.TopicCommand;

/**
 * kafka主题操作
 */
public class TopicDemo {
	/**
	 * 添加主题
	 * linux命令:bin/kafka-topics.sh --create --zookeeper 192.168.2.100:2181 --replication-factor 3 --partitions 1 --topic topictest0416
	 */
	public static void createTopic() {
		String[] options = new String[] {
				"--create",
				"--zookeeper",
				"192.168.2.100:2181",
				"--replication-factor",
				"3",
				"--partitions",
				"1",
				"--topic",
				"topictest0416" };
		TopicCommand.main(options);
	}

	/**
	 * 查询所有主题
	 * linux命令:bin/kafka-topics.sh --list --zookeeper 192.168.2.100:2181
	 */
	public static void queryTopic() {
		String[] options = new String[] {
				"--list",
				"--zookeeper",
				"192.168.2.100:2181" };
		TopicCommand.main(options);
	}

	/**
	 * 查看指定主题的分区及副本状态信息
	 * bin/kafka-topics.sh --describe --zookeeper 192.168.2.100:2181 --topic topictest0416
	 */
	public static void queryTopicByName() {
		String[] options = new String[]{
			    "--describe",
			    "--zookeeper",
			    "192.168.2.100:2181",
			    "--topic",
			    "topictest0416",
			};
		TopicCommand.main(options);
	}

	/**
	 * 修改主题
	 * linux命令:bin/kafka-topics.sh --zookeeper 192.168.2.100:2181 --alter --topic topictest0416 --partitions 3
	 */
	public static void alterTopic() {
		String[] options = new String[]{
			    "--alter",
			    "--zookeeper",
			    "192.168.2.100:2181",
			    "--topic",
			    "topictest0416",
			    "--partitions",
			    "3"
			};
			TopicCommand.main(options);
	}

	/**
	 * 删除主题
	 */
	public static void delTopic() {
		String[] options = new String[] {
				"--zookeeper",
			    "192.168.2.100:2181",
			    "--topic",
			    "topictest0416" };
		DeleteTopicCommand.main(options);
	}

}

二、Producer代码

package org.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		//zookeeper集群列表
		props.put("zk.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");
		props.put("metadata.broker.list", "hadoop1-1:9092,hadoop1-2:9092,hadoop1-3:9092");
		//设置消息使用哪个类来序列化
		props.put("serializer.class", "kafka.serializer.StringEncoder");

		ProducerConfig config = new ProducerConfig(props);
		//构造Producer对象
		Producer<String, String> producer = new Producer<String, String>(config);

		// 发送业务消息
		// 读取文件 读取内存数据库
		for (int i = 0; i < 10; i++) {
			Thread.sleep(500);
			KeyedMessage<String, String> km = new KeyedMessage<String, String>("topictest0416", "I am a producer " + i + " hello!");
			producer.send(km);
		}

	}
}

三、consumer代码

package org.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
	private static final String topic = "topictest0416";
	private static final Integer threads = 1;

	public static void main(String[] args) {
		Properties props = new Properties();
		//zookeeper集群列表
		props.put("zookeeper.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");
		//消费者组ID
		props.put("group.id", "001");
		//设置读取的偏移量;smallest意思是指向最小的偏移量
		props.put("auto.offset.reset", "smallest");
		//将Properties封装成消费者配置对象
		ConsumerConfig config = new ConsumerConfig(props);
		ConsumerConnector consumer =  Consumer.createJavaConsumerConnector(config);

		Map<String, Integer> topicMap = new HashMap<>();
		//key为消费的topic
		//value为消费的线程数量
		topicMap.put(topic, threads);

		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicMap);

		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

		for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
						System.out.println(new String(mm.message()));
					}
				}
			}).start();
		}
	}

}

四、测试

  先启动Consumer,再启动Producer

  测试结果:

  

时间: 2025-01-05 23:06:31

kafka中常用API的简单JAVA代码的相关文章

spring注解开发中常用注解以及简单配置

一.spring注解开发中常用注解以及简单配置 1.为什么要用注解开发:spring的核心是Ioc容器和Aop,对于传统的Ioc编程来说我们需要在spring的配置文件中邪大量的bean来向spring容器中注入bean对象, 然而,通过注解编程可以缩短我们开发的时间,简化程序员的代码编写. 2.如何开启注解开发:最常用的方法是使用<mvc:annotation-driven/>来开启注解编程(用一个标签配置了spring注解编程的映射器和适配器,同时配置了许多的参数) 3.如何将有注解的be

简单了解Spring中常用工具类_java - JAVA

文章来源:嗨学网 敏而好学论坛www.piaodoo.com 欢迎大家相互学习 文件资源操作 Spring 定义了一个 org.springframework.core.io.Resource 接口,Resource 接口是为了统一各种类型不同的资源而定义的,Spring 提供了若干 Resource 接口的实现类,这些实现类可以轻松地加载不同类型的底层资源,并提供了获取文件名.URL 地址以及资源内容的操作方法 访问文件资源 * 通过 FileSystemResource 以文件系统绝对路径的

【Android】Android Studio 1.5+ 中混合调试Native和Java代码

[Android]Android Studio 1.5+ 中调试Native和Java代码 Android Studio 1.5+表示Android Studio 1.5版本以及以上. 网上大部分中文或英语教程还是停留在老版本的Android Studio的Debug的设置,要么不全.正好最近工作中要在Android Studio中调试C++代码,就来写一篇咯. Android Studio 1.5+的调试设置与之前有所不同. 而且新版中推出了一个Hybrid的调试:可以混合调试Native代码

详解介绍Selenium常用API的使用--Java语言(完整版)

一共分为二十个部分:环境安装之Java.环境安装之IntelliJ IDEA.环境安装之selenium.selenium3浏览器驱动.selenium元素定位.控制浏览器操作.WebDriver常用方法.模拟鼠标操作.模拟键盘操作.获取断言信息.设置元素等待.定位一组元素.多表单切换.多窗口切换.下拉框选择.警告框处理.文件上传.浏览器cookie操作.调用JavaScript代码.获取窗口截屏: 虽然,学习Maven需要增加你的学习成本,但如果你需要长期使用Java编程语言,或者想用Java

.NET开发中常用的10条实用代码

1.读取操作系统和CLR的版本 OperatingSystem os = System.Environment.OSVersion; Console.WriteLine(“Platform: {0}”, os.Platform); Console.WriteLine(“Service Pack: {0}”, os.ServicePack); Console.WriteLine(“Version: {0}”, os.Version); Console.WriteLine(“VersionStrin

浅析jQuery库的核心架构和常用API的简单实现

以下代码只是呈现了jQuery库中的原型继承的实现原理,以及其常用功能模块的简单实现,并不涉及过多的兼容性处理 //This is my$;(function (window , undefined) { //核心架构 function my$(selector){ //使用构造函数,创建my$对象,构造函数是其原型中的一个方法 return new my$.prototype.init(selector); } my$.fn = my$.prototype = { constructor :

常用的设计模式及java代码描述

http://blog.csdn.net/haoxingfeng/article/details/9191619 1>代理模式 http://www.cnblogs.com/chinajava/p/5880870.html 代理模式分静态代理和动态代理 静态代理:(需要一个接口,一个委托类实现接口,一个代理类实现接口,) 代理模式就是提供一个代理对象,其可以实现委托类的部分功能,而且也可以拓展自己的功能 例如(委托类)售票厅可以售票.改签.退票:(代理类)黄牛售票只可以售票,要办理改签或者退票的

ASP.NET程序中常用的三十三种代码

1. 打开新的窗口并传送参数: 传送参数: response.write("<script>window.open(’*.aspx?id="+this.DropDownList1.SelectIndex+"&id1="+...+"’)</script>") 接收参数: string a = Request.QueryString("id");string b = Request.QueryStr

Web javascript 中常用API合集

来源于:https://www.kancloud.cn/dennis/tgjavascript/241852 一.节点 1.1 节点属性 Node.nodeName //返回节点名称,只读 Node.nodeType //返回节点类型的常数值,只读 Node.nodeValue //返回Text或Comment节点的文本值,只读 Node.textContent //返回当前节点和它的所有后代节点的文本内容,可读写 Node.baseURI //返回当前网页的绝对路径 Node.ownerDoc