自定义kafka Sink

package my.bigdata;

/**
 * Created by lq on 2017/8/22.
 */

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;

import my.utils.PropertiesUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSink2 extends AbstractSink implements Configurable {
    private static String TOPIC = null;
    private Producer<String, String> producer;
    private static Properties properties = null;

    static {
        final String topicCfg ="topic.cfg";
        final String myKafkaSinkCfg ="myKafkaSink.cfg";
        TOPIC = (String) PropertiesUtils.getPropertiesFromClass(KafkaSink2.class,topicCfg).get("topic");
        properties = PropertiesUtils.getPropertiesFromClass(KafkaSink2.class,myKafkaSinkCfg);
    }

    public Status process() throws EventDeliveryException {
        // TODO Auto-generated method stub
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();

        try {
            transaction.begin();
            Event event = channel.take();
            if (event == null) {
                transaction.rollback();
                return Status.BACKOFF;
            }

            Map<String, String> headers = event.getHeaders();
            String logtype = headers.get("logtype");
            //随机
            String random = System.currentTimeMillis() + "";//随机数,key,避免写热点问题
            String kafkaKey = random + "_" + logtype;
            // public ProducerRecord(String topic, K key, V value)
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                    TOPIC, kafkaKey, new String(event.getBody()));
            producer.send(data);
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            transaction.rollback();
            return Status.BACKOFF;

        } finally {
            transaction.close();
        }
    }

    public void configure(Context arg0) {
        producer = new KafkaProducer<>(properties);
    }
}
package my.bigdata;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Created by lq on 2017/8/22.
 */
public class kafkaSinkPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int parNums = cluster.partitionsForTopic(topic).size();
        try {
            String randomInKey = ((String) key).split("_")[0];
            return (int) Math.abs(Long.parseLong(randomInKey) % parNums);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % parNums);
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }

}
时间: 2025-01-06 16:58:45

自定义kafka Sink的相关文章

自定义Flume Sink:ElasticSearch Sink

Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中.Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner的对象,SinkRunner.start()方法会启动一个新的线程去管理每一个Sink的生命周期.每一个Sink需要实现start().Stop()和process()方法.你可以在start方法中去初始化Sink的参数和状态,在stop方法中清理Sink的资源.最关键的是process方法,它将处

Flume简介与使用(三)——Kafka Sink消费数据之Kafka安装

前面已经介绍了如何利用Thrift Source生产数据,今天介绍如何用Kafka Sink消费数据. 其实之前已经在Flume配置文件里设置了用Kafka Sink消费数据 agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent1.sinks.kafkaSink.topic = TRAFFIC_LOG agent1.sinks.kafkaSink.brokerList = 10.208.129.3:90

flume 自定义 hbase sink 类

参考(向原作者致敬) http://ydt619.blog.51cto.com/316163/1230586 https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase flume 1.5 的配置文件示例 #Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the

kafka sink partition

看如下代码段,kafka sink的key完全取决于上游发来的event中的header.所以,如果前面的source是像exec这样的source,由于其中的header为null,所以到了这就发到kafka中就是没key的.没key的情况下,kafka就做不了到不同broker的partition操作.

flume-ng自定义插件sink

sink写入到rabbitmq的实例 package org.apache.flume; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client

基于RobotFramework——自定义kafka库并导入使用

[Kafka] 首先介绍一下我了解的kafka的皮毛信息—— kafka——一个分布流处理系统:流处理:可以像消息队列一样publish或者subscribe信息:分布式:提供了容错性,并发处理消息的机制 集群——kafka运行在集群上,集群包含一个或多个服务器.所谓服务器集群,就是将很多服务器集中在一起进行同一种服务,在客户端看起来像是只有一个服务器.集群可以利用多个计算机进行并行计算从而有很高的计算速度,也可以使用多个计算机做备份,从而使得一个机器坏了,整个系统还能正常运行 Broker——

flume的自定义sink-Kafka

1.创建一个agent,sink类型需指定为自定义sink        vi /usr/local/flume/conf/agent3.conf        agent3.sources=as1        agent3.channels=c1        agent3.sinks=s1 agent3.sources.as1.type=avro        agent3.sources.as1.bind=0.0.0.0        agent3.sources.as1.port=41

#研发解决方案介绍#Recsys-Evaluate(推荐评测)

郑昀 基于刘金鑫文档 最后更新于2014/12/1 关键词:recsys.推荐评测.Evaluation of Recommender System.piwik.flume.kafka.storm.redis.mysql 本文档适用人员:研发 推荐系统可不仅仅是围着推荐算法打转 先明确一下,我们属于工业领域.很多在学术论文里行之有效的新特奇算法,在工业界是行不通的.当年我们做语义聚合时,分词.聚类.相似性计算.实体词识别.情感分析等领域最终还都采用了工业界十几年前乃至于几十年前就流行的成熟算法.

【Flume】自定义sink kafka,并编译打包jar,unapproval license的问题解决

如图所示,新建一个JAVA工程,编辑pom文件,pom文件内容如下[这里取出了parent]: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLoca