Kafka Java client example code (kafka_2.12-0.10.2.1)

These examples use the KafkaProducer and KafkaConsumer client APIs introduced in Kafka 0.9.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.ljh.kafka</groupId>
  <artifactId>kafka-helloworld</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka-helloworld</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>0.10.2.1</version>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>jmxri</artifactId>
          <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jms</artifactId>
          <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jmxtools</artifactId>
          <groupId>com.sun.jdmk</groupId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
</project>
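
Note: the kafka_2.12 artifact pulls in the full Kafka broker along with its Scala dependencies. For a client-only application, depending on kafka-clients is usually sufficient, and the legacy jmxri/jms/jmxtools exclusions above (a holdover from 0.8-era examples) are then unnecessary. A minimal alternative dependency would be:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.2.1</version>
    </dependency>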

HelloWorldProducer2.java

package cn.ljh.kafka.kafka_helloworld;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class HelloWorldProducer2 {
    public static void main(String[] args) {
         long events = Long.parseLong(args[0]); // number of messages to send, from the first command-line argument
         Random rnd = new Random();

         Properties props = new Properties();
         props.put("bootstrap.servers", "192.168.137.176:9092,192.168.137.176:9093,192.168.137.176:9094");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         // Configure the partitioner class (optional; the default partitioner is used if omitted)
         props.put("partitioner.class", "cn.ljh.kafka.kafka_helloworld.SimplePartitioner2");

         Producer<String, String> producer = new KafkaProducer<>(props);

         for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com," + ip;
                ProducerRecord<String, String> data = new ProducerRecord<String, String>("page_visits", ip, msg);
                producer.send(data,
                         new Callback() {
                     public void onCompletion(RecordMetadata metadata, Exception e) {
                         if(e != null) {
                            e.printStackTrace();
                         } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                         }
                     }
                 });
         }
         producer.close();
    }
}
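
To run the producer, pass the number of messages to send as the first argument. A sketch using the Maven exec plugin, assuming the page_visits topic already exists on the brokers above:

    mvn compile exec:java -Dexec.mainClass="cn.ljh.kafka.kafka_helloworld.HelloWorldProducer2" -Dexec.args="100"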

SimplePartitioner2.java

package cn.ljh.kafka.kafka_helloworld;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class SimplePartitioner2 implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration is needed for this simple partitioner.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
            Object value, byte[] valueBytes, Cluster cluster) {
        int partition = 0;
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // The key is the client IP; use its last octet to pick a partition.
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }

}
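
As a worked example: if page_visits has, say, 3 partitions, a record keyed by the IP 192.168.2.37 is routed to partition 37 % 3 = 1, so all messages from that IP land in the same partition and preserve their relative order.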

HelloWorldConsumer2.java

package cn.ljh.kafka.kafka_helloworld;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloWorldConsumer2 {

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.176:9092,192.168.137.176:9093,192.168.137.176:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG ,"test") ;
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("page_visits"));

        // Poll in an endless loop; this simple example never closes the consumer.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); // wait up to 100 ms for new records
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

}
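
Both examples assume the page_visits topic exists. With the scripts shipped in kafka_2.12-0.10.2.1 it can be created as follows (the ZooKeeper address is an assumption about the local setup):

    bin/kafka-topics.sh --create --zookeeper 192.168.137.176:2181 --replication-factor 3 --partitions 3 --topic page_visits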