kafka的Java客户端示例代码(kafka_2.11-0.8.2.2)

使用Producer,ConsumerConnector

HelloWorldProducer.java

package cn.ljh.kafka.kafka_helloworld;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class HelloWorldProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        //配置kafka集群的broker地址,建议配置两个以上,以免其中一个失效,但不需要配全,集群会自动查找leader节点。
        props.put("metadata.broker.list", "192.168.137.176:9092,192.168.137.176:9093");
        //配置value的序列化类
        //key的序列化类key.serializer.class可以单独配置,默认使用value的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //配置partitionner选择策略,可选配置
        props.put("partitioner.class", "cn.ljh.kafka.kafka_helloworld.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
               long runtime = new Date().getTime();
               String ip = "192.168.2." + rnd.nextInt(255);
               String msg = runtime + ",www.example.com," + ip;
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

SimplePartitioner.java

package cn.ljh.kafka.kafka_helloworld;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf(‘.‘);
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }

}

ConsumerGroupExample.java

package cn.ljh.kafka.kafka_helloworld;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
   }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
//        String zooKeeper = args[0];
//        String groupId = args[1];
//        String topic = args[2];
//        int threads = Integer.parseInt(args[3]);

        String zooKeeper = "192.168.137.176:2181,192.168.137.176:2182,192.168.137.176:2183";
        String groupId = "group1";
        String topic = "page_visits";
        int threads = 5;

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        example.shutdown();
    }
}

ConsumerTest.java

package cn.ljh.kafka.kafka_helloworld;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        //线程会一直等待有消息进入
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
时间: 2024-10-24 11:19:15

kafka的Java客户端示例代码(kafka_2.11-0.8.2.2)的相关文章

kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)

使用0.9开始增加的KafkaProducer和KafkaConsumer. Pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.o

4 kafka集群部署及生产者java客户端编程 + kafka消费者java客户端编程

本博文的主要内容有   kafka的单机模式部署 kafka的分布式模式部署 生产者java客户端编程 消费者java客户端编程 运行kafka ,需要依赖 zookeeper,你可以使用已有的 zookeeper 集群或者利用 kafka自带的zookeeper. 单机模式,用的是kafka自带的zookeeper, 分布式模式,用的是外部安装的zookeeper,即公共的zookeeper. Step 6: Setting up a multi-broker cluster So far w

C# WebSocket 服务端示例代码 + HTML5客户端示例代码

WebSocket服务端 C#示例代码 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Net.Sockets; using System.Net; using System.Threading; using System.Text.RegularExpressions; using System.Security.Cryptography; na

HDFS的Java客户端操作代码(HDFS的查看、创建)

1.HDFS的put上传文件操作的java代码: 1 package Hdfs; 2 3 import java.io.FileInputStream; 4 import java.io.FileNotFoundException; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.F

Kafka使用Java客户端进行访问

添加maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifac

HDFS的java客户端操作代码(Windows上面打jar包,提交至linux运行)

1.通过java.net.URL实现屏幕显示demo1文件的内容 1 package Hdfs; 2 import java.io.InputStream; 3 import java.net.URL; 4 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; 5 import org.apache.hadoop.io.IOUtils; 6 7 public class JarDemo { 8 static{ 9 URL.setURLStr

JAVA SSM 示例代码

SSM 即spring+spring mvc+mybatis,开发工具IDEA 1.先看下项目结构如图: 2.主要配置文件 spring-mvc.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XM

kafka生产者java客户端

producer 包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息. 位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群.如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露. 常用配置 bootstrap.servers 用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2: acks 生产者需要server端在接收到消息后,进行反馈确认

java封装示例代码

package com.imooc; public class Telphone { private float screen; private float cpu; private float mem; public float getScreen(){ return screen; } public void setScreen(float newScreen){ screen = newScreen; } public Telphone(){ System.out.println("无参的