在使用Camel时,你可能会使用到分解与聚合,例如当你向消息队列发送一个很大的文件的时候,你可能出于文件大小限制或效率的考量,需要将一个文件分解为若干文件包分别发送,在接收到接收到所有文件包后再合并为一个完整的文件。
分解即将一个消息分解为若干份(消息),然后可以对其进行单独处理,如下图:
要实现分解功能,则需要在路由定义中添加SplitDefinition,也就是要调用ProcessorDefinition.split方法,split方法主要是接收一个Expression对象,org.apache.camel.Expression是一个接口,其中只有一个evaluate方法:
package org.apache.camel; public interface Expression { <T> T evaluate(Exchange exchange, Class<T> type); }
在调用split方法时,evaluate方法需要返回一个的对象类型有一定的规则(要求),具体的规则是什么,看下面的源码则一目录了然:
public static Iterator<Object> createIterator(Object value, String delimiter, final boolean allowEmptyValues) { // if its a message than we want to iterate its body if (value instanceof Message) { value = ((Message) value).getBody(); } if (value == null) { return Collections.emptyList().iterator(); } else if (value instanceof Iterator) { return (Iterator<Object>)value; } else if (value instanceof Iterable) { return ((Iterable<Object>)value).iterator(); } else if (value.getClass().isArray()) { if (isPrimitiveArrayType(value.getClass())) { final Object array = value; return new Iterator<Object>() { int idx = -1; public boolean hasNext() { return (idx + 1) < Array.getLength(array); } public Object next() { idx++; return Array.get(array, idx); } public void remove() { throw new UnsupportedOperationException(); } }; } else { List<Object> list = Arrays.asList((Object[]) value); return list.iterator(); } } else if (value instanceof NodeList) { // lets iterate through DOM results after performing XPaths final NodeList nodeList = (NodeList) value; return new Iterator<Object>() { int idx = -1; public boolean hasNext() { return (idx + 1) < nodeList.getLength(); } public Object next() { idx++; return nodeList.item(idx); } public void remove() { throw new UnsupportedOperationException(); } }; } else if (value instanceof String) { final String s = (String) value; // this code is optimized to only use a Scanner if needed, eg there is a delimiter if (delimiter != null && s.contains(delimiter)) { // use a scanner if it contains the delimiter Scanner scanner = new Scanner((String)value); if (DEFAULT_DELIMITER.equals(delimiter)) { // we use the default delimiter which is a comma, then cater for bean expressions with OGNL // which may have balanced parentheses pairs as well. // if the value contains parentheses we need to balance those, to avoid iterating // in the middle of parentheses pair, so use this regular expression (a bit hard to read) // the regexp will split by comma, but honor parentheses pair that may include commas // as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)" // then the regexp will split that into two: // -> bean=foo?method=killer(a,b) // -> bean=bar?method=great(a,b) // http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))"; } scanner.useDelimiter(delimiter); return CastUtils.cast(scanner); } else { // use a plain iterator that returns the value as is as there are only a single value return new Iterator<Object>() { int idx = -1; public boolean hasNext() { return idx + 1 == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s)); } public Object next() { idx++; return s; } public void remove() { throw new UnsupportedOperationException(); } }; } } else { return Collections.singletonList(value).iterator(); } }
该方法定义在org.apache.camel.util.ObjectHelper类,由上可知允许的类型有很多,但最终于转换为了一个java.uti.Iterator对象,这样Camel就能通过该迭代器遍历出各个元素对象,然后针对每一个元素对象创建一个Exchange对象,再把元素对象设置到Message的body中,这样就把一个消息分解为了多份。
聚合,刚好是分解的逆过程,即将根据路由定义路由的多个消息合并为一个消息,如下图:
聚合主要要解决的问题是如何确定哪些消息是要进行聚合的,聚合的过程是怎样的。要实现聚合功能,则需要向路由定义中添加AggregateDefinition,调用ProcessorDefinition.aggregate方法,该方法主要是要提供一个Expression与AggregationStrategy对象,前者用于确定哪些消息需要被聚合,后者用于确定具体的聚合过程如何进行。
下面提供一个分解与聚合具体的例子,实现一个示例功能,将一个文件根据每一行进行分解,被分解后再进行聚合:
package com.xtayfjpk.camel; import java.io.File; import java.io.FileNotFoundException; import java.util.Scanner; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.support.ExpressionAdapter; public class Test { /** * @param args */ public static void main(String[] args) throws Exception { CamelContext camelContext = new DefaultCamelContext(); camelContext.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { //轮询指定目录 this.from("file:H:/temp/in?noop=true") //添加SplitDefinition,传入一个自定义的Expression对象 .split(new ExpressionAdapter() { @Override public Object evaluate(Exchange exchange) { //返回一个实现了Iterator接口的类,每一个迭代出来的元素对象创建一个Exchange File file = exchange.getIn().getBody(File.class); System.out.println(file); Scanner scanner = null; if(file!=null) { try { scanner = new Scanner(file); //根据行进行分解 scanner.useDelimiter("\n"); } catch (FileNotFoundException e) { e.printStackTrace(); } } return scanner; } }).process(new Processor() { private int count = 0; public void process(Exchange exchange) throws Exception { //往消息中设置关联key的值,在聚合的时候将会使用到 //如果消息的关联key值是相同的则表示需要进行聚合 exchange.getIn().setHeader("test_correlation_key", (++count)%2); System.out.println("body:" + exchange.getIn().getBody()); } }).aggregate(header("test_correlation_key"), new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { //如果oldExchange为null,则说明是第一个分解包 if (oldExchange == null) { return newExchange; } String oldBody = oldExchange.getIn().getBody(String.class); System.out.println("old body:" + oldBody); String newBody = newExchange.getIn().getBody(String.class); System.out.println("new body:" + newBody); //将新与旧包进行合并,再设置进Message的body中 oldExchange.getIn().setBody(oldBody + "\n" + newBody); return oldExchange; } }).completionTimeout(5000).process(new Processor() { public void process(Exchange exchange) throws Exception { //示例后续处理,进行输出 System.out.println("body:" + exchange.getIn().getBody()); } }); } }); camelContext.start(); Object object = new Object(); synchronized (object) { object.wait(); } } }
在分解与聚合过程中,与此相关的一些数据会做为Exchange的属性进行设置,如分解序号,分解是否完成,聚合序列等,详细内容可参看Camel官方文档:
http://camel.apache.org/splitter.html
http://camel.apache.org/aggregator.html
版权声明:本文为博主原创文章,未经博主允许不得转载。