Apache Camel分解与聚合

在使用Camel时,你可能会使用到分解与聚合,例如当你向消息队列发送一个很大的文件的时候,你可能出于文件大小限制或效率的考量,需要将一个文件分解为若干文件包分别发送,在接收到接收到所有文件包后再合并为一个完整的文件。

分解即将一个消息分解为若干份(消息),然后可以对其进行单独处理,如下图:

要实现分解功能,则需要在路由定义中添加SplitDefinition,也就是要调用ProcessorDefinition.split方法,split方法主要是接收一个Expression对象,org.apache.camel.Expression是一个接口,其中只有一个evaluate方法:

package org.apache.camel;

public interface Expression {

    <T> T evaluate(Exchange exchange, Class<T> type);
}

在调用split方法时,evaluate方法需要返回一个的对象类型有一定的规则(要求),具体的规则是什么,看下面的源码则一目录了然:

public static Iterator<Object> createIterator(Object value, String delimiter, final boolean allowEmptyValues) {

	// if its a message than we want to iterate its body
	if (value instanceof Message) {
		value = ((Message) value).getBody();
	}

	if (value == null) {
		return Collections.emptyList().iterator();
	} else if (value instanceof Iterator) {
		return (Iterator<Object>)value;
	} else if (value instanceof Iterable) {
		return ((Iterable<Object>)value).iterator();
	} else if (value.getClass().isArray()) {
		if (isPrimitiveArrayType(value.getClass())) {
			final Object array = value;
			return new Iterator<Object>() {
				int idx = -1;

				public boolean hasNext() {
					return (idx + 1) < Array.getLength(array);
				}

				public Object next() {
					idx++;
					return Array.get(array, idx);
				}

				public void remove() {
					throw new UnsupportedOperationException();
				}

			};
		} else {
			List<Object> list = Arrays.asList((Object[]) value);
			return list.iterator();
		}
	} else if (value instanceof NodeList) {
		// lets iterate through DOM results after performing XPaths
		final NodeList nodeList = (NodeList) value;
		return new Iterator<Object>() {
			int idx = -1;

			public boolean hasNext() {
				return (idx + 1) < nodeList.getLength();
			}

			public Object next() {
				idx++;
				return nodeList.item(idx);
			}

			public void remove() {
				throw new UnsupportedOperationException();
			}
		};
	} else if (value instanceof String) {
		final String s = (String) value;

		// this code is optimized to only use a Scanner if needed, eg there is a delimiter

		if (delimiter != null && s.contains(delimiter)) {
			// use a scanner if it contains the delimiter
			Scanner scanner = new Scanner((String)value);

			if (DEFAULT_DELIMITER.equals(delimiter)) {
				// we use the default delimiter which is a comma, then cater for bean expressions with OGNL
				// which may have balanced parentheses pairs as well.
				// if the value contains parentheses we need to balance those, to avoid iterating
				// in the middle of parentheses pair, so use this regular expression (a bit hard to read)
				// the regexp will split by comma, but honor parentheses pair that may include commas
				// as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)"
				// then the regexp will split that into two:
				// -> bean=foo?method=killer(a,b)
				// -> bean=bar?method=great(a,b)
				// http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts
				delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";
			}

			scanner.useDelimiter(delimiter);
			return CastUtils.cast(scanner);
		} else {
			// use a plain iterator that returns the value as is as there are only a single value
			return new Iterator<Object>() {
				int idx = -1;

				public boolean hasNext() {
					return idx + 1 == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s));
				}

				public Object next() {
					idx++;
					return s;
				}

				public void remove() {
					throw new UnsupportedOperationException();
				}
			};
		}
	} else {
		return Collections.singletonList(value).iterator();
	}
}

该方法定义在org.apache.camel.util.ObjectHelper类,由上可知允许的类型有很多,但最终于转换为了一个java.uti.Iterator对象,这样Camel就能通过该迭代器遍历出各个元素对象,然后针对每一个元素对象创建一个Exchange对象,再把元素对象设置到Message的body中,这样就把一个消息分解为了多份。

聚合,刚好是分解的逆过程,即将根据路由定义路由的多个消息合并为一个消息,如下图:

聚合主要要解决的问题是如何确定哪些消息是要进行聚合的,聚合的过程是怎样的。要实现聚合功能,则需要向路由定义中添加AggregateDefinition,调用ProcessorDefinition.aggregate方法,该方法主要是要提供一个Expression与AggregationStrategy对象,前者用于确定哪些消息需要被聚合,后者用于确定具体的聚合过程如何进行。

下面提供一个分解与聚合具体的例子,实现一个示例功能,将一个文件根据每一行进行分解,被分解后再进行聚合:

package com.xtayfjpk.camel;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.support.ExpressionAdapter;

public class Test {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {
		CamelContext camelContext = new DefaultCamelContext();
		camelContext.addRoutes(new RouteBuilder() {

			@Override
			public void configure() throws Exception {
				//轮询指定目录
				this.from("file:H:/temp/in?noop=true")
					//添加SplitDefinition,传入一个自定义的Expression对象
					.split(new ExpressionAdapter() {

					@Override
					public Object evaluate(Exchange exchange) {
						//返回一个实现了Iterator接口的类,每一个迭代出来的元素对象创建一个Exchange
						File file = exchange.getIn().getBody(File.class);
						System.out.println(file);
						Scanner scanner = null;
						if(file!=null) {
							try {
								scanner = new Scanner(file);
								//根据行进行分解
								scanner.useDelimiter("\n");
							} catch (FileNotFoundException e) {
								e.printStackTrace();
							}
						}
						return scanner;
					}
				}).process(new Processor() {
					private int count = 0;

					public void process(Exchange exchange) throws Exception {
						//往消息中设置关联key的值,在聚合的时候将会使用到
						//如果消息的关联key值是相同的则表示需要进行聚合
						exchange.getIn().setHeader("test_correlation_key", (++count)%2);
						System.out.println("body:" + exchange.getIn().getBody());
					}
				}).aggregate(header("test_correlation_key"), new AggregationStrategy() {

					public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
						//如果oldExchange为null,则说明是第一个分解包
						if (oldExchange == null) {
				            return newExchange;
				        }

				        String oldBody = oldExchange.getIn().getBody(String.class);
				        System.out.println("old body:" + oldBody);
				        String newBody = newExchange.getIn().getBody(String.class);
				        System.out.println("new body:" + newBody);
						//将新与旧包进行合并,再设置进Message的body中
				        oldExchange.getIn().setBody(oldBody + "\n" + newBody);
				        return oldExchange;
					}
				}).completionTimeout(5000).process(new Processor() {

					public void process(Exchange exchange) throws Exception {
						//示例后续处理,进行输出
						System.out.println("body:" + exchange.getIn().getBody());

					}
				});

			}
		});

		camelContext.start();

		Object object = new Object();
		synchronized (object) {
			object.wait();
		}
	}
}

在分解与聚合过程中,与此相关的一些数据会做为Exchange的属性进行设置,如分解序号,分解是否完成,聚合序列等,详细内容可参看Camel官方文档:

http://camel.apache.org/splitter.html

http://camel.apache.org/aggregator.html

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-08 00:48:33

Apache Camel分解与聚合的相关文章

[每日一学]apache camel简介

apache camel 是轻量级esb框架.如下是它的架构图: 它有几个比较重要的概念就是: endpoint,所谓的endpoint,就是一种可以接收或发送数据的组件.可以支持多种协议,如jms,http,file等. 另一个重要的概念就是processor,它是用来处理具体业务逻辑的组件. 还有一个是:route,用来路由,指示数据从哪里来到哪里去,中间用哪个processor处理. 而processor之间用exchange对象来传送数据,有点像jms,通俗一点就像上学时传的小纸条,记住

架构设计:系统间通信(36)——Apache Camel快速入门(上)

1.本专题主旨 1-1.关于技术组件 在这个专题中,我们介绍了相当数量技术组件:Flume.Kafka.ActiveMQ.Rabbitmq.Zookeeper.Thrift .Netty.DUBBO等等,还包括本文要进行介绍的Apache Camel.有的技术组件讲得比较深入,有的技术组件则是点到为止.于是一些读者朋友发来信息向我提到,这个专题的文章感觉就像一个技术名词的大杂烩,并不清楚作者的想要通过这个专题表达什么思想. 提出这个质疑的朋友不在少数,所以我觉得有必要进行一个统一的说明.这个专题

架构设计:系统间通信(38)——Apache Camel快速入门(下1)

======================= (接上文<架构设计:系统间通信(37)--Apache Camel快速入门(中)>) 3-5-2-3循环动态路由 Dynamic Router 动态循环路由的特点是开发人员可以通过条件表达式等方式,动态决定下一个路由位置.在下一路由位置处理完成后Exchange将被重新返回到路由判断点,并由动态循环路由再次做出新路径的判断.如此循环执行直到动态循环路由不能再找到任何一条新的路由路径为止.下图来源于官网(http://camel.apache.or

Apache Camel 与 Spring Boot 集成,通过FTP定时采集、处理文件

1.概要: 本项目主要是通过在Spring平台上配置Camel.FTP,实现定时从FTP服务器下载文件到本地.解析文件.存入数据库等功能. 2.搭建空项目: Spring Boot有几种自动生成空项目的机制:CLI.Spring tool suite.网站Spring Initializr,我们选择第三个. 访问网站http://start.spring.io/,如下图 在dependencies添加依赖包的时候,在框中输入camle.jdbc.mysql会自动弹出提示,确认即为选中,如下图:

Apache Camel

Apache Camel 1 import org.apache.camel.CamelContext; 2 import org.apache.camel.builder.RouteBuilder; 3 import org.apache.camel.impl.DefaultCamelContext; 4 5 6 public class FileMoveWithCamel { 7 public static void main(String args[]) throws Exception

架构设计:系统间通信(37)——Apache Camel快速入门(中)

========================== (接上文<架构设计:系统间通信(36)--Apache Camel快速入门(上)>) (补上文:Endpoint重要的漏讲内容) 3-1-2.特殊的Endpoint Direct Endpoint Direct用于在两个编排好的路由间实现Exchange消息的连接,上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Direct连接的下一个路由起始位置(http://camel.apache.org/direct.html)

Apache Camel系列(1)----使用场景

Apache Camel是一个基于Enterprise Integration Pattern(企业整合模式,简称EIP)的开源框架.EIP定义了一些不同应用系统之间的消息传输模型,包括常见的Point-to-Point,Pub/Sub模型.更多关于EIP的信息,可以参见这里 Apache Camel主要提供了以下功能: 1,实现了EIP的大部分模式,如果你要在不同的应用系统之间以不同的方式传递消息,那么你可以从Apache Camel中找到解决反感. 2,提供了大量Component(组件),

Apache Camel系列(3)----Redis组件

Redis组件允许你从Redis接收消息,以及将消息发送给Redis.RedisProducer的功能很强大,几乎能执行所有的Redis Command,这些Command都是在Message的header中进行设置的.遗憾的是RedisConsumer仅仅支持pub/sub模式,不支持Point2Point,这意味这在Camel中,通过阻塞的方式消费Lists中的消息是不可行的.我反馈了这个问题到Apache Camel Mail List,希望以后的版本支持P2P更能.下面演示如何使用cam

Apache Camel框架入门示例

Apache Camel是Apache基金会下的一个开源项目,它是一个基于规则路由和中介引擎,提供企业集成模式的Java对象的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则.领域特定语言意味着Apache Camel支持你在的集成开发工具中使用平常的,类型安全的,可自动补全的Java代码来编写路由规则,而不需要大量的XML配置文件.同时,也支持在Spring中使用XML配置定义路由和中介规则. Camel提供的基于规则的路由(Routing)引擎,可