flume自定义反序列化器deserializer

需求背景:

  在利用flume进行日志收集的时候,错误信息会将堆栈多行打印,需要将多行信息合并成一行,包装成一个event进行传输。

解决思路: 

  解决上述需求可以通过自定义拦截器和自定义反序列化器来实现。网上关于自定义拦截器的资料比较多,但考虑到拦截器的定位和使用场景,拦截器不应用于多个event拆分组合,并若flume有并发处理的话,不能保证读取event是顺序的。查阅资料发现,通过自定义flume的反序列化器更加合理和安全。

实现步骤:

  1:新建一个类,实现 EventDeserializer 接口

  2: 重写 readEvent()方法或readEvents方法

  3: 修改flume的配置文件,将sources.deserializer属性设置为自定义类

源码:

  1:自定义反序列化器 ---> MyLineDeserializer

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.xxx.flume.serializer;

import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * A deserializer that parses text lines from a file.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MyLineDeserializer implements EventDeserializer {

    private static final Logger logger = LoggerFactory.getLogger
            (MyLineDeserializer.class);

    private final ResettableInputStream in;
    private final Charset outputCharset;
    private final int maxLineLength;
    private volatile boolean isOpen;

    public static final String OUT_CHARSET_KEY = "outputCharset";
    public static final String CHARSET_DFLT = "UTF-8";

    public static final String MAXLINE_KEY = "maxLineLength";
    public static final int MAXLINE_DFLT = 2048;
    private StringBuffer eventStringBuffer = new StringBuffer();

    MyLineDeserializer(Context context, ResettableInputStream in) {
        this.in = in;
        this.outputCharset = Charset.forName(
                context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
        this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT);
        this.isOpen = true;
    }

    /**
     * Reads a line from a file and returns an event
     *
     * @return Event containing parsed line
     * @throws IOException
     */
    @Override
    public Event readEvent() throws IOException {
        ensureOpen();
        String line = readLine();
        Event event = null;

        while (line != null) {
            //  start with 20 is one timestamp , event end
            if (line.trim().startsWith("20")) {
                event = EventBuilder.withBody(eventStringBuffer.toString(), outputCharset);
                eventStringBuffer.delete(0, eventStringBuffer.length());
            }
            //  add current line push to buffer
            if (line.trim().length() > 0) {
                if (eventStringBuffer.length() > 0) {
                    eventStringBuffer.append(System.lineSeparator()).append(line);
                } else {
                    eventStringBuffer.append(line);
                }
            }
            if (line.trim().startsWith("20")) {
                break;
            }
            line = readLine();
        }

        if (line == null && eventStringBuffer.toString().length() > 0 ){
            event =  EventBuilder.withBody(eventStringBuffer.toString(), outputCharset);
            eventStringBuffer.delete(0, eventStringBuffer.length());
            return event;
        }
        return event;
    }

    /**
     * Batch line read
     *
     * @param numEvents Maximum number of events to return.
     * @return List of events containing read lines
     * @throws IOException
     */
    @Override
    public List<Event> readEvents(int numEvents) throws IOException {
        ensureOpen();
        List<Event> events = Lists.newLinkedList();
        for (int i = 0; i < numEvents; i++) {
            Event event = readEvent();
            if (event != null) {
                events.add(event);
            } else {
                break;
            }
        }
        return events;
    }

    @Override
    public void mark() throws IOException {
        ensureOpen();
        in.mark();
    }

    @Override
    public void reset() throws IOException {
        ensureOpen();
        in.reset();
    }

    @Override
    public void close() throws IOException {
        if (isOpen) {
            reset();
            in.close();
            isOpen = false;
        }
    }

    private void ensureOpen() {
        if (!isOpen) {
            throw new IllegalStateException("Serializer has been closed");
        }
    }

    // TODO: consider not returning a final character that is a high surrogate
    // when truncating
    private String readLine() throws IOException {
        StringBuilder sb = new StringBuilder();
        int c;
        int readChars = 0;
        while ((c = in.readChar()) != -1) {
            readChars++;

            // FIXME: support \r\n
            if (c == ‘\n‘) {
                break;
            }

            sb.append((char) c);

            if (readChars >= maxLineLength) {
                logger.warn("Line length exceeds max ({}), truncating line!",
                        maxLineLength);
                break;
            }
        }

        if (readChars > 0) {
            return sb.toString();
        } else {
            return null;
        }
    }

    public static class Builder implements EventDeserializer.Builder {

        @Override
        public MyLineDeserializer build(Context context, ResettableInputStream in) {
            return new MyLineDeserializer(context, in);
        }

    }

}

  2: flume 配置文件

a1.sources.r1.deserializer =  com.xxx.flume.serializer.MyLineDeserializer$Builder

原文地址:https://www.cnblogs.com/yuwenhui/p/9367625.html

时间: 2024-11-26 04:23:12

flume自定义反序列化器deserializer的相关文章

Ember.js 入门指南——自定义序列号器

在Ember应用中,序列化器会格式化与后台交互的数据,包括发送和接收的数据.默认情况下会使用JSON API序列化数据.如果你的后端使用不同的格式,Ember Data允许你自定义序列化器或者定义一个完全不同的序列化器. Ember Data内置了三个序列化器.JSONAPISerializer是默认的序列化器,用与处理后端的JSON API.JSONSerializer是一个简单的序列化器,用与处理单个JSON对象或者是处理记录数组.RESTSerializer是一个复杂的序列化器,支持侧面加

struts2学习笔记---自定义拦截器

什么是拦截器? struts2中拦截器分为Struts2定义好的拦截器和自定义的拦截器.其作用是在一个Action执行之前进行拦截,在Action执行之后又加入某些操作. 实现原理 当请求一个Action时,struts2会查找配置文件,并根据这个Action的配置实例化对应的拦截器对象,然后串成一个列表(list),最后一个一个地调用列表中的拦截器. 拦截器的执行流程 1.对Action进行预处理.(正序执行) 2.拦截器自身决定该不该执行后续的拦截器(由invoke()方法的返回值决定).

自定义类加载器——加载任意指定目录的class文件

public class MyClassLoader extends ClassLoader{ String path;//自定义类加载器所负责的文件夹 public MyClassLoader(String path) { super(); this.path = path; } @SuppressWarnings("deprecation") @Override protected Class<?> findClass(String name) throws Class

struts2基础----&gt;自定义拦截器

这一章,我们开始struts2中拦截器的学习.内容较浅,慎看. 自定义拦截器 一.增加一个自定义的拦截器为类 package com.huhx.interceptor; import com.opensymphony.xwork2.ActionInvocation; import com.opensymphony.xwork2.interceptor.AbstractInterceptor; public class RegisterInterceptor extends AbstractInt

struts2 文件的上传下载 表单的重复提交 自定义拦截器

文件上传中表单的准备 要想使用 HTML 表单上传一个或多个文件 须把 HTML 表单的 enctype 属性设置为 multipart/form-data 须把 HTML 表单的method 属性设置为 post 需添加 <input type=“file”> 字段. Struts 对文件上传的支持 在 Struts 应用程序里, FileUpload 拦截器和 Jakarta Commons FileUpload 组件可以完成文件的上传. 步骤:1. 在 Jsp 页面的文件上传表单里使用

springMVC --拦截器流程详细,使用和自定义拦截器

先看看拦截器都做些什么: 1.日志记录:记录请求信息的日志,以便进行信息监控.信息统计.计算PV(PageView)等. 2.权限检查:如登录检测,进入处理器检测检测是否登录,如果没有直接返回到登录页面: 3.性能监控:有时候系统在某段时间莫名其妙的慢,可以通过拦截器在进入处理器之前记录开始时间,在处理完后记录结束时间,从而得到该请求的处理时间(如果有反向代理,如apache可以自动记录): 4.通用行为:读取cookie得到用户信息并将用户对象放入请求,从而方便后续流程使用,还有如提取Loca

12.Struts2自定义拦截器

12.自定义拦截器 拦截器是Struts2的一个重要特性.因为Struts2的大多数核心功能都是通过拦截器实现的. 拦截器之所以称之为“拦截器”,是因为它可以拦截Action方法的执行, 即在Acton方法执行之前或之后执行,以加强Action方法的功能. 例如,一般情况下,用户在打开某个页面之前,需要先登录,否则是无法对资源进行访问的.这就是权限拦截器. 1.定义拦截器类 自定义的拦截器类需要实现拦截器接口com.opensymphony.xwork2.interceptor.Intercep

Struts2自定义拦截器

自定义拦截器 1). 具体步骤 I. 定义一个拦截器的类 > 可以实现 Interceptor 接口 > 继承 AbstractInterceptor 抽象类 II然后在拦截器类的interceptor()方法中定义这个拦截器的功能 III. 在 struts.xml 文件配置. 1注册拦截器 <interceptors> <interceptor name="hello" class="com.atguigu.struts2.intercept

JAVAEE——struts2_04:自定义拦截器、struts2标签、登陆功能和校验登陆拦截器的实现

一.自定义拦截器 1.架构 2.拦截器创建 //拦截器:第一种创建方式 //拦截器生命周期:随项目的启动而创建,随项目关闭而销毁 public class MyInterceptor implements Interceptor{} //创建方式2: 继承AbstractInterceptor -> struts2的体贴 //帮我们空实现了init 和 destory方法. 我们如果不需要实现这两个方法,就可以只实现intercept方法 public class MyInterceptor2