Flume源码-LoggerSink

package org.apache.flume.sink;

import com.google.common.base.Strings;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerSink extends AbstractSink implements Configurable {

private static final Logger logger = LoggerFactory
      .getLogger(LoggerSink.class);

// Default Max bytes to dump
  public static final int DEFAULT_MAX_BYTE_DUMP = 16;

// Max number of bytes to be dumped
  private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;

public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";

@Override
  public void configure(Context context) {
    String strMaxBytes = context.getString(MAX_BYTES_DUMP_KEY);
    if (!Strings.isNullOrEmpty(strMaxBytes)) {
      try {
        maxBytesToLog = Integer.parseInt(strMaxBytes);
      } catch (NumberFormatException e) {
        logger.warn(String.format("Unable to convert %s to integer, using default value(%d) for maxByteToDump",
                strMaxBytes, DEFAULT_MAX_BYTE_DUMP));
        maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
      }
    }
  }

@Override
  public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;

try {
      transaction.begin();
      event = channel.take();

if (event != null) {
        if (logger.isInfoEnabled()) {
          logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
        }
      } else {
        // No event found, request back-off semantics from the sink runner
        result = Status.BACKOFF;
      }
      transaction.commit();
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to log event: " + event, ex);
    } finally {
      transaction.close();
    }

return result;
  }
}

时间: 2025-01-24 11:35:52

Flume源码-LoggerSink的相关文章

修改flume源码,使其HTTPSource具备访问路径功能

目前有一个需求,就是Flume可以作为一个类似于tomcat的服务器,可以通过post请求进行访问,并且路径需要:ip:port/contextPath格式. 经过一些资料获悉,httpSource只是httpSource的一个玩具工具,可以说毛坯版,目前仅仅支持的是按照ip:port访问,并不具备servlet这种功能. 那么打开源码看一下: 这上面便是httpsource源码了,可以看到主要是5个类:HTTPBadRequestException,HTTPSource,HTTPSourceC

Flume 源码阅读

Flume架构 主要由3个组件,分别是Source,Channel和Sink,3个组件组成Event在Flume中得数据流向或者说流水线,功能可以由Flume的介绍看出:When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sin

<Flume><Source Code><Flume源码阅读笔记>

Overview source采集的日志首先会传入ChannelProcessor, 在其内首先会通过Interceptors进行过滤加工,然后通过ChannelSelector选择channel. Source和Sink之间是异步的,sink只需要监听自己关系的Channel的变化即可. sink存在写失败的情况,flume提供了如下策略: 默认是一个sink,若写入失败,则该事务失败,稍后重试. 故障转移策略:给多个sink定义优先级,失败时会路由到下一个优先级的sink.sink只要抛出一

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

flume修改源码实现source文件名前后缀的更改

业务场景: 需求:通过flume进行数据采集,将本地(windows服务器)不断产生的csv文件采集到hdfs上. 问题:本地文件在生成的过程中,会出现文件名重复的现象.也就是说,在前一秒生成文件名为aaa.csv,该文件经过flume进行处理之后会进行文件名的更改,默认情况下文件名会更改为aaa.csv.COMPLATED,但是在第二秒的时候,接着又生成了aaa.csv文件,此时flume将该文件处理完进行更名的过程中,就会报错,例如: 解决:为了避免文件名重复导致flume程序挂的问题,此时

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

【Java】【Flume】Flume-NG源码阅读之AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

Flume 实战(2)--Flume-ng-sdk源码分析

具体参考: 官方用户手册和开发指南 http://flume.apache.org/FlumeDeveloperGuide.html *) 定位和简单例子 1). Flume-ng-sdk是用于编写往flume agent发送数据的client sdk2). 简单示例 RpcClient client = null; try { client = RpcClientFactory.getDefaultInstance("127.0.0.1", 41414); Event event =