protected void startRouteDefinitions(Collection<RouteDefinition> list) throws Exception { if (list != null) { for (RouteDefinition route : list) { startRoute(route); } } }
public void startRoute(RouteDefinition route) throws Exception { //省略一些代码... //设置正在启动路由为true isStartingRoutes.set(true); try { //确定路由已经预处理完毕 route.prepare(this); //创建一个路由list List<Route> routes = new ArrayList<Route>(); //调用RouteDefinition的addRoutes方法,返回一个路由上下文list List<RouteContext> routeContexts = route.addRoutes(this, routes); //创建路由服务 RouteService routeService = new RouteService(this, route, routeContexts, routes); //启动路由服务 startRouteService(routeService, true); } finally { // we are done staring routes isStartingRoutes.remove(); } }
public List<RouteContext> addRoutes(ModelCamelContext camelContext, Collection<Route> routes) throws Exception { List<RouteContext> answer = new ArrayList<RouteContext>(); @SuppressWarnings("deprecation") ErrorHandlerFactory handler = camelContext.getErrorHandlerBuilder(); if (handler != null) { setErrorHandlerBuilderIfNull(handler); } //迭代路由定义输入集体,在示例中只有一个输入 for (FromDefinition fromType : inputs) { RouteContext routeContext; try { //调用另一重载addRoutes方法 routeContext = addRoutes(camelContext, routes, fromType); } catch (FailedToCreateRouteException e) { throw e; } catch (Exception e) { // wrap in exception which provide more details about which route was failing throw new FailedToCreateRouteException(getId(), toString(), e); } answer.add(routeContext); } return answer; }
下面是重载addRoutes(camelContext, routes, fromType)方法源码:
protected RouteContext addRoutes(CamelContext camelContext, Collection<Route> routes, FromDefinition fromType) throws Exception { RouteContext routeContext = new DefaultRouteContext(camelContext, this, fromType, routes); //省略很多代码... //解析Endpoint routeContext.getEndpoint(); //省略很多代码... //创建一list将路由定义的所有输出添加进去 List<ProcessorDefinition<?>> list = new ArrayList<ProcessorDefinition<?>>(outputs); //迭代所有输出 for (ProcessorDefinition<?> output : list) { try { //调用输出ProcessorDefinition的addRoutes方法 output.addRoutes(routeContext, routes); } catch (Exception e) { RouteDefinition route = routeContext.getRoute(); throw new FailedToCreateRouteException(route.getId(), route.toString(), output.toString(), e); } } routeContext.commit(); return routeContext; }
public Endpoint getEndpoint() { if (endpoint == null) { //调用FromDefinition的resolveEndpoint方法返回一个Endpoint对象并赋给endpoint成员变量 endpoint = from.resolveEndpoint(this); } return endpoint; }
FromDefinition的resolveEndpoint方法中调用RouteContext的resolveEndpoint(uri, ref)方法,下面源码:
public Endpoint resolveEndpoint(String uri, String ref) { Endpoint endpoint = null; if (uri != null) { endpoint = resolveEndpoint(uri); if (endpoint == null) { throw new NoSuchEndpointException(uri); } } if (ref != null) { endpoint = lookup(ref, Endpoint.class); if (endpoint == null) { throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref); } // Check the endpoint has the right CamelContext if (!this.getCamelContext().equals(endpoint.getCamelContext())) { throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does."); } } if (endpoint == null) { throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this); } else { return endpoint; } }
在上面的方法中一是根据uri解析出Endpoint,二是根据ref值从注册表中获取Endpoint,这里看前一种方式:在resolveEndpoint(uri)方法中调用了RouteDefinition的resolveEndpoint方法,该方法又调用CamelContextHelper.getMandatoryEndpoint(camelContext, uri)方法,
getMandatoryEndpoint再调用CamelContext的getEndpoint(String uri),在这个方法中调用了getComponent(uri)方法,然后由getComponent(uri)返回的Component调用createEndpoint()方法获取Endpoint,这里就和Camel查找组件方式串接起来了。
public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { Processor processor = makeProcessor(routeContext); if (processor == null) { // no processor to add return; } if (!routeContext.isRouteAdded()) { //省略一些代码... // only add regular processors as event driven if (endpointInterceptor) { log.debug("Endpoint interceptor should not be added as an event driven consumer route: {}", processor); } else { log.trace("Adding event driven processor: {}", processor); routeContext.addEventDrivenProcessor(processor); } } }
protected Processor makeProcessor(RouteContext routeContext) throws Exception { Processor processor = null; //省略很多代码... if (processor == null) { //这里就是在调用路由定义输出ProcessDefinition与ToDefinition的createProcessor方法 processor = createProcessor(routeContext); } if (processor == null) { // no processor to make return null; } //processor创建出来了进行包装 return wrapProcessor(routeContext, processor); }
public void commit() { //前面说到创建出的DefaultChannel对象已经全部添加到RouteContext中了所以eventDrivenProcessors就不会为空 if (!eventDrivenProcessors.isEmpty()) { //创建Pipeline对象,将eventDrivenProcessors传进去 Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors); String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); //将Pipeline包装进CamelInternalProcessor中 CamelInternalProcessor internal = new CamelInternalProcessor(target); //省略很多代码 //创建EventDrivenConsumerRoute对象,这才是真正的路由对象,前面的都是路由定义 //getEndpoint()得到的Endpoint对象就是FromDefinition.resolveEndpoint返回的Endpoint对象 //示例中就是FileEndpoint对象 //将CamelInternalProcessor对象传入Route Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal); //省略很多代码 //将创建出来的路由对象添加进路由集合 routes.add(edcr); } }
现在回到DefaultCamelContext.startRoute(RouteDefinition route)方法,routeContexts list返回后,用于创建一个RouteService对象然后调用startRouteService方法,该方法中调用safelyStartRouteServices方法时有一判断,shouldStartRoutes是否应该启动路由,对示例中来说,由于这里CamelContext还没有启动完成,所以并没有调用到safelyStartRouteServices()方法,如果是CamelContext先启动的,添加的路由则会启动。
protected void doStartOrResumeRoutes(Map<String, RouteService> routeServices, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes) throws Exception { // 过滤掉已经启动的路由服务 Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>(); for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) { boolean startable = false; Consumer consumer = entry.getValue().getRoutes().iterator().next().getConsumer(); if (consumer instanceof SuspendableService) { // consumer could be suspended, which is not reflected in the RouteService status startable = ((SuspendableService) consumer).isSuspended(); } if (!startable && consumer instanceof StatefulService) { // consumer could be stopped, which is not reflected in the RouteService status startable = ((StatefulService) consumer).getStatus().isStartable(); } else if (!startable) { // no consumer so use state from route service startable = entry.getValue().getStatus().isStartable(); } if (startable) { filtered.put(entry.getKey(), entry.getValue()); } } if (!filtered.isEmpty()) { //启动还未启动的路由服务 safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values()); } // now notify any startup aware listeners as all the routes etc has been started, // allowing the listeners to do custom work after routes has been started for (StartupListener startup : startupListeners) { startup.onCamelContextStarted(this, isStarted()); } }
public synchronized void warmUp() throws Exception { if (endpointDone.compareAndSet(false, true)) { for (Route route : routes) { // 确保Endpoint先启动 ServiceHelper.startService(route.getEndpoint()); } } if (warmUpDone.compareAndSet(false, true)) { for (Route route : routes) { // warm up the route first route.warmUp(); //该方法中调用addServices方法,addServices方法中创建了Consumer并赋给了Route的consumer属性 route.onStartingServices(services); //省略很多代码 List<Service> childServices = new ArrayList<Service>(); for (Service service : list) { // inject the route if (service instanceof RouteAware) { ((RouteAware) service).setRoute(route); } if (service instanceof Consumer) { //如果是Consumer实例则添加到inputs中 inputs.put(route, (Consumer) service); } else { childServices.add(service); } } startChildService(route, childServices); } // ensure lifecycle strategy is invoked which among others enlist the route in JMX for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { strategy.onRoutesAdd(routes); } // add routes to camel context camelContext.addRouteCollection(routes); } }
@Override protected void addServices(List<Service> services) throws Exception { Endpoint endpoint = getEndpoint(); //这里调用了endpoint(示例中即FileEndpoint)的createConsumer方法 //传入了一个processor,而这个processor就是前面经过层层包装了的CamelInternalProcessor consumer = endpoint.createConsumer(processor); if (consumer != null) { services.add(consumer); if (consumer instanceof RouteAware) { ((RouteAware) consumer).setRoute(this); } } Processor processor = getProcessor(); if (processor instanceof Service) { services.add((Service)processor); } }
private void doStartOrResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute) throws Exception { List<Endpoint> routeInputs = new ArrayList<Endpoint>(); for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { Integer order = entry.getKey(); Route route = entry.getValue().getRoute(); //取出RouteService RouteService routeService = entry.getValue().getRouteService(); //省略... // start the service for (Consumer consumer : routeService.getInputs().values()) { Endpoint endpoint = consumer.getEndpoint(); //省略... if (resumeOnly && route.supportsSuspension()) { // if we are resuming and the route can be resumed ServiceHelper.resumeService(consumer); log.info("Route: " + route.getId() + " resumed and consuming from: " + endpoint); } else { // when starting we should invoke the lifecycle strategies for (LifecycleStrategy strategy : lifecycleStrategies) { strategy.onServiceAdd(this, consumer, route); } //启动Consumer,在示例中就是FileConsumer startService(consumer); log.info("Route: " + route.getId() + " started and consuming from: " + endpoint); } //省略... } if (resumeOnly) { routeService.resume(); } else { //启动RouteService,其内部就是调用各个Route的start()方法,但因其doStart没做什么操作 //所以该方法也就没什么操作了 routeService.start(false); } } }