五 、 Kafka producer 拦截器(interceptor) 和 六 、Kafka Streaming案例

五 Kafka producer 拦截器(interceptor)

5.1 拦截器原理

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定

制化控制逻辑。

对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会

对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor

按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是

org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

(1)configure(configs)

获取配置信息和初始化数据时调用。

(2)onSend(ProducerRecord):

该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在

消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保

证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算

(3)onAcknowledgement(RecordMetadata, Exception):

该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在 producer 回调逻辑

触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很

重的逻辑,否则会拖慢 producer 的消息发送效率

(4)close:

关闭 interceptor,主要用于执行一些资源清理工作

如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保

线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅

是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中

要特别留意。

5.2 拦截器案例

1)需求:

实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间———————————————————————————

戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或

失败发送消息数。

2)案例实操

(1)增加时间戳拦截器

package cn.bw.kafka.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

@Override

public void configure(Map<String, ?> configs) {

}

@Override

public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record)

{

// 创建一个新的 record,把时间戳写入消息体的最前部

return new ProducerRecord(record.topic(), record.partition(), record.timestamp(),

record.key(),

System.currentTimeMillis() + "," + record.value().toString());

}

@Override———————————————————————————

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

@Override

public void close() {

}

}

(2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器

package cn.bw.kafka.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{

private int errorCounter = 0;

private int successCounter = 0;

@Override

public void configure(Map<String, ?> configs) {

}

@Override

public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record)

{

return record;

}

@Override

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

// 统计成功和失败的次数

if (exception == null) {

successCounter++;

} else {

errorCounter++;

}

}

@Override

public void close() {———————————————————————————

// 保存结果

System.out.println("Successful sent: " + successCounter);

System.out.println("Failed sent: " + errorCounter);

}

}

(3)producer 主程序

package cn.bw.kafka.interceptor;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

public static void main(String[] args) throws Exception {

// 1 设置配置信息

Properties props = new Properties();

props.put("bootstrap.servers", "node01:9092");

props.put("acks", "all");

props.put("retries", 0);

props.put("batch.size", 16384);

props.put("linger.ms", 1);

props.put("buffer.memory", 33554432);

props.put("key.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

// 2 构建拦截链

List<String> interceptors = new ArrayList<>();

interceptors.add("cn.bw.kafka.interceptor.TimeInterceptor");

interceptors.add("cn.bw.kafka.interceptor.CounterInterceptor");

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

String topic = "first";

Producer<String, String> producer = new KafkaProducer<>(props);

// 3 发送消息

for (int i = 0; i < 10; i++) {

ProducerRecord<String, String> record = new ProducerRecord<>(topic,

"message" + i);

producer.send(record);

}———————————————————————————

// 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法

producer.close();

}

}

3)测试

(1)在 kafka 上启动消费者,然后运行客户端 java 程序。

[[email protected] kafka]$ in/kafka-console-consumer.sh --zookeeper node01:2181

--from-beginning --topic first

1501904047034,message0

1501904047225,message1

1501904047230,message2

1501904047234,message3

1501904047236,message4

1501904047240,message5

1501904047243,message6

1501904047246,message7

1501904047249,message8

1501904047252,message9

(2)观察 java 平台控制台输出数据如下:

Successful sent: 10

Failed sent: 0

六 kafka Streams

6.1 概述

6.1.1 Kafka Streams (数据处理)

Kafka Streams。Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的

库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。

6.1.2 Kafka Streams 特点

1)功能强大

高扩展性,弹性,容错———————————————————————————

2)轻量级

无需专门的集群

一个库,而不是框架

3)完全集成

100%的 Kafka 0.10.0 版本兼容

易于集成到现有的应用程序

4)实时性

毫秒级延迟

并非微批处理

窗口允许乱序数据

2

6.1.3 为什么要有 Kafka Stream

当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark

Streaming 和 Apache Storm。Apache Storm 发展多年,应用广泛,提供记录级别的处理能力,

当前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,可以非常方便与图计算,

SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。另外,

目前主流的 Hadoop 发行版,如 Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache

Spark,使得部署更容易。

既然 Apache Spark 与 Apache Storm 拥用如此多的优势,那为何还需要 Kafka Stream 呢?

笔者认为主要有如下原因。

第一,Spark 和 Storm 都是流式处理框架,而 Kafka Stream 提供的是一个基于 Kafka 的

流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难

了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Stream 作为流式

处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便

使用和调试。———————————————————————————

第二,虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部

署仍然相对复杂。而 Kafka Stream 作为类库,可以非常方便的嵌入应用程序中,它对应用的

打包和部署基本没有任何要求。

第三,就流式处理系统而言,基本都支持 Kafka 作为数据源。例如 Storm 具有专门的

kafka-spout,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上,Kafka 基本上是主

流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka,此时使用

Kafka Stream 的成本非常低。

第四,使用 Storm 或 Spark Streaming 时,需要为框架本身的进程预留资源,如 Storm

的 supervisor 和 Spark on YARN 的 node manager。即使对于应用实例而言,框架本身也会占

用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不

占用系统资源。

第五,由于 Kafka 本身提供数据持久化,因此 Kafka Stream 提供滚动部署和滚动升级以

及重新计算的能力。

第六,由于 Kafka Consumer Rebalance 机制,Kafka Stream 可以在线动态调整并行度。

6.2 Kafka Stream 案例

6.2.1 eclipse 打包插件安装

1)将net.sf.fjep.fatjar_0.0.32.jar拷贝到eclipse安装目录中的plugins目录下,然后重启eclipse

即可。

2)插件使用方法——————————————————————————————————————————————————————

6.2.2 数据清洗案例

0)需求:

实时处理单词带有”>>>”前缀的内容。例如输入”hadoop>>>ximenqing”,最终处理成

“ximenqing”

注:如果使用 maven,加入如下依赖

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-streams</artifactId>———————————————————————————

<version>2.0.0</version>

</dependency>

1)创建主类

package cn.bw.kafka.stream;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorSupplier;

import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {

public static void main(String[] args) {

// 定义输入的 topic

String from = "first";

// 定义输出的 topic

String to = "second";

// 设置参数

Properties settings = new Properties();

settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");

settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");

StreamsConfig config = new StreamsConfig(settings);

// 构建拓扑

//0.10 版本如下

//TopologyBuilder builder = new TopologyBuilder();

//2.0 版本如下

Topology builder = new Topology();

builder.addSource("SOURCE", from)

.addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

@Override

public Processor<byte[], byte[]> get() {

// 具体分析处理

return new LogProcessor();

}

}, "SOURCE")

.addSink("SINK", to, "PROCESS");

// 创建 kafka stream

KafkaStreams streams = new KafkaStreams(builder, config);

streams.start();

}

}

2)具体业务处理———————————————————————————

package cn.bw.kafka.stream;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

public class LogProcessor implements Processor<byte[], byte[]> {

private ProcessorContext context;

@Override

public void init(ProcessorContext context) {

this.context = context;

}

@Override

public void process(byte[] key, byte[] value) {

String input = new String(value);

// 如果包含“>>>”则只保留该标记后面的内容

if (input.contains(">>>")) {

input = input.split(">>>")[1].trim();

// 输出到下一个 topic

context.forward("logProcessor".getBytes(), input.getBytes());

}else{

context.forward("logProcessor".getBytes(), input.getBytes());

}

}

@Override

public void punctuate(long timestamp) {

}

@Override

public void close() {

}

}

4) 将程序用 eclipse 插件打成 jar 包

5) 创建 first topic

./kafka-topics.sh --create --topic first --replication-factor 2 --partitions 3 --zookeeper node01:2181

6) 创建 second topic

./kafka-topics.sh --create --topic second --replication-factor 2 --partitions 3 --zookeeper

node01:2181

7) 验证 topic 是否创建完成

./kafka-topics.sh --list --zookeeper node01:2181

8)将 jar 包拷贝 node01 上运行———————————————————————————

[[email protected] kafka]$ java -jar kafka0508_fat.jar cn.bw.kafka.stream.Application

注:maven 插件如果没有指定运行主类使用下面的命令运行:

java -cp processor.jar com.bw.kafka.streams.LogFilter

5)在 node02 上启动生产者

[[email protected] kafka]$ bin/kafka-console-producer.sh --broker-list node02:9092 --topic

first

>hello>>>world

>h>>>hadoop

>hahaha

6)在 node03 上启动消费者

[[email protected] kafka]$ bin/kafka-console-consumer.sh --zookeeper node03:2181

--from-beginning --topic second

world

hadoop

hahaha

原文地址:https://www.cnblogs.com/JBLi/p/11555941.html

时间: 2024-08-11 01:34:52

五 、 Kafka producer 拦截器(interceptor) 和 六 、Kafka Streaming案例的相关文章

Kafka producer拦截器(interceptor)

Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑. 对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等.同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain).Intercetpor

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

Kafka Producer 拦截器

Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Producer端的拦截器和Kafka Consumer端的拦截器.本篇主要讲述的是Kafka Producer端的拦截器,它主要用来对消息进行拦截或者修改,也可以用于Producer的Callback回调之前进行相应的预处理. 使用Kafka Producer端的拦截器非常简单,主要是实现ProducerInterceptor接口,此接口包含4个方法: ProducerRecord<K,

Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行

在AbstractConfigurationProvider类中loadSources方法会将所有的source进行封装成SourceRunner放到了Map<String, SourceRunner> sourceRunnerMap之中.相关代码如下: 1 Map<String, String> selectorConfig = context.getSubProperties( 2 BasicConfigurationConstants.CONFIG_SOURCE_CHANNE

详述 Spring MVC 框架中拦截器 Interceptor 的使用方法

1 前言 昨天新接了一个需要,"拦截 XXX,然后 OOO",好吧,说白了就是要用拦截器干点事(实现一个具体的功能).之前,也在网络上搜了很多关于Interceptor的文章,但感觉内容都大同小异,而且知识点零零散散,不太方便阅读.因此,正好借此机会,整理一篇关于拦截器的文章,在此分享给大家,以供大家参考阅读. 2 拦截器 2.1 概念 Java 里的拦截器是动态拦截 action 调用的对象.它提供了一种机制可以使开发者可以定义在一个 action 执行的前后执行的代码,也可以在一个

struts2拦截器interceptor的三种配置方法

struts2拦截器interceptor的三种配置方法方法1. 普通配置法 <struts>     <package name="struts2" extends="struts-default">         <interceptors>             <interceptor name="myInterceptor" class="edu.hust.interceptor.

Struts拦截器Interceptor

Struts2 拦截器 [Interceptor] 拦截器的工作原理如上图,每一个Action请求都包装在一系列的拦截器的内部.拦截器可以在Action执行直线做相似的操作也可以在Action执行直后做回收操作. 每一个Action既可以将操作转交给下面的拦截器,Action也可以直接退出操作返回客户既定的画面. 如何自定义一个拦截器? 自定义一个拦截器需要三步: 1 自定义一个实现Interceptor接口(或者继承自AbstractInterceptor)的类. 2 在strutx.xml中

Flume 拦截器(interceptor)详解

flume 拦截器(interceptor)1.flume拦截器介绍拦截器是简单的插件式组件,设置在source和channel之间.source接收到的事件event,在写入channel之前,拦截器都可以进行转换或者删除这些事件.每个拦截器只处理同一个source接收到的事件.可以自定义拦截器.2.flume内置的拦截器 2.1 时间戳拦截器flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件报头中.如果不使用任何拦截器,flume接受到的只有message.

拦截器interceptor

拦截器interceptor 配置 <mvc:interceptors> <mvc:interceptor> <!--/**回处理/后面的所有请求--> <mvc:mapping path="/**"/> <bean class="cn.pinked.config.MyInterceptor"/> </mvc:interceptor> </mvc:interceptors> 拦截器