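org.apache.flume.channel.ChannelProcessor does the actual work of putting Events into Channels, and Sources use it for this. You can think of it as a proxy for the channels. It decides which Channels an Event is put into and how the put happens (as a batch or one Event at a time). Before the put, it runs the Event through the configured Interceptors.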
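The ChannelSelector decides which Channels an Event is put into. There are currently two main selector types, chosen by the selector setting:
REPLICATING -> org.apache.flume.channel.ReplicatingChannelSelector
MULTIPLEXING -> org.apache.flume.channel.MultiplexingChannelSelector
REPLICATING sends the Event to every associated channel, so each channel holds a complete copy of the data.
MULTIPLEXING sends the Event only to the channels it is mapped to, somewhat like hash partitioning, so each channel holds only part of the data.
org.apache.flume.channel.MultiplexingChannelSelector chooses channels from the header (flume.selector.header by default), mapping, default and optional settings.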
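To make the header/mapping/default lookup concrete, here is a minimal sketch of how a multiplexing selector could pick the required channels for one event. It is a simplified model, not Flume's MultiplexingChannelSelector. Channels are represented by name, the optional mapping is left out, and the class name MultiplexingSketch is hypothetical. The header value is looked up in the mapping, and if the header is missing or has no mapping, the default channels are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of header-based multiplexing (not Flume's API). */
final class MultiplexingSketch {
  private final String headerName;                 // e.g. "flume.selector.header"
  private final Map<String, List<String>> mapping; // header value -> channel names
  private final List<String> defaultChannels;      // fallback when nothing matches

  MultiplexingSketch(String headerName,
                     Map<String, List<String>> mapping,
                     List<String> defaultChannels) {
    this.headerName = headerName;
    this.mapping = mapping;
    this.defaultChannels = defaultChannels;
  }

  /** Returns the required channel names for an event with the given headers. */
  List<String> requiredChannels(Map<String, String> headers) {
    String value = headers.get(headerName);
    List<String> mapped = (value == null) ? null : mapping.get(value);
    if (mapped == null) {
      // header missing or value not mapped: use the default channels
      return new ArrayList<>(defaultChannels);
    }
    return new ArrayList<>(mapped);
  }
}
```

With a mapping of CA -> [c1] and NY -> [c2, c3] and a default of [c4], an event whose header is NY goes to c2 and c3, while an event without the header goes to c4.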
Now look at the implementation of org.apache.flume.channel.ReplicatingChannelSelector. It keeps two channel lists, optional and required, which are returned by getOptionalChannels and getRequiredChannels respectively. If optional is set, optionalChannels contains the channels listed there, and requiredChannels is getAllChannels() minus optionalChannels. Otherwise every channel is required.

```java
public void configure(Context context) {
  // configure() is where requiredChannels is set up
  String optionalList = context.getString(CONFIG_OPTIONAL); // read the optional setting
  // initially requiredChannels is every channel returned by getAllChannels()
  requiredChannels = new ArrayList<Channel>(getAllChannels());
  Map<String, Channel> channelNameMap = getChannelNameMap();
  if (optionalList != null && !optionalList.isEmpty()) { // optional setting is not empty
    for (String optional : optionalList.split("\\s+")) {  // split optional on whitespace
      Channel optionalChannel = channelNameMap.get(optional);
      requiredChannels.remove(optionalChannel);           // drop it from requiredChannels
      if (!optionalChannels.contains(optionalChannel)) {
        optionalChannels.add(optionalChannel);            // and add it to optionalChannels
      }
    }
  }
}
```
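The required/optional split above can be tried out in isolation with plain channel names. The following is a minimal sketch under that assumption. The class ReplicatingSplitSketch and its split method are hypothetical, and unlike the Flume code it trims the optional string before splitting, so leading whitespace does not produce an empty name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper mirroring the required/optional split in configure(). */
final class ReplicatingSplitSketch {
  /** Returns a map with keys "required" and "optional". */
  static Map<String, List<String>> split(List<String> allChannels, String optionalList) {
    List<String> required = new ArrayList<>(allChannels); // start with every channel
    List<String> optional = new ArrayList<>();
    if (optionalList != null && !optionalList.trim().isEmpty()) {
      for (String name : optionalList.trim().split("\\s+")) { // whitespace-separated names
        required.remove(name);          // an optional channel is no longer required
        if (!optional.contains(name)) { // skip duplicate names
          optional.add(name);
        }
      }
    }
    Map<String, List<String>> result = new LinkedHashMap<>();
    result.put("required", required);
    result.put("optional", optional);
    return result;
  }
}
```

For example, with channels c1 c2 c3 and optional set to "c3", c1 and c2 stay required and c3 becomes optional. With no optional setting, all three are required.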
ChannelProcessor is first invoked from the SourceRunner, for example in the start method of org.apache.flume.source.EventDrivenSourceRunner:

```java
public void start() {
  Source source = getSource();                    // get the Source object
  ChannelProcessor cp = source.getChannelProcessor(); // get its ChannelProcessor
  cp.initialize();                                // call ChannelProcessor.initialize
  source.start();                                 // then call Source.start
  lifecycleState = LifecycleState.START;
}
```
In org.apache.flume.source.ExecSource.ExecRunnable, processEventBatch is called to insert the data in batches:

```java
while ((line = reader.readLine()) != null) {
  counterGroup.incrementAndGet("exec.lines.read");
  eventList.add(EventBuilder.withBody(line.getBytes(charset)));
  if (eventList.size() >= bufferCount) {
    channelProcessor.processEventBatch(eventList);
    eventList.clear();
  }
}
```
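The loop above buffers lines until bufferCount is reached and then hands the whole list over in one call. Here is a minimal standalone sketch of the same batching pattern. It assumes a Consumer in place of processEventBatch and plain strings in place of Events, and the names LineBatcherSketch and readInBatches are hypothetical. It also flushes the final partial batch after the loop, because the snippet above only shows the full-batch case.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of line batching; batchSink stands in for processEventBatch. */
final class LineBatcherSketch {
  static void readInBatches(BufferedReader reader, int bufferCount,
                            Consumer<List<String>> batchSink) throws IOException {
    List<String> batch = new ArrayList<>();
    String line;
    while ((line = reader.readLine()) != null) {
      batch.add(line);
      if (batch.size() >= bufferCount) {
        batchSink.accept(new ArrayList<>(batch)); // pass a copy, then reuse the buffer
        batch.clear();
      }
    }
    if (!batch.isEmpty()) {
      batchSink.accept(new ArrayList<>(batch));   // flush the partial tail batch
    }
  }
}
```

The sink receives a copy because, in this sketch, it may keep a reference to the list after the buffer is cleared. With bufferCount = 2 and five input lines, the sink is called three times, with batches of 2, 2 and 1 lines.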
Now the implementation of ChannelProcessor itself.
First, its two important fields:

```java
private final ChannelSelector selector;
private final InterceptorChain interceptorChain;
```
The initialize method calls InterceptorChain.initialize to initialize interceptorChain:

```java
public void initialize() {
  interceptorChain.initialize();
}
```
The configure method calls configureInterceptors, which builds the InterceptorChain from the interceptors setting:

```java
private void configureInterceptors(Context context) {
  List<Interceptor> interceptors = Lists.newLinkedList();

  // read the "interceptors" setting
  String interceptorListStr = context.getString("interceptors", "");
  if (interceptorListStr.isEmpty()) {
    return;
  }
  String[] interceptorNames = interceptorListStr.split("\\s+"); // split on whitespace

  Context interceptorContexts =
      new Context(context.getSubProperties("interceptors."));

  // run through and instantiate all the interceptors specified in the Context
  InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
  for (String interceptorName : interceptorNames) {
    Context interceptorContext = new Context(
        interceptorContexts.getSubProperties(interceptorName + "."));
    String type = interceptorContext.getString("type");
    if (type == null) {
      LOG.error("Type not specified for interceptor " + interceptorName);
      throw new FlumeException("Interceptor.Type not specified for " +
          interceptorName);
    }
    try {
      // get the Interceptor builder for the configured type
      Interceptor.Builder builder = factory.newInstance(type);
      builder.configure(interceptorContext);
      interceptors.add(builder.build());
      ......
  }

  interceptorChain.setInterceptors(interceptors);
}
```
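The key layout read by configureInterceptors is interceptors = "name1 name2", then interceptors.<name>.type for each name. As a minimal sketch of that lookup, assuming a flat Map<String, String> of properties instead of Flume's Context, the hypothetical InterceptorConfigSketch.parse below returns each interceptor name with its type, in declaration order. It throws IllegalArgumentException where Flume throws FlumeException. The type values used in the example (timestamp, host) are only illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the interceptors.<name>.type lookup, using a flat map. */
final class InterceptorConfigSketch {
  /** Returns interceptor name -> type, preserving declaration order. */
  static Map<String, String> parse(Map<String, String> props) {
    Map<String, String> result = new LinkedHashMap<>();
    String list = props.getOrDefault("interceptors", "");
    if (list.trim().isEmpty()) {
      return result; // no interceptors configured
    }
    for (String name : list.trim().split("\\s+")) {
      // same effect as getSubProperties("interceptors.") then getSubProperties(name + ".")
      String type = props.get("interceptors." + name + ".type");
      if (type == null) {
        throw new IllegalArgumentException("Interceptor.Type not specified for " + name);
      }
      result.put(name, type);
    }
    return result;
  }
}
```

Declaration order matters because the interceptors are applied in the order they are listed.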
ChannelProcessor also provides two methods for inserting data. processEventBatch inserts a batch of Events (its parameter is List<Event> events), and processEvent inserts a single Event.
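processEvent opens one transaction per channel for every single Event. A batch method can avoid that by first grouping the batch by target channel, so each channel gets one transaction covering its whole sub-batch. The code of processEventBatch is not shown in this post, so treat the following as a sketch of that grouping idea, not as Flume's implementation. The class BatchGroupingSketch is hypothetical, and a Function stands in for the ChannelSelector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: group a batch by target channel before putting it. */
final class BatchGroupingSketch {
  /** selector maps each event to the names of the channels it should go to. */
  static <E> Map<String, List<E>> groupByChannel(List<E> events,
                                                 Function<E, List<String>> selector) {
    Map<String, List<E>> perChannel = new LinkedHashMap<>();
    for (E event : events) {
      for (String channel : selector.apply(event)) {
        // each channel collects its own sub-batch, in event order
        perChannel.computeIfAbsent(channel, k -> new ArrayList<>()).add(event);
      }
    }
    return perChannel;
  }
}
```

Each resulting sub-batch could then be written inside a single begin/put.../commit cycle on its channel.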
Here is the implementation of processEvent:

```java
public void processEvent(Event event) {
  // run the Event through InterceptorChain.intercept
  event = interceptorChain.intercept(event);
  if (event == null) {
    return;
  }

  // Process required channels
  // ask the ChannelSelector for the required channels of this Event
  List<Channel> requiredChannels = selector.getRequiredChannels(event);
  // put the Event on each required channel, each put in its own transaction
  for (Channel reqChannel : requiredChannels) {
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      tx.begin();
      reqChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      if (t instanceof Error) {
        LOG.error("Error while writing to required channel: " + reqChannel, t);
        throw (Error) t;
      } else {
        throw new ChannelException("Unable to put event on required " +
            "channel: " + reqChannel, t);
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }

  // Process optional channels
  // same procedure for optional channels, but non-Error failures are only logged
  List<Channel> optionalChannels = selector.getOptionalChannels(event);
  for (Channel optChannel : optionalChannels) {
    Transaction tx = null;
    try {
      tx = optChannel.getTransaction();
      tx.begin();
      optChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      LOG.error("Unable to put event on optional channel: " + optChannel, t);
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
}
```
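The overall flow of processEvent can be reproduced without Flume. First the interceptors run in order, and a null result drops the event. Then each required channel gets its own transaction, and a failure aborts the call. After that each optional channel gets its own transaction, and a failure is only recorded. This is a minimal sketch under simplifying assumptions. SketchChannel is a hypothetical in-memory channel with begin/put/commit/rollback, interceptors are plain UnaryOperator<String> functions, and only RuntimeException is caught, so the Error vs. non-Error distinction in the Flume code is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical in-memory transactional channel (not Flume's Channel/Transaction API). */
final class SketchChannel {
  final String name;
  final boolean failing;                          // simulate a channel whose put always fails
  final List<String> committed = new ArrayList<>();
  private final List<String> pending = new ArrayList<>();

  SketchChannel(String name, boolean failing) { this.name = name; this.failing = failing; }

  void begin() { pending.clear(); }
  void put(String e) {
    if (failing) throw new IllegalStateException(name + " rejected the event");
    pending.add(e);
  }
  void commit() { committed.addAll(pending); pending.clear(); }
  void rollback() { pending.clear(); }
}

/** Hypothetical sketch of the intercept-then-dispatch flow of processEvent. */
final class ProcessEventSketch {
  private final List<UnaryOperator<String>> interceptors;
  final List<String> optionalFailures = new ArrayList<>(); // stands in for LOG.error

  ProcessEventSketch(List<UnaryOperator<String>> interceptors) { this.interceptors = interceptors; }

  /** Applies interceptors in order; a null result drops the event. */
  private String intercept(String event) {
    for (UnaryOperator<String> interceptor : interceptors) {
      event = interceptor.apply(event);
      if (event == null) return null;
    }
    return event;
  }

  void processEvent(String event, List<SketchChannel> required, List<SketchChannel> optional) {
    event = intercept(event);
    if (event == null) return;                    // filtered out by an interceptor
    for (SketchChannel ch : required) {           // one transaction per required channel
      ch.begin();
      try {
        ch.put(event);
        ch.commit();
      } catch (RuntimeException e) {
        ch.rollback();
        throw new IllegalStateException("Unable to put event on required channel: " + ch.name, e);
      }
    }
    for (SketchChannel ch : optional) {           // one transaction per optional channel
      ch.begin();
      try {
        ch.put(event);
        ch.commit();
      } catch (RuntimeException e) {
        ch.rollback();
        optionalFailures.add(ch.name);            // recorded, not rethrown
      }
    }
  }
}
```

Because each channel commits independently, a required-channel failure can leave earlier required channels already committed. The same is true of the per-channel transactions in the Flume code above.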