java使用kafka创建demo

1.创建java项目。在pom.xml文件中添加

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

2.创建生产者代码

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSend {
    public static void main(String args[]) {

        //1.参数配置:端口、缓冲内存、最大连接数、key序列化、value序列化等等(不是每一个非要配置)
        Properties props=new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //2.创建生产者对象,并建立连接
        Producer<String, String> producer = new KafkaProducer<String,String>(props);
        try {
            //3.在my-topic主题下,发送消息
            for (int i = 0; i < 10000; i++) {
                System.out.println(Integer.toString(i));
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
                Thread.sleep(500);
            }
        }
        catch (Exception e)
        {
            System.out.println("ERROR");
        }

        //4.关闭
        producer.close();

    }
}

3. 创建消费者代码

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerReceive {
    public static void main(String args[]) {

        //1.参数配置:不是每一非得配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.8.100:9092");
        props.put("auto.commit.interval.ms", "1000");
        //因为每一个消费者必须属于某一个消费者组,所以必须还设置group.id
        props.put("group.id", "test1");
        props.put("enable.auto.commit", "true");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //2.创建消费者对象,并建立连接
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);

        //3.设置从"my-topic"主题下拿取数据
        consumer.subscribe(Arrays.asList("my-topic"));

        //4.消费数据
        while (true) {
            //阻塞时间,从kafka中取出100毫秒的数据,有可能一次性去除0-n条
            ConsumerRecords<String, String> records = consumer.poll(100);
            //遍历
            for (ConsumerRecord<String, String> record : records)
                //打印结果
                //System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                System.out.println("消费者消费的数据为:"+record.value());
        }
    }
}

原文地址:https://www.cnblogs.com/tong775131501/p/12327167.html

时间: 2024-10-26 00:30:00

java使用kafka创建demo的相关文章

ionic2新手入门整理,搭建环境,创建demo,打包apk,热更新,优化启动慢等避坑详解

onic官方文档链接:http://ionicframework.com/docs/ 如果是新的环境会有很多坑,主要是有墙,请仔细阅读每个步骤 文档包含以下内容: l  环境搭建 l  创建demo并调试运行 l  打包APK l  添加支持热更新 l  优化启动慢问题 l  常用命令 1.      环境搭建 需要安装以下软件和插件(Android): l  安装nodeJS(自带npm) l  配置cnpm  (使用淘宝镜像取代npm) l  安装cordova和ionic2 l  安装JA

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

java实现Kafka生产者示例

使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.lisg.kafkatest; import

56 java编程思想——创建窗口和程序片 用户接口API

56.java编程思想--创建窗口和程序片 用户接口API Java 1.1 版同样增加了一些重要的新功能,包括焦点遍历,桌面色彩访问,打印"沙箱内"及早期的剪贴板支持. 焦点遍历十分的简单,因为它显然存在于AWT 库里的组件并且我们不必为使它工作而去做任何事.如果我们制造我们自己组件并且想使它们去处理焦点遍历,我们过载isFocusTraversable()以使它返回真值.如果我们想在一个鼠标单击上捕捉键盘焦点,我们可以捕捉鼠标按下事件并且调用requestFocus()需求焦点方法

Java版Kafka使用及配置解释

Java版Kafka使用及配置解释 一.Java示例 kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考.kafka的安装请参考官方文档. 引入Maven库 首先我们需要新建一个maven项目,然后在pom中引用kafka jar包,引用依赖如下: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.1

关于android R.java文件无法创建的问题

R.java文件无法创建的原因网上有很多说法普遍是以下两种: 1. xml文件有错误: 解决方法就是找到哪个xml有错然后把错误修复就OK了. 2.编码问题 这时候只要把xml文件的编码改成utf8就能解决问题. 然而对于查找xml文件错误我们常常依赖android的自带工具进行检测,但是只要是人写得程序必然会有一些bug, 当检测工具出现bug后有可能导致错误很难定位. 而在xml文件校验方面,里面就有一个很坑的bug: 当menu目录下的文件有错时导致r文件无法生成时,可能不会给出任何有关m

Java JSON数据创建和读取

Java  json数据创建 package com.JavaTest; import com.google.gson.JsonArray; import com.google.gson.JsonObject; public class CreatJson { public static void main(String[] args) { JsonObject object = new JsonObject(); object.addProperty("cat", "it&

源码分析:Java堆的创建

虚拟机在内存中申请一片区域,由虚拟机自动管理,用来满足应用程序对象分配的空间需求,即堆空间. 由于程序运行的局部特性,程序创建的大多数对象都具有非常短的生命周期,而程序也会创建一些生命周期特别长的对象.简单的复制收集器无论对象的 生命周期是长是短,都会进行复制操作.而生命周期较长的对象在多次垃圾回收期间内并不会被回收,这就使得这些对象被来回复制而使得算法性能大大下降. 分代收集把堆分为多个子堆,分别用来存放不同寿命的对象.新生对象空间的将经历最频繁的垃圾回收,而对于经历了若干次垃圾收集后仍然存活

Java中如何创建进程(转)

在Java中,可以通过两种方式来创建进程,总共涉及到5个主要的类. 第一种方式是通过Runtime.exec()方法来创建一个进程,第二种方法是通过ProcessBuilder的start方法来创建进程.下面就来讲一讲这2种方式的区别和联系. 首先要讲的是Process类,Process类是一个抽象类,在它里面主要有几个抽象的方法,这个可以通过查看Process类的源代码得知: 位于java.lang.Process路径下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15