Camel路由启动过程

路由启动由CamelContext的start()方法开始,在该方法中调用了super.start(),即调用父类ServiceSupport的start()方法,ServiceSupport的start()方法中调用了doStart()方法又回到CamelContext的doStart()方法,该方法中调用了doStartCamel()方法,在doStartCamel()方法中有两个最重要的方法:startRouteDefinitions()与doStartOrResumeRoutes()方法。

我们先看startRouteDefinitions()方法:

protected void startRouteDefinitions(Collection<RouteDefinition> list) throws Exception {
	if (list != null) {
		for (RouteDefinition route : list) {
			startRoute(route);
		}
	}
}

该方法遍历所有路由定义,调用startRoute()方法,下面是源码:

public void startRoute(RouteDefinition route) throws Exception {
    //省略一些代码...

	//设置正在启动路由为true
    isStartingRoutes.set(true);
    try {
        //确定路由已经预处理完毕
        route.prepare(this);
		//创建一个路由list
        List<Route> routes = new ArrayList<Route>();
		//调用RouteDefinition的addRoutes方法,返回一个路由上下文list
        List<RouteContext> routeContexts = route.addRoutes(this, routes);
		//创建路由服务
        RouteService routeService = new RouteService(this, route, routeContexts, routes);
		//启动路由服务
        startRouteService(routeService, true);
    } finally {
        // we are done staring routes
        isStartingRoutes.remove();
    }
}

下面是RouteDefinition的.addRoutes方法源码:

public List<RouteContext> addRoutes(ModelCamelContext camelContext, Collection<Route> routes) throws Exception {
    List<RouteContext> answer = new ArrayList<RouteContext>();

    @SuppressWarnings("deprecation")
    ErrorHandlerFactory handler = camelContext.getErrorHandlerBuilder();
    if (handler != null) {
        setErrorHandlerBuilderIfNull(handler);
    }

	//迭代路由定义输入集体,在示例中只有一个输入
    for (FromDefinition fromType : inputs) {
        RouteContext routeContext;
        try {
        	//调用另一重载addRoutes方法
            routeContext = addRoutes(camelContext, routes, fromType);
        } catch (FailedToCreateRouteException e) {
            throw e;
        } catch (Exception e) {
            // wrap in exception which provide more details about which route was failing
            throw new FailedToCreateRouteException(getId(), toString(), e);
        }
        answer.add(routeContext);
    }
    return answer;
}

下面是重载addRoutes(camelContext, routes, fromType)方法源码:

protected RouteContext addRoutes(CamelContext camelContext, Collection<Route> routes, FromDefinition fromType) throws Exception {
    RouteContext routeContext = new DefaultRouteContext(camelContext, this, fromType, routes);

	//省略很多代码...
	//解析Endpoint
	routeContext.getEndpoint();

    //省略很多代码...

	//创建一list将路由定义的所有输出添加进去
    List<ProcessorDefinition<?>> list = new ArrayList<ProcessorDefinition<?>>(outputs);
    //迭代所有输出
    for (ProcessorDefinition<?> output : list) {
        try {
        	//调用输出ProcessorDefinition的addRoutes方法
            output.addRoutes(routeContext, routes);
        } catch (Exception e) {
            RouteDefinition route = routeContext.getRoute();
            throw new FailedToCreateRouteException(route.getId(), route.toString(), output.toString(), e);
        }
    }

    routeContext.commit();
    return routeContext;
}

在该方法中先调用了RouteContext.getEndpoint()方法,下面是getEndpoint方法源码:

public Endpoint getEndpoint() {
    if (endpoint == null) {
    	//调用FromDefinition的resolveEndpoint方法返回一个Endpoint对象并赋给endpoint成员变量
        endpoint = from.resolveEndpoint(this);
    }
    return endpoint;
}

FromDefinition的resolveEndpoint方法中调用RouteContext的resolveEndpoint(uri, ref)方法,下面源码:

public Endpoint resolveEndpoint(String uri, String ref) {
    Endpoint endpoint = null;
    if (uri != null) {
        endpoint = resolveEndpoint(uri);
        if (endpoint == null) {
            throw new NoSuchEndpointException(uri);
        }
    }
    if (ref != null) {
        endpoint = lookup(ref, Endpoint.class);
        if (endpoint == null) {
            throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref);
        }
        // Check the endpoint has the right CamelContext
        if (!this.getCamelContext().equals(endpoint.getCamelContext())) {
            throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does.");
        }
    }
    if (endpoint == null) {
        throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
    } else {
        return endpoint;
    }
}

在上面的方法中一是根据uri解析出Endpoint,二是根据ref值从注册表中获取Endpoint,这里看前一种方式:在resolveEndpoint(uri)方法中调用了RouteDefinition的resolveEndpoint方法,该方法又调用CamelContextHelper.getMandatoryEndpoint(camelContext, uri)方法,

getMandatoryEndpoint再调用CamelContext的getEndpoint(String uri),在这个方法中调用了getComponent(uri)方法,然后由getComponent(uri)返回的Component调用createEndpoint()方法获取Endpoint,这里就和Camel查找组件方式串接起来了。

现在返回到RouteDefinition的addRoutes方法中,接下来就是迭代路由定义输出了,在示例中,两个输出ProcessorDefinition分别为ProcessDefinition与ToDefinition,两者都继承自ProcessorDefinition,最终都是调用ProcessorDefinition的addRoutes方法,下面是该方法源码:

public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
    Processor processor = makeProcessor(routeContext);
    if (processor == null) {
        // no processor to add
        return;
    }

    if (!routeContext.isRouteAdded()) {
        //省略一些代码...

        // only add regular processors as event driven
        if (endpointInterceptor) {
            log.debug("Endpoint interceptor should not be added as an event driven consumer route: {}", processor);
        } else {
            log.trace("Adding event driven processor: {}", processor);
            routeContext.addEventDrivenProcessor(processor);
        }

    }
}

这里最重要的是makeProcessor方法,就是层层包装创建出一个处理器,它其实是一个DefaultChanel实例,然后调用addEventDrivenProcessor将其添加到RouteContext中。下面是makeProcessor(routeContext)方法源码:

protected Processor makeProcessor(RouteContext routeContext) throws Exception {
    Processor processor = null;

    //省略很多代码...

    if (processor == null) {
    	//这里就是在调用路由定义输出ProcessDefinition与ToDefinition的createProcessor方法
        processor = createProcessor(routeContext);
    }

    if (processor == null) {
        // no processor to make
        return null;
    }
    //processor创建出来了进行包装
    return wrapProcessor(routeContext, processor);
}

对于ProcessDefinition的createProcessor方法返回的就是我们传入的Processor对象,ToDefinition的createProcessor方法返回的是一个SendProcessor对象。

在wrapProcessor方法调用了wrapChannel方法,就是要将Processor包装成一个DefaultChannel对象,然后对该对象进行一些初始化的操作。

再回到RouteDefinition的addRoutes方法,调用完ProcessorDefinition的addRoutes方法后调用RouteContext的commit()方法,下面是commit()方法源码:

public void commit() {
    //前面说到创建出的DefaultChannel对象已经全部添加到RouteContext中了所以eventDrivenProcessors就不会为空
    if (!eventDrivenProcessors.isEmpty()) {
    	//创建Pipeline对象,将eventDrivenProcessors传进去
        Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);

        String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());

        //将Pipeline包装进CamelInternalProcessor中
        CamelInternalProcessor internal = new CamelInternalProcessor(target);

        //省略很多代码

        //创建EventDrivenConsumerRoute对象,这才是真正的路由对象,前面的都是路由定义
        //getEndpoint()得到的Endpoint对象就是FromDefinition.resolveEndpoint返回的Endpoint对象
        //示例中就是FileEndpoint对象
        //将CamelInternalProcessor对象传入Route
        Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);

        //省略很多代码

		//将创建出来的路由对象添加进路由集合
        routes.add(edcr);
    }
}

现在回到DefaultCamelContext.startRoute(RouteDefinition route)方法,routeContexts list返回后,用于创建一个RouteService对象然后调用startRouteService方法,该方法中调用safelyStartRouteServices方法时有一判断,shouldStartRoutes是否应该启动路由,对示例中来说,由于这里CamelContext还没有启动完成,所以并没有调用到safelyStartRouteServices()方法,如果是CamelContext先启动的,添加的路由则会启动。

现在回到DefaultCamelContext.doStartCamel()中,接下来就要调用doStartOrResumeRoutes()方法:

protected void doStartOrResumeRoutes(Map<String, RouteService> routeServices, boolean checkClash,
                                         boolean startConsumer, boolean resumeConsumer, boolean addingRoutes) throws Exception {
    // 过滤掉已经启动的路由服务
    Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>();
    for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
        boolean startable = false;

        Consumer consumer = entry.getValue().getRoutes().iterator().next().getConsumer();
        if (consumer instanceof SuspendableService) {
            // consumer could be suspended, which is not reflected in the RouteService status
            startable = ((SuspendableService) consumer).isSuspended();
        }

        if (!startable && consumer instanceof StatefulService) {
            // consumer could be stopped, which is not reflected in the RouteService status
            startable = ((StatefulService) consumer).getStatus().isStartable();
        } else if (!startable) {
            // no consumer so use state from route service
            startable = entry.getValue().getStatus().isStartable();
        }

        if (startable) {
            filtered.put(entry.getKey(), entry.getValue());
        }
    }

    if (!filtered.isEmpty()) {
        //启动还未启动的路由服务
        safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values());
    }

    // now notify any startup aware listeners as all the routes etc has been started,
    // allowing the listeners to do custom work after routes has been started
    for (StartupListener startup : startupListeners) {
        startup.onCamelContextStarted(this, isStarted());
    }
}

上面方法中又调用了safelyStartRouteServices方法,然后其调用一重载方法,重载方法中调用了doWarmUpRoutes与doStartRouteConsumers方法,在doWarmUpRoutes方法中调用了RouteService.warmUp(),下面是warmUp方法源码:

public synchronized void warmUp() throws Exception {
    if (endpointDone.compareAndSet(false, true)) {
        for (Route route : routes) {
            // 确保Endpoint先启动
            ServiceHelper.startService(route.getEndpoint());
        }
    }

    if (warmUpDone.compareAndSet(false, true)) {

        for (Route route : routes) {
            // warm up the route first
            route.warmUp();

	    //该方法中调用addServices方法,addServices方法中创建了Consumer并赋给了Route的consumer属性
	    route.onStartingServices(services);
            //省略很多代码

            List<Service> childServices = new ArrayList<Service>();
            for (Service service : list) {

                // inject the route
                if (service instanceof RouteAware) {
                    ((RouteAware) service).setRoute(route);
                }

                if (service instanceof Consumer) {
                	//如果是Consumer实例则添加到inputs中
                    inputs.put(route, (Consumer) service);
                } else {
                    childServices.add(service);
                }
            }
            startChildService(route, childServices);
        }

        // ensure lifecycle strategy is invoked which among others enlist the route in JMX
        for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
            strategy.onRoutesAdd(routes);
        }

        // add routes to camel context
        camelContext.addRouteCollection(routes);
    }
}

这里说明一个Consumer的创建,下面是EventDrivenConsumerRoute.addServices方法源码:

@Override
protected void addServices(List<Service> services) throws Exception {
	Endpoint endpoint = getEndpoint();
	//这里调用了endpoint(示例中即FileEndpoint)的createConsumer方法
	//传入了一个processor,而这个processor就是前面经过层层包装了的CamelInternalProcessor
	consumer = endpoint.createConsumer(processor);
	if (consumer != null) {
		services.add(consumer);
		if (consumer instanceof RouteAware) {
			((RouteAware) consumer).setRoute(this);
		}
	}
	Processor processor = getProcessor();
	if (processor instanceof Service) {
		services.add((Service)processor);
	}
}

doStartRouteConsumers方法中调用doStartOrResumeRouteConsumers方法,下面是源码:

private void doStartOrResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute) throws Exception {
    List<Endpoint> routeInputs = new ArrayList<Endpoint>();

    for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
        Integer order = entry.getKey();
        Route route = entry.getValue().getRoute();
        //取出RouteService
        RouteService routeService = entry.getValue().getRouteService();

        //省略...

        // start the service
        for (Consumer consumer : routeService.getInputs().values()) {
            Endpoint endpoint = consumer.getEndpoint();

            //省略...

            if (resumeOnly && route.supportsSuspension()) {
                // if we are resuming and the route can be resumed
                ServiceHelper.resumeService(consumer);
                log.info("Route: " + route.getId() + " resumed and consuming from: " + endpoint);
            } else {
                // when starting we should invoke the lifecycle strategies
                for (LifecycleStrategy strategy : lifecycleStrategies) {
                    strategy.onServiceAdd(this, consumer, route);
                }
                //启动Consumer,在示例中就是FileConsumer
                startService(consumer);
                log.info("Route: " + route.getId() + " started and consuming from: " + endpoint);
            }

            //省略...
        }

        if (resumeOnly) {
            routeService.resume();
        } else {
            //启动RouteService,其内部就是调用各个Route的start()方法,但因其doStart没做什么操作
            //所以该方法也就没什么操作了
            routeService.start(false);
        }
    }
}

该方法中就把Consumer(FileConsumer)启动起来了,这篇就先讲到这里,下篇讲解文件是如何被轮询出来再经过处理最后搬运到另一目录的

时间: 2024-10-11 17:18:57

Camel路由启动过程的相关文章

Camel路由启动过程--续

上篇Camel启动路由过程中讲到启动Consumer,调用了DefaultCamelContext.startService(service)方法,下面是方法源码: private void startService(Service service) throws Exception { if (service instanceof StartupListener) { StartupListener listener = (StartupListener) service; addStartu

Camel路由构建过程

个人认为Camel中最重要的两大块,一是路由的构建过程,二是路由构建完成后的执行过程. 下面就参数前面的Camel示例来说说路由的构建细节. 其实这里说的路由构建其实是构建路由定义,对应Camel中的RouteDefinition类,一个RouteDefinition对象规定了或者说指定了一个消息从哪里产生,中间要经过什么样的处理,最后路由到什么地方.RouteDefinition有点类似java中的Class类,包含的都是一个元信息,外界则是参照这些元信息进行工作.RouteDefinitio

Android AudioPolicyService服务启动过程

AudioPolicyService是策略的制定者,比如什么时候打开音频接口设备.某种Stream类型的音频对应什么设备等等.而AudioFlinger则是策略的执行者,例如具体如何与音频设备通信,如何维护现有系统中的音频设备,以及多个音频流的混音如何处理等等都得由它来完成.AudioPolicyService根据用户配置来指导AudioFlinger加载设备接口,起到路由功能. AudioPolicyService启动过程 AudioPolicyService服务运行在mediaserver进

Android4.4 以太网和DHCP启动过程介绍

转自:http://blog.csdn.net/wlwl0071986/article/details/51451843 Android4.4已经加入了以太网的支持.现在对以太网的初始化流程.网络策略配置.dhcp交互过程等做一些简单的介绍. 一.以太网启动流程 1. 创建ConnectivityService SystemServer.Java // networkmanagement.networkStats.networkPolicy已经提前创建好,并作为参数传入 connectivity

Symfony启动过程详细学习

想了解symfony的启动过程,必须从启动文件(这里就以开发者模式)开始. <?php /* * web/app_dev.php */ $loader = require_once __DIR__.'/../app/bootstrap.php.cache'; Debug::enable(); require_once __DIR__.'/../app/AppKernel.php'; //初始化AppKernel $kernel = new AppKernel('dev', true); //Ke

Openstack liberty源码分析 之 云主机的启动过程3

接上篇Openstack liberty源码分析 之 云主机的启动过程2, 简单回顾下:nova-conductor收到nova-scheduler返回的主机列表后,依次发送异步rpc请求给目标主机的nova-compute服务,下面继续来看nova-compute服务的处理过程: nova-compute 根据路由映射,nova-compute中处理云主机启动请求的方法为 nova/compute/manager.py.ComputeManager.py.build_and_run_insta

laravel的启动过程解析

如果没有使用过类似Yii之类的框架,直接去看laravel,会有点一脸迷糊的感觉,起码我是这样的.laravel的启动过程,也是laravel的核心,对这个过程有一个了解,有助于得心应手的使用框架,希望能对大家有点帮助.提示:在此之前,最好看一下官方文档,大概知道laravel,再来看这个笔记,这样效果可能会好一点. 统一入口 laravel框架使用了统一入口,入口文件:/public/index.php <?php //自动加载文件设置 require __DIR__.'/../bootstr

Symfoy2源码分析——启动过程2

上一篇分析Symfony2框架源码,探究Symfony2如何完成一个请求的前半部分,前半部分可以理解为Symfony2框架为处理请求做准备工作,包括container生成.缓存.bundls初始化等一些列准备工作(Symfoy2源码分析——启动过程1).而这一篇讲的是Symfony2如何根据请求的数据生成Response对象,向客户端返回响应数据. 在分析前需要了解Symfony2的事件驱动机制:Symfony2事件驱动. 言归正传,Symfony2请求的工作流程其实是Symfony2内核的事件

laravel启动过程简单解析

:first-child{margin-top:0!important}img.plugin{box-shadow:0 1px 3px rgba(0,0,0,.1);border-radius:3px}iframe{border:0}figure{-webkit-margin-before:0;-webkit-margin-after:0;-webkit-margin-start:0;-webkit-margin-end:0}kbd{border:1px solid #aaa;-moz-bord