Kafka 应用实例

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • Consumer客户端pull,随机读,利用sendfile系统调用进行zero-copy ,批量拉数据
  • 消费状态保存在客户端
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
  • 数据迁移、扩容对用户透明
  • 支持Hadoop并行数据加载。
  • 支持online(在线)和offline(离线)的场景。
  • 持久化:通过将数据持久化到硬盘以及replication防止数据丢失。
  • scale out:无需停机即可扩展机器。
  • 定期删除机制,支持设定partitions的segment file保留时间。

项目实例:https://github.com/windwant/kafka-test

kafka.properties

value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
request.required.acks=1
bootstrap.servers=localhost:9092

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=test-consumer-group

Producer:

package org.windwant.kafka;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Producer
 */
public class MyKafkaProducer {

    private Properties props;
    public static void main(String[] args) throws ConfigurationException {
        new MyKafkaProducer().start();
    }

    public MyKafkaProducer() throws ConfigurationException {
        props = new Properties();
        PropertiesConfiguration config = new PropertiesConfiguration("kafka.properties");
        config.setReloadingStrategy(new FileChangedReloadingStrategy());
        //×??ˉ±£′?
        config.setAutoSave(true);
        props.put("value.serializer", config.getString("value.serializer"));
        props.put("key.serializer", config.getString("key.serializer"));
        props.put("request.required.acks", config.getString("request.required.acks"));
        props.put("bootstrap.servers", config.getString("bootstrap.servers"));
    }

    public void start(){
        try {
            Producer<String, String> producer = new KafkaProducer<>(props);
            for(int i = 0; i < 100; i++) {
                RecordMetadata result = producer.send(new ProducerRecord<>("mykafka",
                        "kafka key: " + Integer.toString(i),
                        "kafka value: " + Integer.toString(i))).get();
                System.out.println("producer send: " + result);
                Thread.sleep(1000);
            }
            producer.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

Consumer:

package org.windwant.kafka;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Consumer.
 */
public class MyKafkaConsumer {
    private Properties props;
    public static void main(String[] args) throws ConfigurationException {
        new MyKafkaConsumer().start();
    }

    public MyKafkaConsumer() throws ConfigurationException {
        props = new Properties();
        PropertiesConfiguration config = new PropertiesConfiguration("kafka.properties");
        config.setReloadingStrategy(new FileChangedReloadingStrategy());
        //自动保存
        config.setAutoSave(true);
        props.put("value.deserializer", config.getString("value.deserializer"));
        props.put("key.deserializer", config.getString("key.deserializer"));
        props.put("bootstrap.servers", config.getString("bootstrap.servers"));
        props.put("group.id", config.getString("group.id"));
    }

    public void  start(){
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mykafka"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }
}
时间: 2024-10-10 06:06:51

Kafka 应用实例的相关文章

kafka生产实例安装

2019/3/14 星期四 Linux 初始化脚本 (centos6 centos7 通用)Linux 初始化脚本 (centos6 centos7 通用)zookeeper生产环境搭建 zookeeper生产环境搭建 在安装前请务必安装好zookeeper 查看上面2个链接地址! kafka 生产环境搭建 [root@emm-kafka01-10--174 ~]# cd /opt/ins/ [root@emm-kafka01-10--174 ins]# ll total 233044 -rwx

kafka producer实例及原理分析

1.前言 首先,描述下应用场景: 假设,公司有一款游戏,需要做行为统计分析,数据的源头来自日志,由于用户行为非常多,导致日志量非常大.将日志数据插入数据库然后再进行分析,已经满足不了.最好的办法是存日志,然后通过对日志的分析,计算出有用的数据.我们采用kafka这种分布式日志系统来实现这一过程. 步骤如下: 搭建KAFKA系统运行环境 如果你还没有搭建起来,可以参考我的博客: http://zhangfengzhe.blog.51cto.com/8855103/1556650 设计数据存储格式

kafka java实例

生产者 1 package com; 2 import java.util.Properties; 3 import java.util.concurrent.TimeUnit; 4 5 import kafka.javaapi.producer.Producer; 6 import kafka.producer.KeyedMessage; 7 import kafka.producer.ProducerConfig; 8 import kafka.serializer.StringEncode

kafka spring 实例

使用定时器发送后  结果如下 kafka 代码下载 Java代码   15.安装kafka cd /usr/local/ wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz tar xf kafka_2.10-0.10.0.0.tgz ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka chown -R hdfs:hadoop /usr/lo

Kafka编程实例

编程 Producer是一个应用程序,它创建消息并发送它们到Kafka broker中.这些producer在本质上是不同.比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer.这些不同的Producer能够使用不同的语言实现,比如java.C和Python.下面的这部图表解释了消息producer的Kafka API. 下面将详细介绍如果编写一个简单的Producer和Consumer应用程序. 发送简单消息给Kafka broker,Produce

kafka使用实例

定义一个procucer package cn.vko.common.kafka; import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.vko.common.utils.mybatis.GenCreateInterceptor; import kafka.javaapi

SpringBoot Kafka 整合实例教程

1.使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer. 工程POM文件代码如下: 1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instanc

最简洁的kafka开发实例

问题导读 1.如何启动kafka? 2.如何通过代码实现生产者例子 ? 3.如何通过代码实现消费者例子 ? 1.启动kafka. //启动zookeeper server (用&是为了能退出命令行): bin/zookeeper-server-start.sh config/zookeeper.properties  & //启动kafka server:   bin/kafka-server-start.sh config/server.properties  & 2.新建一个生

整合 KAFKA+Flink 实例(第一部分,趟坑记录)

2017年后,一大波网络喧嚣,说流式处理如何牛叉,如何高大上,抱歉,工作满负荷,没空玩那个: 今年疫情隔离在家,无聊,开始学习 KAFKA+Flink ,目前的打算是用爬虫抓取网页数据,传递到Kafka中,再用Flink计算. 个人性格原因,我不愿意过分沉迷于纸质或者电子教程材料,也不是特别喜欢网上某些培训机构已经过时了的所谓培训视频, 喜欢动手直接写代码,所以简单翻看一点PDF教程,看了两集“培训视频”,也没说Kafka.flink两组件咋结合使用,不耐烦,直接开码(码农的糙性): 之前我写过