java kafka单列模式生产者客户端

1、所需要的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kafkaCli</groupId>
    <artifactId>kafkaCli</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <!--这部分可有可无,加上的话则直接生成可运行jar包-->
                    <!--<archive>-->
                    <!--<manifest>-->
                    <!--<mainClass>${exec.mainClass}</mainClass>-->
                    <!--</manifest>-->
                    <!--</archive>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.0.2</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                     <encoding>GBK</encoding>
                </configuration>
            </plugin>

        </plugins>

    </build>

    <dependencies>
        <!-- webSocket所需依赖 -->
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>
        <!-- kafka 所需依赖 -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
</project>

2、生产者代码

package com.kafka.producer;

import com.kafka.systemConfig.SystemConfig;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

public class ProducerKafka {
    private static final Logger log = LoggerFactory.getLogger(ProducerKafka.class);
    public static  Producer<String, String> procuder;

    {
        Properties props = new Properties();
        props.put("bootstrap.servers", SystemConfig.getProperty("bootstrap.servers","10.12.1.229:9092"));
        props.put("acks", SystemConfig.getProperty("acks","all"));
        props.put("retries", SystemConfig.getProperty("retries","0"));
        props.put("batch.size", SystemConfig.getProperty("batch.size","16384"));
        props.put("linger.ms",SystemConfig.getProperty("linger.ms","1"));
        props.put("buffer.memory", SystemConfig.getProperty("buffer.memory","33554432"));
        props.put("key.serializer", SystemConfig.getProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"));
        props.put("value.serializer", SystemConfig.getProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"));
        procuder = new KafkaProducer<String,String>(props);
    }

    /**
     * 向kafka发送消息
     * @param message
     * @return
     */
    public void sendMessgae(ProducerRecord message)  throws Exception{
     procuder.send(message, new Callback() {
             @Override
             public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                 log.info("向kafka发送数据返回偏移量: {}" , recordMetadata.offset());
             }
         });
    }

    /**
     *  向kafka发送消息
     * @param topic 主题
     * @param value 值
     * @throws Exception
     */
    public void sendMessgae(String topic, String value)  throws Exception{
        sendMessgae(new ProducerRecord<String, String>(topic, value));
    }

    /**
     *  向kafka发送消息
     * @param topic 主题
     * @param value 值
     * @throws Exception
     */
    public void sendMessgae(String topic,String key, String value)  throws Exception{
        sendMessgae(new ProducerRecord(topic, key, value));
    }
    /**
     * 刷新缓存
     */
    public void flush()  {
        procuder.flush();
    }

    /**
     * 关闭连接
     */
    public void close() {
        procuder.close();
    }

    /**
     * 单例模式确保全局中只有一份该实例
     */
    private static class ProducerKafkaHolder{
      private static  ProducerKafka instance = new  ProducerKafka();
    }

    /**
     * 延迟加载,避免启动加载
     * @return
     */
    public static  ProducerKafka getInstance(){
        return ProducerKafkaHolder.instance;
    }

    public static void main(String []args){

        try {
            ProducerKafka producerKafka =   ProducerKafka.getInstance();
            producerKafka.sendMessgae("TEST_JAVA","key","value");
            producerKafka.flush();
            producerKafka.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

3、配置项代码

package com.kafka.systemConfig;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class SystemConfig {
    private static Properties properties = null;
  //  private final static String FILE_PATH = System.getProperty("user.dir") + "/conf/kafkaProducer.properties";
  private final static String FILE_PATH =  "kafkaProducer.properties";

    private SystemConfig() {
        System.out.println("FILE_PATH" + FILE_PATH);
        properties = getConfig();
    }

    /**
     * Get property value.
     *
     * @param name
     *            property name.
     * @return the value.
     */
    public static String getProperty(String name) {
        return getProperty(name, null);
    }

    /**
     * Get property value.
     *
     * @param name
     *            property name.
     * @param defaultValue
     *            value if property not found.
     * @return the value.
     */
    public static String getProperty(String name, String defaultValue) {
        String ret = null;
        if (properties == null) {
            properties = getConfig();
        }
        if (properties != null) {
            ret = properties.getProperty(name);
            if (ret != null) {
                try {
                    ret = new String(ret.getBytes("ISO-8859-1"), "GBK");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ret.trim();
            } else{
                return defaultValue;
            }

        }
        return defaultValue;
    }

    /**
     * @param name
     * @param defaultValue
     * @return
     */
    public static int getIntProperty(String name, int defaultValue) {
        int res = Integer.parseInt(getProperty(name, defaultValue + ""));
        return res == 0 ? defaultValue : res;
    }

    private static Properties getConfig() {
        if (properties == null) {
            properties = new Properties();
            InputStream is = null;
            try {
                is = SystemConfig.class.getClassLoader()
                        .getResourceAsStream(FILE_PATH );
                properties.load(is);
            } catch (IOException e) {
            } finally {
                if (is != null) {
                    try {
                        is.close();
                    } catch (IOException e) {
                    }
                }
            }
        }
        return properties;
    }

    public static void main(String args[]){
       // System.out.println(SystemConfig.getProperty("bootstrap.servers"));
       // System.out.println(FILE_PATH);
      System.out.println(SystemConfig.class.getClassLoader().getResourceAsStream(FILE_PATH )); ;
    }
}

3、webSocket代码

package com.kafka.wbSocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * 用于 webSocket 应用相关
 *
 **/
@ServerEndpoint("/webSocket")
public class WebSocket {
    private static final Logger log = LoggerFactory.getLogger(WebSocket.class);
    private Session session;
    public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>();

    /**
     * 建立连接。
     * 建立连接时入参为session
     */
    @OnOpen
    public void onOpen(Session session){
        this.session = session;
        wbSockets.add(this);
        log.info("New session insert,sessionId is "+ session.getId());
    }
    /**
     * 关闭连接
     */
    @OnClose
    public void onClose(){
        wbSockets.remove(this);
        log.info("A session insert,sessionId is "+ session.getId());
    }
    /**
     * 接收数据。
     *
     */
    @OnMessage
    public void onMessage(String message ,Session session){
        log.info(message + "from " + session.getId());
    }

    /**
     * 发送数据
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
}

        因平台jdk只支持1.6 、kafka所需版本为1.8 顾此消息中间件展示被忽略

原文地址:https://www.cnblogs.com/xiaoyu1994/p/10478583.html

时间: 2024-10-10 02:17:17

java kafka单列模式生产者客户端的相关文章

在JAVA和android中常用的单列模式

在很多开发中,项目为了节约资源,都把一个类的构造函数变为私有化,这样整个项目中就不能创建多个实例,这样的方法我们称为单例模式 现在通过代码来简介下这个单例模式: 在新建一个java项目后,创建一个实体类User.java,和测试类,main.java 代码如下: 1 public class User { 2 private static User user; 3 4 //添加该实例的属性 5 private String name; 6 private String sex; 7 privat

Java单列模式

设计模式 单列模式的定义和作用 目的:使得类的一个对象成为该类系统中的唯一实列: 定义:一个类有且仅有一个实例,并且自行实列化向整个系统提供?: 单列模式分为 恶汉式  (在创建对象的时候就直接初始化了)以空间换时间:恶汉士线程安全 懒汉式(创建类内的时候先不实列化,在第一次使用的时候在初始化)以时间换空间:懒汉士存在线程风险 单列模式的使用优缺点 原文地址:https://www.cnblogs.com/xiaoruirui/p/10699054.html

Java多线程编程模式实战指南之Promise模式

Promise模式简介(转) Promise模式是一种异步编程模式 .它使得我们可以先开始一个任务的执行,并得到一个用于获取该任务执行结果的凭据对象,而不必等待该任务执行完毕就可以继续执行其他操作.等到我们需要该任务的执行结果时,再调用凭据对象的相关方法来获取.这样就避免了不必要的等待,增加了系统的并发性.这好比我们去小吃店,同时点了鸭血粉丝汤和生煎包.当我们点餐付完款后,我们拿到手的其实只是一张可借以换取相应食品的收银小票(凭据对象)而已,而不是对应的实物.由于鸭血粉丝汤可以较快制作好,故我们

java 动态代理模式

一.相关类及其方法:java.lang.reflect.Proxy,Proxy 提供用于创建动态代理类和实例的静态方法.newProxyInstance()返回一个指定接口的代理类实例,该接口可以将方法调用指派到指定的调用处理程序(详见api文档) java.lang.reflect.InvocationHandler,InvocationHandler 是代理实例的调用处理程序 实现的接口.invoke()在代理实例上处理方法调用并返回结果.在与方法关联的代理实例上调用方法时,将在调用处理程序

[转载]转Java 几个memcached 连接客户端对比 选择

原文地址:转Java 几个memcached 连接客户端对比 选择作者:闪出光芒 Xmemcached 1.2.6.1 released,所以更新了一下Java Memcached Client Benchmark.对比下Xmemached,Spymemcached和Java-Memcached-Client这三个开源客户端的性能,具体的测试信息可以看这个链接. 测试源码: Java代码 svn co http://xmemcached.googlecode.com/svn/trunk/benc

JAVA实现的异步redis客户端

再使用redis的过程中,发现使用缓存虽然好,但是有些地方还是比较难权衡,缓存对象大了,存储对象时的序列化工作很繁重,消耗大量cpu:那么切分成很小的部分吧,存取的次数变多了,redis客户端的交互次数上不去,这是一个矛盾.要是有一个客户端能支持更多的交互次数,那么在完成既定指标的前提下,岂不是可以让我们的建模工作变的更宽松一些? 于是参照redis协议,花了5天时间,做了一个具备基本功能的redis客户端.它的特性: 1.支持异步调用,在getA之后不用等结果,能继续getB,getC,等等.

单列模式下的数据库连接与Servlet之间页面访问用户登录的小例子

下面是我自己写的一个关于servlet的例子 首先是数据库配置,使用的是静态的单例模式 代码如下: / 数据库地址连接 // 使用静态单列模式 public class JdbcUtil { private static String driverName; private static String url; private static String username; private static String password; private static Properties pro

JAVA NIO non-blocking模式实现高并发服务器

JAVA NIO non-blocking模式实现高并发服务器 分类: JAVA NIO2014-04-14 11:12 1912人阅读 评论(0) 收藏 举报 目录(?)[+] Java自1.4以后,加入了新IO特性,NIO. 号称new IO. NIO带来了non-blocking特性. 这篇文章主要讲的是如何使用NIO的网络新特性,来构建高性能非阻塞并发服务器. 文章基于个人理解,我也来搞搞NIO.,求指正. 在NIO之前 服务器还是在使用阻塞式的java socket. 以Tomcat最

Java设计模式--合成模式

合成模式:合成模式把部分和整体的关系用树结构表示出来.合成模式使得客户端把一个个单独的成分对象和由他们复合而成的合成对象等同看待. 两种形式:根据所实现的接口分为安全式和透明式 合成模式可以不提供父对象的管理方法,但是合成模式必须在合适的地方提供子对象的管理方法. 安全式 安全式的合成模式要求管理聚集的方法只出现在树枝构件类中,而不出现在树叶构件类中. 角色 抽象构件角色:这是一个抽象角色,他给参加组合的对象定义出公共的接口和默认行为,可以用来管理所有子对象.合成对象通常把它包含的子对象当做类型