kafka 0.10.2 消息生产者

package cn.xiaojf.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by 肖建锋 on 2017/3/22.
 */
public class MsgProducer2 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.59.130:9092,192.168.59.131:9092,192.168.59.132:9092");
        props.put("acks", "all");
        props.put("retries ", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer(props);
        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("my-replicated-topic", Integer.toString(i), Integer.toString(i)));
        producer.close();
    }
}
<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.2.0</version>
    </dependency>
时间: 2024-08-08 05:37:48

kafka 0.10.2 消息生产者的相关文章

kafka 0.10.2 消息生产者(producer)

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka

kafka 0.8.2 消息生产者 KafkaProducer

package com.hashleaf.kafka; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerReco

kafka 0.8.2 消息生产者 producer

package com.hashleaf.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 消息生产者 * @author xiaojf [email protected] * @since 2015-7-15 下午10:50:01 */

kafka 0.10.2 消息消费者

package cn.xiaojf.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Kaf

Kafka 0.10问题点滴

15.如何消费内部topic: __consumer_offsets 主要是要让它来格式化:GroupMetadataManager.OffsetsMessageFormatter 最后用看了它的源码,把这部分挑选出来,自己解析了得到的byte[].核心代码如下: // com.sina.mis.app.ConsumerInnerTopic             ConsumerRecords<byte[], byte[]> records = consumer.poll(512);    

Kafka 0.10.1.0 Cluster的搭建和Topic简单操作实验

[kafka cluster机器]:机器名称 用户名称sht-sgmhadoopdn-01/02/03 root [安装目录]: /root/learnproject/app 1.将scala文件夹同步到集群其他机器(scala 2.11版本,可单独下载解压) [[email protected] app]# scp -r scala [email protected]:/root/learnproject/app/ [[email protected] app]# scp -r scala [

Kafka 0.10 KafkaConsumer流程简述

ConsumerConfig.scala 储存Consumer的配置 按照我的理解,0.10的Kafka没有专门的SimpleConsumer,仍然是沿用0.8版本的. 1.从poll开始 消费的规则如下: 一个partition只能被同一个ConsumersGroup的一个线程所消费. 线程数小于partition数,某些线程会消费多个partition. 线程数等于partition数,一个线程正好消费一个线程. 当添加消费者线程时,会触发rebalance,partition的分配发送变化

关于CDH5.11.0自带kafka 0.10 bootstrap-server 无法消费

近日需要在项目用到kafka,然后本地使用cdh集成的kafka 进行安装调试,以及些样例代码,sparkstreaming 相关调用kafka 的代码使用的原始的api 而没有走zookeeper,虽然消费者能启动,但无法消费内容. 开始我使用shell下的zk方式是可以消费误认为kafka也是没有问题的,后来想了一下是否shell也可能使用api来访问看下情况. 之后我使用bootstrap-server的方式在shell下进行测试,果然后些的样例代码一样,无法消费. 之后就是无脑的百度,谷

kafka 0.10.2 解决java无法生产消息到指定topic问题

主要是修改server.properties的advertised.listeners advertised.listeners=PLAINTEXT://192.168.59.132:9092