CXF中Web服务请求处理流程

Web服务其本质上还是HTTP请求,只不过要遵循Web服务特定的规范,比如说服务端与客户端双方通信的媒介为XML文件格式,以完成对一次Web服务调用的描述。当然此XML还有特定的格式,至于这个特定的格式是什么样的取决于使用哪一种数据绑定方式。CXF中称这为数据绑定,个人觉得称之为Java对象的序列化与反序列化更易懂些。

本文将从源码(CXF版本为2.7.6)层面来分析一下CXF是如何完成对一个Web服务(SOAP类型为例)请求的处理的,注意这里分析的是请求的处理,并不包含这个请求是如何生成的,分析将将从CXF服务端接收到一个请求开始。在CXF中,处理HTTP请求是使用Jetty实现的,其处理请求的Handler为org.apache.cxf.transport.http_jetty.JettyHTTPHandler在其handle方法中调用的是org.apache.cxf.transport.http_jetty.JettyHTTPDestination的doService()方法,其中最重要的一句代码是调用了serviceRequest(context,
req, resp)方法,下面是源码:

protected void serviceRequest(final ServletContext context,
                                  final HttpServletRequest req,
                                  final HttpServletResponse resp) throws IOException {
	Request baseRequest = (req instanceof Request) ? (Request)req : getCurrentRequest();

	if (LOG.isLoggable(Level.FINE)) {
		LOG.fine("Service http request on thread: " + Thread.currentThread());
	}
	Message inMessage = retrieveFromContinuation(req);

	if (inMessage == null) {

		inMessage = new MessageImpl();
		ExchangeImpl exchange = new ExchangeImpl();
		exchange.setInMessage(inMessage);
		setupMessage(inMessage, context, req, resp);

		((MessageImpl)inMessage).setDestination(this);

		exchange.setSession(new HTTPSession(req));
	}

	try {
		//处理消息
		//incomingObserver的现实类为org.apache.cxf.transport.ChainInitiationObserver
		incomingObserver.onMessage(inMessage);
		resp.flushBuffer();
		baseRequest.setHandled(true);
	} catch (SuspendedInvocationException ex) {
		//省略...
	}
	//省略...
}

在这个方法中,完成了对Message的创建,初始化设置(setupMessage);Exchange、Session的创建,并将Message与Session放置进Exchange对象中。Message对象的初始化设置(setupMessage方法中)非常重要,向Message中放置了很多信息,如Http请求对象,响应对象,请求的地址等一系列信息,具体请查看setupMessage方法,需要的对象创建完成后调用ChainInitiationObserver.onMessage()方法,源码如下:

public void onMessage(Message m) {
	Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
	ClassLoaderHolder origLoader = null;
	try {
		if (loader != null) {
			origLoader = ClassLoaderUtils.setThreadContextClassloader(loader);
		}
		InterceptorChain phaseChain = null;

		//如果拦截器链不为空,则检查其状态,如果是停止或挂起状态则使其恢复
		if (m.getInterceptorChain() != null) {
			phaseChain = m.getInterceptorChain();
			// To make sure the phase chain is run by one thread once
			synchronized (phaseChain) {
				if (phaseChain.getState() == InterceptorChain.State.PAUSED
					|| phaseChain.getState() == InterceptorChain.State.SUSPENDED) {
					phaseChain.resume();
					return;
				}
			}
		}

		//重新创建Message对象,m为MessageImpl对象,而CXF中支持SOAP协议服务与restful服务
		//所以要根据具体使用的是哪一种类型创建出更具体的Message对象,这里,如果是SOAP协议则创建的是SOAPMessage对象
		//如果是restful服务则创建的是XMLMessage对象
		Message message = getBinding().createMessage(m);
		Exchange exchange = message.getExchange();
		if (exchange == null) {
			exchange = new ExchangeImpl();
			m.setExchange(exchange);
		}
		exchange.setInMessage(message);
		//往Exchage对象中填充信息
		setExchangeProperties(exchange, message);

		InterceptorProvider dbp = null;
		if (endpoint.getService().getDataBinding() instanceof InterceptorProvider) {
			//数据绑定对象不为空
			dbp = (InterceptorProvider)endpoint.getService().getDataBinding();
		}
		// setup chain
		if (dbp == null) {
			phaseChain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
														 bus.getInInterceptors(),
														 endpoint.getService().getInInterceptors(),
														 endpoint.getInInterceptors(),
														 getBinding().getInInterceptors());
		} else {
			//将Bus、Service、Endpoint、协议绑定、数据绑定中的所有输入拦截器汇聚到拦截器链中
			phaseChain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
										bus.getInInterceptors(),
										endpoint.getService().getInInterceptors(),
										endpoint.getInInterceptors(),
										getBinding().getInInterceptors(),
										dbp.getInInterceptors());
		}

		//将拦截器链设置进Message中
		message.setInterceptorChain(phaseChain);
		phaseChain.setFaultObserver(endpoint.getOutFaultObserver());
		addToChain(phaseChain, message);

		//调用拦截器链的doIntercept方法,该方法中即依次调用链中的各个拦截器的handMessage方法。
		phaseChain.doIntercept(message);

	} finally {
		if (origBus != bus) {
			BusFactory.setThreadDefaultBus(origBus);
		}
		if (origLoader != null) {
			origLoader.reset();
		}
	}
}

到这里我们知道了拦截器是如何被调用的,但奇怪是我们并还没有看到服务的调用,而当doIntercept执行完成后onMessage()方法将退出,serviceRequest方法也就基本上执行完成了。大家应该猜得出来,服务的调用正是在doIntercept方法中完成的,确切点是在拦截器链接中的某个拦截器的handMessage()方法中完成的。使用拦截器来实现框架自身某些特定的功能是不是和struts2中的拦截器有点类似呢(并不是说拦截器的实现方式)。

真实情况是拦截器链从Service注册的拦截器中获取了一个org.apache.cxf.interceptor.ServiceInvokerInterceptor对象,服务的调用由该拦截器完成。

public void handleMessage(final Message message) {
	final Exchange exchange = message.getExchange();
	final Endpoint endpoint = exchange.get(Endpoint.class);
	final Service service = endpoint.getService();
	//获取调用都对象org.apache.cxf.jaxws.JAXWSMethodInvoker
	final Invoker invoker = service.getInvoker();        

	//创建一个Runnable可执行对象
	Runnable invocation = new Runnable() {

		public void run() {
			Exchange runableEx = message.getExchange();
			//调用JAXWSMethodInvoker的invoke方法,其返回值result为MessageContentsList对象
			Object result = invoker.invoke(runableEx, getInvokee(message));
			if (!exchange.isOneWay()) {//如果不是单向Exchange,即是双向的,也就是还有输出
				Endpoint ep = exchange.get(Endpoint.class);
				//创建输出Message
				Message outMessage = runableEx.getOutMessage();
				if (outMessage == null) {
					outMessage = new MessageImpl();
					outMessage.setExchange(exchange);
					outMessage = ep.getBinding().createMessage(outMessage);
					exchange.setOutMessage(outMessage);
				}
				copyJaxwsProperties(message, outMessage);
				if (result != null) {
					MessageContentsList resList = null;
					if (result instanceof MessageContentsList) {
						resList = (MessageContentsList)result;
					} else if (result instanceof List) {
						resList = new MessageContentsList((List<?>)result);
					} else if (result.getClass().isArray()) {
						resList = new MessageContentsList((Object[])result);
					} else {
						outMessage.setContent(Object.class, result);
					}
					if (resList != null) {
						//将结果放置在outMessage中
						outMessage.setContent(List.class, resList);
					}
				}
			}
		}

	};

	//从Service中获取Executor,不为null
	Executor executor = getExecutor(endpoint);
	//从Exchange中获取Executor,为null
	Executor executor2 = exchange.get(Executor.class);
	if (executor2 == executor || executor == null) {
		// already executing on the appropriate executor
		invocation.run();
	} else {
		//执行
		exchange.put(Executor.class, executor);
		FutureTask<Object> o = new FutureTask<Object>(invocation, null) {
			@Override
			protected void done() {
				super.done();
				synchronized (this) {
					this.notifyAll();
				}
			}
		};
		synchronized (o) {
			//执行FutureTask,最后还是执行了刚创建Runnable的run方法,返回run方法中
			executor.execute(o);

			//省略...
		}
	}
}

调用JAXWSMethodInvoker的invoke(Exchange exchange, Object o)从AbstractInvoker类中继承而来,如下:

public Object invoke(Exchange exchange, Object o) {

	//获取服务实现类对象,默认使用单例工厂创建
	final Object serviceObject = getServiceObject(exchange);
	try {

		//获取方法信息
		BindingOperationInfo bop = exchange.get(BindingOperationInfo.class);
		MethodDispatcher md = (MethodDispatcher)
			exchange.get(Service.class).get(MethodDispatcher.class.getName());
		//查找出要执行的方法
		Method m = bop == null ? null : md.getMethod(bop);
		if (m == null && bop == null) {
			LOG.severe(new Message("MISSING_BINDING_OPERATION", LOG).toString());
			throw new Fault(new Message("EXCEPTION_INVOKING_OBJECT", LOG,
										 "No binding operation info", "unknown method", "unknown"));
		}
		List<Object> params = null;
		if (o instanceof List) {
			params = CastUtils.cast((List<?>)o);
		} else if (o != null) {
			params = new MessageContentsList(o);
		}

		m = adjustMethodAndParams(m, exchange, params);

		//Method m = (Method)bop.getOperationInfo().getProperty(Method.class.getName());
		m = matchMethod(m, serviceObject);//再次匹配方法

		//将服务对象,方法,方法参数传入
		return invoke(exchange, serviceObject, m, params);
	} finally {
		releaseServiceObject(exchange, serviceObject);
	}
}

将服务对象,方法,方法参数传入入调用了重载的protected Object invoke(Exchange exchange,  final Object serviceObject, Method m, List<Object> params)方法该方法位于JAXWSMethodInvoker类中,其夫归调用父类的invoke方法,最终调用AbstractInvoker的performInvocation方法,如下:

protected Object performInvocation(Exchange exchange, final Object serviceObject, Method m, Object[] paramArray) throws Exception {
	paramArray = insertExchange(m, paramArray, exchange);
	//省略...
	return m.invoke(serviceObject, paramArray);//通过反射调用服务实现对象方法
}

到这里,你应该知道,服务是如何被调用的,那么服务被调用后又是如何返回的呢?现在回到ServiceInvokerInterceptor拦截器中创建的Runnable对象,因为Exchange是双向的,所以会创建出outMessage,然后将服务执行的结果封装成一个MessageContentsList对象放置在outMessage的content中。

在输入拦截器链中还注册了另外一个org.apache.cxf.interceptor.OutgoingChainInterceptor拦截器:

public void handleMessage(Message message) {
	Exchange ex = message.getExchange();
	BindingOperationInfo binding = ex.get(BindingOperationInfo.class);
	if (null != binding && null != binding.getOperationInfo() && binding.getOperationInfo().isOneWay()) {
		closeInput(message);
		return;
	}
	Message out = ex.getOutMessage();
	if (out != null) {
		getBackChannelConduit(message);
		if (binding != null) {
			out.put(MessageInfo.class, binding.getOperationInfo().getOutput());
			out.put(BindingMessageInfo.class, binding.getOutput());
		}

		InterceptorChain outChain = out.getInterceptorChain();
		if (outChain == null) {
			//对于输出消息来说,输出拦截器链就是其输入,这里与收集输入拦截器类似,这只过这里是
			//获取Bus、Service、Endpoint、协议绑定、数据绑定中的输出拦截器
			outChain = OutgoingChainInterceptor.getChain(ex, chainCache);
			out.setInterceptorChain(outChain);
		}
		//负责收集输出拦截器并创建出输出拦截器链,然后调用其doIntercept方法
		outChain.doIntercept(out);
	}
}

在输出拦截器链中有几个关于向客端输出的拦截器:

1. org.apache.cxf.interceptor.MessageSenderInterceptor

在其handleMessage方法中执行了getConduit(message).prepare(message);而在prepare方法中向Message中设置了OutputStream,并传入了HttpServletResponse对象

public void prepare(Message message) throws IOException {
	message.put(HTTP_RESPONSE, response);
	OutputStream os = message.getContent(OutputStream.class);
	if (os == null) {
		message.setContent(OutputStream.class,
					   new WrappedOutputStream(message, response));
	}
}

Conduit实现类为org.apache.cxf.transport.http.AbstractHTTPDestination.BackChannelConduit

2. org.apache.cxf.interceptor.StaxOutInterceptor

创建XMLStreamWriter对象,并设置进Message中

3. org.apache.cxf.interceptor.BareOutInterceptor

获取服务调用的返回结果,使用XMLStreamWriter对象将包装好的返回结果以XML格式写入WrappedOutputStream中,而WrappedOutputStream是包装了HttpServletRespone.getOutputStream()对象

即,将结果返回给了客户端。

当输出拦截器链执行完成后返回到JettyHTTPDestination的serviceRequest()方法,刷新HttpServletRespone并标记请求处理完成。

时间: 2025-01-05 05:13:58

CXF中Web服务请求处理流程的相关文章

linux中web服务中间件软件及服务端和客户端软件

一.中间件: 1.认识中间件: 中间件(英语:Middleware)是提供系统软件和应用软件之间连接的软件,以便于软件各部件之间的沟通,特别是应用软件对于系统软件的集中的逻辑,在现代信息技术应用框架如Web服务.面向服务的体系结构等中应用比较广泛. IDC对中间件的定义表明,中间件是一类软件,而非一种软件;中间件不仅仅实现互连,还要实现应用之间的互操作;中间件是基于分布式处理的软件,最突出的特点是其网络通信功能. 该技术所提供的互操作性,推动了一致分布式体系架构的演进,该架构通常用于支持并简化那

CXF调用web服务

1.进入apache-cxf-2.7.6\bin目录,按住shift键,鼠标右键点击选择:  在此处打开命令窗口 2.输入 wsdl2java url ,url是对外发布的端口  ,    发布web服务示例: http://www.cnblogs.com/taobd/p/6691871.html 3.把生成的java文件复制到项目,示例如下: 1 package cn.bd.client; 2 import java.util.Date; 3 import javax.xml.bind.ann

用CXF发布Web服务

1.下载apache-cxf-2.7.6jar包,并把lib目录下的所有jar包导入项目 2.编写测试的实体类,示例如下: 1 package cn.bd.weather.entity; 2 3 import java.util.Date; 4 5 import javax.xml.bind.annotation.XmlRootElement; 6 /** 7 * 8 * @author Administrator 9 * @XmlRootElement 表示根元素 10 */ 11 @XmlR

Linux中web服务运行情况的方法

监控一般通过脚本实现,使用定时任务定期执行检测. 1.端口 本地:ss,netstat,lsof 远程:telnet,namp,nc 2.本地进程数 例如: lsof -i:80|wc -l ps -ef|grep nginx|wc -l nmap 192.168.220.139 -p 80|grep open|wc -l 3.查看http返回码 返回的是200就正常 [[email protected] ~]# curl -I -s -w "%{http_code}" -o /dev

企业中web服务http网站的搭建和配置

WEB是基于B/S架构的WEB通信,是服务器与客户端的其中模式,服务端支持HTTP协议的网页提供程序,客户端按标记规范显示网页的浏览器程序. 工作模式: 客户端通过HTTP协议对服务端进行请求,服务端通过HTTP应答给客户端,监听的端口为80端口 . Apache的主目录和文件有哪些? 主配置文件:/etc/httpd/conf/httpd.conf配置目录:/etc/httpd/conf.d/网站根目录:/var/www/html/访问日志:/var/log/httpd/access_log错

CXF客户端请求服务流程

CXF(使用版本2.7.6)对Web服务封装度已经非常高了,你只需要像正常写代码一样,附加几个额外的注解直接发布,服务端差不多就完成了:对于客户端更简单,只需要知道Web服务的URL地址和接口,就能如调用本地代码一样,几乎感觉不到与本地代码有什么区别.这就是封装的威力,虽然高度封装简化了我们对Web服务的使用,但也间接地阻挡了我们对其深入了解.本文就将源码层面来分析CXF其内部是如何完成客户端对Web服务的调用封装的,但并不包含服务端对服务请求的处理过程,如有需要可参看上篇,CXF中Web服务请

简单web服务开发(JWS,CXF,AXIS)

(一)利用JWS开发web服务 开发Web服务器端: 1.定义接口HelloWorld,使用@WebService注解修饰接口名,使用@WebMethod修饰需要对外发布的方法. 2.HelloWorld接口实现类 3.发布类HelloWorldService 4.编译运行HelloWorldService 类之后查看WSDL文档http://localhost:8080/ HelloWorld?wsdl 客户端代码: 1.生成客户端所需要的文件 新建工程WebServerClient,进入工程

Axis2 Web服务配置文件services.xml详解

在Axis1中部署服务时,我们使用service.wsdd文件来配置服务.在Axis2中,不再使用service.wsdd文件来配置服务,改用services.xml了.这两个配置文件的语法是截然不同的. 本文涵盖了services.xml文件的语法和使用说明.在Apache Axis2/Java中,同一个服务包文件既可以用于部署单个服务,也可以部署多个服务.不论以何种方式部署服务,一个有效的服务包文件必须包含services.xml文件.随着我们部署服务的方式不同,services.xml文件

AEAI ESB-基于ESB的Web服务开发

1.概述 Web服务是ESB中间件的常见解决方案,它使得运行在不同机器上的不同应用无须借助附加的.专门的第三方软件或硬件,就可相互交换数据或集成.依据Web Service规范实施的应用之间,无论它们所使用的语言.平台或内部协议是什么,都可以相互交换数据. AEAIESB提供了非常便捷的服务创建方式,本文档主要为读者介绍如何使用ESB来创建一个简单的Web服务接口. 2.样例 创建webservice服务 在WS目录右键,点击"创建web服务",如下图: 配置web服务向导 服务名称填