基于AsyncRestTemplate异步HTTP请求的一种轻量级技术实现

本文是在学习中的总结,欢迎转载但请注明出处:http://blog.csdn.net/pistolove/article/details/51428562


Ⅰ、前言

在上一篇博客中讲述ListenableFuture通过异步回调机制来实现请求的非阻塞。通常情况下,客户端获取数据并不会只发送一次http请求,可能会有多个http请求。这样,使用上一篇博客中的方法,就会产生大量的冗余代码,因为请求处理的代码除了一些参数不同外,其它地方都大致相同。我们发现不同请求之间的区别在于:请求地址的不同、响应类型的不同,可能还会有额外请求参数的不同。我们可以将请求数据和响应数据进行封装,这样,只需要一个字段来标识每一次http请求属于哪一个业务就可以实现批量发送http请求,整个过程是异步非阻塞的,一旦获取到数据就会触发回调函数,进而获取到响应数据,最后再进行业务逻辑相关处理。



Ⅱ、RestTemplate简介

1、定义

RestTemplate是Spring3.0中出现的新类,其可以简化HTTP服务器通信,处理HTTP连接,使应用程序代码通过提供url和响应类型(可能的模板变量)便可提取结果。


2、方法

//get方法
//其中url为请求地址,responseType为响应类(需要自己依据响应格式来确定)
//urlVariables为数组变量
public <T> T getForObject(String url, Class<T> responseType, Object... urlVariables) throws RestClientException 

//urlVariables为Map类型变量,其中key为请求字段名,value为请求字段值
public <T> T getForObject(String url, Class<T> responseType, Map<String, ?> urlVariables)

public <T> T getForObject(URI url, Class<T> responseType) throws RestClientException

//ResponseEntity
public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... urlVariables) throws RestClientException

public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Map<String, ?> urlVariables) throws RestClientException

public <T> ResponseEntity<T> getForEntity(URI url, Class<T> responseType) throws RestClientException 

//post
public <T> T postForObject(String url, Object request, Class<T> responseType, Object... uriVariables) throws RestClientException

public <T> T postForObject(String url, Object request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException

public <T> T postForObject(URI url, Object request, Class<T> responseType) throws RestClientException 

public <T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, Object... uriVariables) throws RestClientException

public <T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException

public <T> ResponseEntity<T> postForEntity(URI url, Object request, Class<T> responseType) throws RestClientException

3、说明

Spring提供的RestTemplate可用于访问Rest服务的客户端,其提供了多种便捷访问远程Http服务的方法,能够大大提高客户端的编写效率,但其并没有实现异步调用的功能。下面将引入Spring4.0提供的AsyncRestTemplate,该类可实现异步非阻塞处理http请求。



Ⅲ、AsyncRestTemplate简介

1、定义

AsyncRestTemplate是在Spring4.0中对RestTemplate进行扩展产生的新类,其为客户端提供了异步http请求处理的一种机制,通过返回ListenableFuture对象生成回调机制,以达到异步非阻塞发送http请求。


2、方法

//get
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Object... uriVariables) throws RestClientException

public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Map<String, ?> urlVariables) throws RestClientException

public <T> ListenableFuture<ResponseEntity<T>> getForEntity(URI url, Class<T> responseType) throws RestClientException

//post
public <T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request, Class<T> responseType, Object... uriVariables) throws RestClientException

public <T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException 

public <T> ListenableFuture<ResponseEntity<T>> postForEntity(URI url, HttpEntity<?> request, Class<T> responseType) throws RestClientException 

3、说明

相比于RestTemplate,AsyncRestTemplate通过回调机制能够很好地异步处理多个http请求,使得客户端在主方法中不必等待服务器响应,而是继续执行后续代码,这样就较大地提高了代码的执行效率,减少响应时间。



Ⅳ、基于AsyncRestTemplate实现批量异步调用

下面将介绍基于AsyncRestTemplate异步调用的轻量级框架,说框架有点吹牛皮的感觉,不过代码结构整体上看起来还是挺清晰的,如有不妥之处,请提供宝贵建议。其主要分为5个部分:业务标识、请求、响应,异步调用、请求处理。对应的类如下所示:

业务标识:IEnum、UserEnum(具体业务标识)

请求:BaseRequest、UserRequest(具体业务请求)、ConcreateWapper(请求包装)

响应:BaseResponse、UserRequest(具体业务响应)

异步调用:Templete、AbstractTemplete、AsynClientTemplete、CommonListenableCallBack

请求处理:FutureTpDao


1、业务标识(使用枚举类来标识业务请求)

使用枚举类能够比较好地标识具体业务,但是枚举类无法继承,这里通过定义一个空的接口IEnum对其进行抽象。可能看起来会有所不妥,但是也算是一种解决方法吧。

//空的接口
package acync;

public interface IEnum {
}
//具体业务标识枚举类,实现了IEnum接口
//
public enum UserEnum implements IEnum {
    ADD,
    UPDATE,
    DELETE,
    MODIFY;
}

2、请求

通常情况下,客户端都是发送http请求(使用url的方式)来获取数据,这样,我们主需要获取请求的url地址即可。这里,定义接口BaseRequest提供build方法来构建请求接口,对于具体的业务请求只需实现接口并构建请求url即可。

//基础请求接口,提供构建URL方法
package acync;

public interface BaseRequest {
    public String build();
}
//具体的请求类,依据业务情况自行构建URL地址
package acync;

public class UserRequest implements BaseRequest {
    private static final String REQ_URL = "http://www.126.com";

    @Override
    public String build() {
        return REQ_URL;
    }

}

3、响应

对于请求响应这里也是定义抽象类BaseResponse,提供status来表示请求的响应状态,而具体的业务响应只需要实现抽象类,自定义实现即可。(其中,BaseResponse抽象类可依据具体的业务框架来定义实现)

//基础响应抽象类,提供状态码
package acync;

import java.io.Serializable;

public abstract class  BaseResponse implements Serializable{
    private String status;
}
//具体业务响应类
package acync;

public class UserResponse extends BaseResponse{
//TODO
}

4、异步调用

下面的所列代码是整个请求的核心代码。首先,定义模版接口,接口中只提供了若干主要方法,从整体上看,方法的参数为业务请求类和响应类型,返回值为泛型类型的ListenableFuture对象;其次,定义抽象类和具体的实现类;最后,进过请求处理即可获取请求接口。这里不累赘,见下方代码。

//异步调用模板接口
package acync;

import java.util.Map;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;

public interface Templete {
    <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType) throws Exception;

    <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType) throws Exception;

    <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
            Map<String, ?> uriVariables) throws Exception;

    <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception;
}
//异步调用抽象类
//这里仅仅提供少量的调取方法,可以自行扩展

package acync;

import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;

public abstract class AbstractTemplete implements Templete{
    public AsyncRestTemplate asyncRestTemplate;

    @Override
    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType)
            throws Exception {
        String url = baserequest.build();
        try {
            ListenableFuture<ResponseEntity<T>> t = asyncRestTemplate.getForEntity(url, responseType);
            return t;
        } catch (Exception e) {
            throw e;
        }
    }

    @Override
    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType) throws Exception {
        String url = baserequest.build();
        try {
            ListenableFuture<ResponseEntity<T>> t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType);
            return t;
        } catch (Exception e) {
            throw e;
        }
    }

    @Override
    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
            Map<String, ?> uriVariables) throws Exception {
        String url = baserequest.build();
        ListenableFuture<ResponseEntity<T>> t = null;
        try {
            t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType, uriVariables);
            return t;
        } catch (Exception e) {
            throw e;
        }
    }

    @Override
    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception {
        String url = baserequest.build();
        ListenableFuture<ResponseEntity<T>> t = null;
        try {
            t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType, uriVariables);
            return t;
        } catch (Exception e) {
            throw e;
        }
    }

    abstract void setTemplete(AsyncRestTemplate asyncRestTemplate);

}
// 具体的异步调用实现类
package acync;

import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;

public class AsynClientTemplete extends AbstractTemplete {

    public AsynClientTemplete(AsyncRestTemplate template) {
        setTemplete(template);
    }

    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType)
            throws Exception {
        return super.getAsyncForObject(baserequest, responseType);
    }

    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType) throws Exception {
        return super.getAsyncForObject(baserequest, responseType);
    }

    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
            Map<String, ?> uriVariables) throws Exception {
        return super.getAsyncForObject(baserequest, responseType, uriVariables);
    }

    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception {
        return super.getAsyncForObject(baserequest, responseType, uriVariables);
    }

    @Override
    void setTemplete(AsyncRestTemplate template) {
        asyncRestTemplate = template == null ? new AsyncRestTemplate() : template;
    }

}

5、请求处理

上述四步都是为这一步做准备。请求处理这一步是请求的入口,在FutureTpDao中,通过getHttpData方法传入请求包装类ConcreateWapper,返回的Map对象Map<IEnum, Object>即为响应结果,只需依据具体的业务枚举类即可获取对应的业务请求数据。

//包装了具体的请求信息
//其中的每一个Concreate对应一个具体的请求,baseEnum对应业务标识,variables为请求的额外参数,request为请求类和响应类组成的map

package acync;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConcreateWapper {
    private List<Concreate> wrapper = new ArrayList<Concreate>();

    public ConcreateWapper(){}

    public void setParams(IEnum baseEnum, Map<String, ?> variables, Map<BaseRequest, ?> request) {
        wrapper.add(new Concreate(baseEnum, variables, request));
    }

    public List<Concreate> getWrapper() {
        return wrapper;
    }

    public static class Concreate {
        private IEnum baseEnum;
        private Map<String, ?> variables;
        private Map<BaseRequest, ?> request;

        public Concreate(IEnum baseEnum, Map<String, ?> variables, Map<BaseRequest, ?> request) {
            this.baseEnum = baseEnum;
            this.variables = variables;
            this.request = request;
        }

        public IEnum getBaseEnum() {
            return baseEnum;
        }

        public void setBaseEnum(IEnum baseEnum) {
            this.baseEnum = baseEnum;
        }

        public Map<String, ?> getVariables() {
            return variables;
        }

        public void setVariables(Map<String, ?> variables) {
            this.variables = variables;
        }

        public Map<BaseRequest, ?> getRequest() {
            return request;
        }

        public void setRequest(Map<BaseRequest, ?> request) {
            this.request = request;
        }
    }
}
//实现ListenableFutureCallback,实现回调功能
package acync;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class CommonListenableCallBack<T> implements ListenableFutureCallback<T> {
    private IEnum type;
    private Map<IEnum, Object> resultValue;
    private volatile CountDownLatch latch;

    public CommonListenableCallBack(IEnum type, Map<IEnum, Object> resultValue, CountDownLatch latch) {
        this.type = type;
        this.resultValue = resultValue;
        this.latch = latch;
    }

    @Override
    public void onSuccess(T result) {
        ResponseEntity<T> re = (ResponseEntity<T>) result;
        if (re != null && re.getBody() != null) {
            T body = re.getBody();
            if (type != null) {
                resultValue.put(type, body);
            }
        }
        latch.countDown();
    }

    @Override
    public void onFailure(Throwable ex) {
        latch.countDown();
    }

}
//FutureTpDao的构造函数可以传入自定义的AsyncRestTemplate,不传的话就是默认的
//其中的getHttpData()方法传入多个请求的包装类ConcreateWapper,返回数据组成的Map
//其中Map中的key对应的是业务标识,value对应的是请求对应的结果类

package acync;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;
import acync.ConcreateWapper.Concreate;

public class FutureTpDao {
    public AsynClientTemplete asynHttpClient;

    public FutureTpDao(){
        asynHttpClient = new AsynClientTemplete(null);
    }

    public FutureTpDao(AsyncRestTemplate tp) {
        asynHttpClient = new AsynClientTemplete(tp);
    }

    //获取数据
    public Map<IEnum, Object> getHttpData(ConcreateWapper wapper) {
        if (wapper == null)
            return new HashMap<IEnum, Object>();
        final CountDownLatch latch = new CountDownLatch(wapper.getWrapper().size());
        final Map<IEnum, Object> result = new HashMap<IEnum, Object>();

        if (wapper.getWrapper() != null) {
            for (final Concreate wp : wapper.getWrapper()) {
                try {
                    Map<BaseRequest, ?> requestMap = wp.getRequest();
                    for (final BaseRequest tpRequestInfo : requestMap.keySet()) {
                        getHttpdata(wp, tpRequestInfo, latch, requestMap, result);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            try {
                latch.await();
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage());
            }
        }
        return result;
    }

   //发送http请求,获取请求结果
    private void getHttpdata(Concreate wp, BaseRequest tpRequestInfo, CountDownLatch latch,
            Map<BaseRequest, ?> requestMap, Map<IEnum, Object> result) throws Exception {
        ListenableFuture<?> statResponse = null;

        if (requestMap.get(tpRequestInfo) instanceof ParameterizedTypeReference<?>) {
            ParameterizedTypeReference<?> responseType = (ParameterizedTypeReference<?>) requestMap.get(tpRequestInfo);
            statResponse = asynHttpClient.getAsyncForObject(tpRequestInfo, responseType, wp.getVariables());
        } else if (requestMap.get(tpRequestInfo) instanceof Class<?>) {
            Class<?> responseType = (Class<?>) requestMap.get(tpRequestInfo);
            statResponse = asynHttpClient.getAsyncForObject(tpRequestInfo, responseType);
        } else {
            throw new RuntimeException("requestType error...");
        }

        addCallBack(statResponse, wp.getBaseEnum(), latch, result);
    }

    //增加回调
    private <T> void addCallBack(ListenableFuture<T> statResponse, IEnum baseEnum, CountDownLatch latch,
            Map<IEnum, Object> result) {
        if (statResponse != null) {
            statResponse.addCallback(new CommonListenableCallBack<T>(baseEnum, result, latch));
        }
    }
}

6、示例

package acync;

import java.util.HashMap;
import java.util.Map;

/**
 * 示例
 * 示例仅仅是一个样板,无法运行
 * 需要在web环境下运行,例如启动tomcat服务器并进行相关配置
 * @author liqqc
 *
 */
public class Demo {

    public static void main(String[] args) {
        ConcreateWapper wapper = new ConcreateWapper();

        Map<BaseRequest, Class<? extends BaseResponse>> request = new HashMap<BaseRequest, Class<? extends BaseResponse>>();
        request.put(new UserRequest(), new UserResponse().getClass());
        wapper.setParams(UserEnum.ADD, null, request);
        wapper.setParams(UserEnum.DELETE, null, request);
        wapper.setParams(UserEnum.UPDATE, null, request);
        wapper.setParams(UserEnum.MODIFY, null, request);

        FutureTpDao futureTpDao = new FutureTpDao();
        Map<IEnum, Object> futureData = futureTpDao.getHttpData(wapper);
        for (IEnum ienum : futureData.keySet()) {
            System.err.println(ienum + "=" + futureData.get(ienum));
        }
    }
}


Ⅴ、总结

本文提供了一种基于AsyncRestTemplate来实现批量请求处理的一种方法。整个框架的结构还是比较清晰,由于技术能力有限,若干地方可能考虑有所欠缺,还需进一步深入研究改进。不管怎样,希望本文对你有所帮助。如有疑问可以留言或邮件,谢谢。

时间: 2024-10-29 04:49:35

基于AsyncRestTemplate异步HTTP请求的一种轻量级技术实现的相关文章

异步发送请求的几种方式

asyncio: # import asyncio # def fun1(): # print('start') # yield from asyncio.sleep(1) # print('end') # # tasks=[ # fun1(),fun1() # ] # loop=asyncio.get_event_loop() # loop.run_until_complete(asyncio.gather(*tasks)) # loop.close() gevent:开启协程池 # ##协程

基于netty的异步http请求

package com.pt.utils; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.N

基于Apache+Tomcat负载均衡的两种实现方法

Apache+Tomcat实现负载均衡的两种实现方法 如果我们将工作在不同平台的apache能够实现彼此间的高效通信,因此它需要一种底层机制来实现--叫做apr Apr的主要目的就是为了其能够让apache工作在不同的平台上,但在linux上安装apache的时候通常都是默认安装的 [[email protected] ~]#rpm -qi aprName                 :apr                                        Relocation

基于Hiredis异步API的聊天系统实现

基于Hiredis异步API的聊天系统实现 上一篇文章http://blog.csdn.net/qq_34788352/article/details/51313027使用Hiredis的同步API实现了发送消息的客户端,当我使用同步API实现订阅频道客户端时,一旦订阅频道,就会出现无法操作的情况,这是就是同步和异步的问题.使用同步API,订阅频道后,客户端会进入阻塞状态,等待订阅频道发布的消息,不能实现既订阅频道,又能发布消息的功能.为了实现一个客户端既能订阅频道,又能发布消息的功能,就需要使

C# .Net FrameWork3.5中异步HTTP请求时,由于安全协议的问题System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult)方法抛出“基础连接已经关闭: 发送时发生错误”的解决办法

现象描述: C# .Net FrameWork3.5中异步HTTP请求时,由于安全协议的问题System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult)方法抛出“基础连接已经关闭: 发送时发生错误”. 原因分析: 大多数情况下是由于客户端所设置的HTTP访问请求的安全协议不符合服务器端的安全协议要求.比如,NASA提供瓦片服务的http://worldwind25.arc.nasa.gov/wms?service=WMS&v

解决 SharePoint 2010 拒绝访问爬网内容源错误的小技巧(禁用环回请求的两种方式)

这里有一条解决在SharePoint 2010搜索爬网时遇到的“拒绝访问错误”的小技巧. 首先要检查默认内容访问帐户是否具有相应的访问权限,或者添加一条相应的爬网规则.如果目标资源库是一个SharePoint库,验证一下该帐号是否具有对该SharePoint web应用程序具有至少“完全读取”的权限. 当我在升级上来的SharePoint环境中对我新建的博客URL进行爬网时遇到了这个错误. 这个错误发生在当你运行Windows 2008 R2和SharePoint 2010并且爬网进程试图访问一

WebAPI GET和POST请求的几种方式

GET请求 1.无参数get请求 一般get请求有两种写法,一种是$.get()   一种是$.ajax({type:"get"}),我个人比较喜欢用后者. 下面例子主要是get无参数请求,获取人员列表并将数据返回到前台,并将返回的json数据转换成字符串弹出,以便知道是否请求成功,并且返回的数据是否正确 1 $.ajax({ 2 url: "/api/Person/getList", 3 type: "get", 4 success: func

[gevent源码分析] c-ares异步DNS请求

c-ares是异步DNS请求库,libcurl,libevent,wireshark都使用了c-ares,gevent1.0版本前使用的是libevent, 所以它的DNS请求也是使用c-ares,1.0版本后使用cython封装了c-ares. c-ares官方文档,http://c-ares.haxx.se/docs.html. gevent中DNS默认使用的是线程池版本的,可通过设置GEVENT_RESOLVER=ares环境变量使用c-ares异步库. 如何证明的确是异步呢,试着跑一遍你

springMVC项目异步处理请求的错误Async support must be enabled on a servlet and for all filters involved in async

从github上down下来一个项目,springMVC-chat.作者全是用的注解,也就是零配置. 这可苦了我,经过千辛万苦,最终集成到现在的项目中有一点样子了,结果报出来下面的错误.红色部分.解决方法为,在web.xml中也就是springMVC的总配置文件中加上一句话: <async-supported>true</async-supported> 这句话的位置一定要放正确,否则,一切都是徒劳.至于配置spring异步支持(其实是配置servlet异步支持)的放置位置见下图.