kafka系列五、kafka常用java API

引入maven包

<dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.10.0.0</version>
</dependency>

一、同步发送消息

package com.example.demo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SynProducer {
    private static Properties getProps(){
        Properties props =  new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1000);
        props.put("buffer.memory", 33554432);
        props.put("client.id", "producer-syn-1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
        for(int i=0; i< 1000; i++){
            ProducerRecord<String, String> record = new ProducerRecord<>("test-1", "topic_"+i,"test-"+i);
            Future<RecordMetadata> metadataFuture = producer.send(record);
            RecordMetadata recordMetadata = null;
            try {
                recordMetadata = metadataFuture.get();
                System.out.println("发送成功!");
                System.out.println("topic:"+recordMetadata.topic());
                System.out.println("partition:"+recordMetadata.partition());
                System.out.println("offset:"+recordMetadata.offset());
            } catch (InterruptedException|ExecutionException e) {
                System.out.println("发送失败!");
                e.printStackTrace();
            }
        }
    }
}

原文地址:https://www.cnblogs.com/wangzhuxing/p/10099894.html

时间: 2024-10-09 16:04:09

kafka系列五、kafka常用java API的相关文章

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

_00017 Kafka的体系结构介绍以及Kafka入门案例(初级案例+Java API的使用)

博文作者:妳那伊抹微笑 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! qq交流群:214293307  (期待与你一起学习,共同进步) # Kfaka的体系结构 # 学习前言 Kafka的整个学习过程就是自己看官网的文档,出

常用 Java API

常用Java API 一. java.io.BufferedReader类(用于从文件中读入一段字符:所属套件:java.io) 1. 构造函数BufferedReader(java.io.FileReader FileReader变量) 说明:新建一个BufferReader对象. 2. close方法 void close() 说明:关闭BufferReader对象. 3. readLine方法 java.lang.string readLine() 说明:从文件中读取一行字符.若为NULL

HBase的常用Java API

1. 创建HBase表的对象 HBase表的对项名字叫HTable,创建它的方法有很多,常见的有如下: org.apache.hadoop.hbase.client.HTable hTable = new HTable(org.apache.hadoop.hbase.HBaseConfiguration conf, String tableName); 或 org.apache.hadoop.hbase.client.HTable hTable = new HTable(org.apache.h

Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用

KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便.源项目Github地址为:https://github.com/quantifind/KafkaOffsetMonitor. 最简单的使用方式是从Github上下载一个最新的KafkaOffsetMonitor-assembly-0.2.1.jar,上传到某服务器上,然后执行一句命令就可以运行起来.但是在使用过程中有可能会发现页面反应缓慢或者无法显示相应内容的情况

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

spark2.x由浅入深深到底系列六之RDD java api详解一

以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了java api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: //从hdfs文件中创建 JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt"); //从

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext 一.简单实现scala版本的RDD和SparkContext class RDD[T](value: Seq[T]) {   //RDD的map操作   def map[U](f: T => U): RDD[U] = {     new RDD(value.map(f))   

spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库

学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark 以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者oracle等关系型数据库: package com.twq.javaapi.java7; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; imp