HTTP异步编程
1.1 HTTP请求挂起#
Play的设计初衷在于完成较短的请求。通过HTTP接口,Play使用固定的线程池来处理请求队列。为了达到理想的效果,线程池应该设计得尽可能小。最典型的情况:以处理程序的数量+1作为最佳值来设定线程池的大小。
这意味着如果某个请求非常耗时(比如处理长时间的运算),它将会阻塞线程池并且影响应用程序的响应能力。当然,可以通过增大线程池大小来解决问题,但是这样会造成资源的极大浪费,而且线程池的大小也不可能无止尽地增大。
试想聊天应用程序的例子:浏览器发送一个阻塞的HTTP请求,这个HTTP请求的作用是等待新消息后显示。这种类型的HTTP请求会占用很长时间(通常是好几秒),从而导致线程池的阻塞。如果有100个用户同时连接这个聊天应用程序,那么至少需要提供100个线程。这还是可以接受的,如果有1000个用户呢?甚至有10000个呢?
为了解决这种情况,Play允许临时挂起HTTP请求。挂起的HTTP请求仍然保持连接,但是该请求的执行会被移出线程池并稍后进行尝试。根据需要,Play可以在一段固定的延时后恢复现场,继续执行请求。
下例Action使用now()方法调用ReportAsPDFJob,该Job需要较长的处理时间。按照正常的情况,程序必须等待ReportAsPDFJob执行完后,才能向HTTP发送响应结果:
public static void generatePDF(Long reportId) { Promise<InputStream> pdf = new ReportAsPDFJob(report).now(); InputStream pdfStream = await(pdf); renderBinary(pdfStream);}
Play提供了更加高效的解决方案,调用await()方法将请求挂起,并释放占用的线程。当Promise<InputStream>完成后,恢复现场,继续执行后续操作。
1.2 Continuations#
如果请求非常耗时,就把正在执行的代码挂起,并将占用的线程释放出来为其他的请求提供服务。这种高效的解决方案称为Continuations。在Play的早期版本中并没有await()方法,但可以等价地使用waitFor()方法。两者的效果相同,waitFor()方法也是将Action挂起,并在需要的时候重新调用。
在应用中引入Continuations技术是为了使代码的异步处理变得简单化。由于Continuations允许显式地挂起和重用代码,因此可以采用如下方式:
public static void computeSomething() { Promise<String> delayedResult = veryLongComputation(…); String result = await(delayedResult); render(result);}
事实上,这段代码会分为两步执行,并先后占用两个线程,但对于应用程序的代码却是透明的。以下是基于Continuations实现的循环:
public static void loopWithoutBlocking() { for(int i=0; i<=10; i++) { Logger.info(i); await("1s"); } renderText("Loop finished");}
上述示例中,假设Play在DEV模式下运行(只占用一个线程),但仍可以同时处理多个请求。
Cotinuations主要通过使用控制器调用await()方法实现,该方法接收两种不同类型的参数(事实上有6种重载的方法,但是主要应用场景为2种)。
- 第一种:采用timeout的方式来调用await()方法,参数类型可以为毫秒,或者为字符串类型的字面表达式(例如1s代表一秒钟)。
- 第二种:采用Future对象的方式来调用,并且通常情况下会使用Play的Promise(定义在lib.F中,实现了java Future类)。当Promise完成后会返回并继续执行其余的处理。值得注意的是,Promise中可以触发多个事件,比如waitAny()方法参数中包含多个事件,当完成了其中任何一个,该Promise便能够返回并继续执行。
因此,上述的两种方式都会导致在未来某个时间点触发事件。第一种为预先指定,而第二种需要根据Promise完成的时间。
Play引入Cotinuations,使得编写同类事件结构的代码更加简单:
//相关处理Aawait(timeout or promise);//等待promise执行完毕//相关处理B
在等待处理的过程中,服务器将HTTP线程释放出来,因此Play能够并发处理更多的请求,而且非常高效。当timeout时间到达或者Promise执行完毕,后续的代码会继续执行,并且不需要开发者编写任何与线程唤起相关的方法。
使用timeout的方式来调用await()方法:
public static void useTimeout() { for(int i=0; i<=10; i++) { Logger.info(i); await("1s"); } renderText("Execute finished");}
以上这段代码在执行过程中,一共释放了10次线程,并且在每秒等待结束后重新唤起。从开发者的角度看,这个处理过程是非常透明的,并且允许直观地构建应用(而不需要担心创建非阻塞应用,因为这些都交由Play进行处理)。
使用Promise的方式来调用await()方法:
public static void usePromise(){ F.Promise<WS.HttpResponse> promise1=WS.url("http://domain1.com").getAsync(); F.Promise<WS.HttpResponse> promise2=WS.url("http://domain2.com").getAsync(); F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(promise1, promise2); await(promises); renderText("Execute finished"); }
上述代码使用了lib.F中的waitAll()方法,需要等待promise1和promise2都处理完成后,才能够继续执行后续处理。类似地,Play还提供了waitAny(),waitEither()等方法。
1.3 HTTP流式响应
由于Play提供在非阻塞的情况下轮询处理请求的功能,读者可能会有这样的设想:服务器端能否实现只要生成了一部分可用的结果数据就马上发送给浏览器。在Play中实现这个功能完全没有问题,而实现的关键就是以Content-Type:Chunked作为HTTP的响应类型。它允许将HTTP响应分成不同的块(chunk)分批发送,只要这些分块一被发出,浏览器立马就能接收到。以下是使用await()方法和Continuations的实现:
public static void generateLargeCSV() { CSVGenerator generator = new CSVGenerator(); response.contentType = "text/csv"; while(generator.hasMoreData()) { String someCsvData = await(generator.nextDataChunk()); response.writeChunk(someCsvData); }}
虽然生成这个CSV文件需要花费1个小时,但是Play能够实现只使用一个线程同时处理好几个请求,并且一旦生成可用的数据后就马上发送给客户端。
1.4 WebSocket介绍
1> WebSocket介绍#
WebSocket的目标是通过在浏览器和应用程序之间建立一条通信的频道,实现两者的双向通信。在Play中的WebSocket实现如下:
在浏览器端,可以使用“ws://” URL方式建立socket连接:
new WebSocket("ws://localhost:9000/helloSocket?name=Guillaume")
服务器端配置对应的WebSocket路由规则:
WS /helloSocket MyWebSocket.hello
MyWebSocket是自定义的WebSocket控制器,继承于WebSocketController。WebSocket控制器和标准的HTTP控制器有些类似,但两者在概念上有所区别:
- WebSocket控制器只有request对象,没有response对象。
- WebSocket控制器可以访问session,但访问权限是只读的。
- WebSocket控制器没有renderArgs,routeArgs以及flash作用域。
- WebSocket控制器只能从路由模式或者以查询字符串的形式来读取参数。
- WebSocket控制器拥有inbound和outbound两种通信频道。
当客户端(即浏览器)通过ws://localhost:9000/helloSocket 建立socket连接时,Play会调用MyWebSocket控制器中的hello Action方法,一旦MyWebSocket.hello方法结束,该socket连接就会自动关闭:
public class MyWebSocket extends WebSocketController { public static void hello(String name) { outbound.send("Hello %s!", name); } }
针对上述例子,如果客户端建立了socket连接,就会收到“Hello Guillaume”消息,之后Play将socket连接关闭。
当然,大部分情况下并不需要急于将socket连接关闭,可以使用await()方法进行一些适当的扩展。以下程序使用了Continuations,使服务器具有应答功能:
public class MyWebSocket extends WebSocketController { public static void echo() { while(inbound.isOpen()) { WebSocketEvent e = await(inbound.nextEvent()); if(e instanceof WebSocketFrame) { WebSocketFrame frame = (WebSocketFrame)e; if(!e.isBinary) { if(frame.textData.equals("quit")) { outbound.send("Bye!"); disconnect(); } else { outbound.send("Echo: %s", frame.textData); } } } if(e instanceof WebSocketClose) { Logger.info("Socket closed!"); } } } }
上述例子用了大量的if嵌套语句,书写起来不仅乏味,而且容易出错,这并不是优雅的Java代码。处理如此简单的例子都需要编写复杂的嵌套语句,更何况那些结合了一些数据流,甚至拥有不同事件类型的应用,就会变得异常难以维护。这里介绍一种简洁的书写方式,其中的方法封装在play.libs.F库中。下面对具有应答功能的代码进行重构:
public static void echo() { while(inbound.isOpen()) { WebSocketEvent e = await(inbound.nextEvent()); for(String quit: TextFrame.and(Equals("quit")).match(e)) { outbound.send("Bye!"); disconnect(); } for(String msg: TextFrame.match(e)) { outbound.send("Echo: %s", frame.textData); } for(WebSocketClose closed: SocketClosed.match(e)) { Logger.info("Socket closed!"); } }}
2> 使用WebSocket#
开发WebSocket应用,需要使用支持WebSocket的浏览器(比如Chrome)。Firefox和Opera出于安全考虑,无法使用WebSocket协议。与长时间处理的方法不同,WebSocket中的方法需要在预先定义的时间间隔执行针对模型更新的数据库检查,从而触发事件。WebSocket的理念是保持连接状态,等待事件的触发,然后将事件广播给每个需要的用户。因此在Play中实现WebSocket的最佳方式是使用存储在服务器端的状态对象。虽然这样做有点违背无状态(stateless)以及RESTful风格的理念,但这应该是使用WebSocket的最佳实践。在设计WebSocket应用时,开发者需要根据自己的实际情况对代码做进一步的优化:
下面将演示如何创建WebSocket应用。使用play new命令创建新的应用,名称为websocket:
Play new websocket
在app/models/目录中创建StatefulModel.java:
package models; import play.libs.F; public class StatefulModel { public static StatefulModel instance = new StatefulModel(); public final F.EventStream event = new F.EventStream(); private StatefulModel() { }}
StatefulModel非常简单,由以下几个部分组成:
- 私有的构造方法StatefulModel()。
- 单例的静态实例化对象(单例模式),这意味着服务器端只允许存在一个StatefulModel的实例。
- EventStream对象,是WebSocket在Play中实现的最核心部分,它允许发布事件来通知所有等待监听的用户。
在这个例子当中,使用了标准的EventStream来访问当前的事件。Play同时提供了ArchiveEventStream(读者可以在samples-and-tests目录中查看Play提供的chat应用示例),可以获取所有可用的信息。打开app/controllers/Application.java文件,添加如下代码:
package controllers; import play.mvc.*;import models.*; public class Application extends Controller { public static void index() { render(); } public static class WebSocket extends WebSocketController { public static void listen() { while(inbound.isOpen()) { String event = await(StatefulModel.instance.event.nextEvent()); outbound.send(event); } } }}
在Application控制器中增加了继承WebSocketController的静态类。WebSocketController与标准的控制器有所不同:前者是基于inbound/outbound模式,而后者是面向request/response模式的。上述代码的业务逻辑非常简单,只是通过while循环来检查inbound是否处于打开状态(即WebSocket处于连接状态),接着调用nextEvent()方法来等待事件的触发。
早期的Play版本并没有await()方法。await()方法的作用是挂起HTTP请求,释放当前资源让框架以便继续处理其他的请求。直到有新的事件添加到StatefulModel中,程序会从之前离开的地方继续执行,而不是重新开始。
代码将数据从事件发送到outbound,最终返回到浏览器。那么浏览器需要如何处理这些数据呢?创建views/application/index.html模板:
#{extends ‘main.html‘ /}#{set title:‘Home‘ /} <div id="socketout"></div> <script type="text/javascript"> // Create a socket var socket = new WebSocket(‘@@{Application.WebSocket.listen}‘) // Message received on the socket socket.onmessage = function(event) { $(‘#socketout‘).append(event.data+"<br />"); }</script>
在模版中,仅仅使用div来显示WebSocket发送的数据。JavaScript的内容为:
- 创建WebSocket连接。
- 将获得的数据添加到div中。
在为自定义的WebSocket增加事件之前,先定义好路由:
WS /socket Application.WebSocket.listen
需要注意的是,这里使用WS作为HTTP请求类型来描述WebSocket请求,剩下的部分和之前一样配置。现在WebSocket已经可以运行了,但是这时候开启应用,打开浏览器看到的是空白页面,这是因为服务器并没有返回数据给浏览器,所以最后需要做的是触发事件。创建异步Job,在EventStream中增加一些消息。
在app目录下新建jobs包,然后创建Startup.java文件:
package job; import play.jobs.*;import models.StatefulModel; @OnApplicationStart(async = true)public class Startup extends Job { public void doJob() throws InterruptedException { int i = 0; while (true) { i++; Thread.sleep(1000); StatefulModel.instance.event.publish("On step " + i); } }}
这个Job会在应用启动的时候执行,并一直循环下去,直到应用停止。每次迭代的时候暂停1秒钟,然后为StatefulModel的EventStream发送一个事件。
开启应用,访问http://localhost:9000/查看效果,当打开页面的时候,可以发现事件会广播到每个监听的浏览器: