java 异步查询转同步多种实现方式:循环等待,CountDownLatch,Spring Even

异步转同步

业务需求

有些接口查询反馈结果是异步返回的,无法立刻获取查询结果。

  • 正常处理逻辑

触发异步操作,然后传递一个唯一标识。

等到异步结果返回,根据传入的唯一标识,匹配此次结果。

  • 如何转换为同步

正常的应用场景很多,但是有时候不想做数据存储,只是想简单获取调用结果。

即想达到同步操作的结果,怎么办呢?

思路

  1. 发起异步操作
  2. 在异步结果返回之前,一直等待(可以设置超时)
  3. 结果返回之后,异步操作结果统一返回

循环等待

  • LoopQuery.java

使用 query(),将异步的操作 remoteCallback() 执行完成后,同步返回。

public class LoopQuery implements Async {

    private String result;

    private static final Logger LOGGER = LogManager.getLogger(LoopQuery.class.getName());

    @Override
    public String query(String key) {
        startQuery(key);
        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCallback(key);
            }
        }).start();

        final String queryResult = endQuery();
        LOGGER.info("查询结果: {}", queryResult);
        return queryResult;
    }

    /**
     * 开始查询
     * @param key 查询条件
     */
    private void startQuery(final String key) {
        LOGGER.info("执行查询: {}", key);
    }

    /**
     * 远程的回调是等待是随机的
     *
     * @param key 查询条件
     */
    private void remoteCallback(final String key) {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.result = key + "-result";
        LOGGER.info("remoteCallback set result: {}", result);
    }

    /**
     * 结束查询
     * @return 返回结果
     */
    private String endQuery() {
        while (true) {
            if (null == result) {
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                return result;
            }
        }
    }
}
  • main()
public static void main(String[] args) {
    new LoopQuery().query("12345");
}
  • 测试结果
18:14:16.491 [main] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery - 执行查询: 12345
18:14:21.498 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery - remoteCallback set result: 12345-result
18:14:21.548 [main] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery - 查询结果: 12345-result

CountDownLatch

  • AsyncQuery.java

使用 CountDownLatch 类达到同步的效果。

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncQuery {

    private static final Logger LOGGER = LogManager.getLogger(AsyncQuery.class.getName());

    /**
     * 结果
     */
    private String result;

    /**
     * 异步转同步查询
     * @param key
     */
    public void asyncQuery(final String key) {
        final CountDownLatch latch = new CountDownLatch(1);
        this.startQuery(key);

        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("远程回调线程开始");
                remoteCallback(key, latch);
                LOGGER.info("远程回调线程结束");
            }
        }).start();

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        this.endQuery();
    }

    private void startQuery(final String key) {
        LOGGER.info("执行查询: {}", key);
    }

    /**
     * 远程的回调是等待是随机的
     * @param key
     */
    private void remoteCallback(final String key, CountDownLatch latch) {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.result = key + "-result";
        latch.countDown();
    }

    private void endQuery() {
        LOGGER.info("查询结果: {}", result);
    }

}
  • main()
public static void main(String[] args) {
    AsyncQuery asyncQuery = new AsyncQuery();
    final String key = "123456";
    asyncQuery.asyncQuery(key);
}
  • 日志
18:19:12.714 [main] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 执行查询: 123456
18:19:12.716 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 远程回调线程开始
18:19:17.720 [main] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 查询结果: 123456-result
18:19:17.720 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 远程回调线程结束

Spring EventListener

使用观察者模式也可以。(对方案一的优化)

此处结合 spring 进行使用。

  • BookingCreatedEvent.java

定义一个传输属性的对象。

public class BookingCreatedEvent extends ApplicationEvent {

    private static final long serialVersionUID = -1387078212317348344L;

    private String info;

    public BookingCreatedEvent(Object source) {
        super(source);
    }

    public BookingCreatedEvent(Object source, String info) {
        super(source);
        this.info = info;
    }

    public String getInfo() {
        return info;
    }
}
  • BookingService.java

说明:当 this.context.publishEvent(bookingCreatedEvent); 触发时,
会被 @EventListener 指定的方法监听到。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class BookingService {

    @Autowired
    private ApplicationContext context;

    private volatile BookingCreatedEvent bookingCreatedEvent;

    /**
     * 异步转同步查询
     * @param info
     * @return
     */
    public String asyncQuery(final String info) {
        query(info);

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCallback(info);
            }
        }).start();

        while(bookingCreatedEvent == null) {
            //.. 空循环
            // 短暂等待。
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                //...
            }
            //2. 使用两个单独的 event...

        }

        final String result = bookingCreatedEvent.getInfo();
        bookingCreatedEvent = null;
        return result;
    }

    @EventListener
    public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) {
        System.out.println("监听到远程的信息: " + bookingCreatedEvent.getInfo());
        this.bookingCreatedEvent = bookingCreatedEvent;
        System.out.println("监听到远程消息后: " + this.bookingCreatedEvent.getInfo());
    }

    /**
     * 执行查询
     * @param info
     */
    public void query(final String info) {
        System.out.println("开始查询: " + info);
    }

    /**
     * 远程回调
     * @param info
     */
    public void remoteCallback(final String info) {
        System.out.println("远程回调开始: " + info);

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 重发结果事件
        String result = info + "-result";
        BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result);
        //触发event
        this.context.publishEvent(bookingCreatedEvent);
    }
}
  • 测试方法
@RunWith(SpringJUnit4Cla***unner.class)
@ContextConfiguration(classes = SpringConfig.class)
public class BookServiceTest {

    @Autowired
    private BookingService bookingService;

    @Test
    public void asyncQueryTest() {
        bookingService.asyncQuery("1234");
    }

}
  • 日志
2018-08-10 18:27:05.958  INFO  [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:84 - 开始查询:1234
2018-08-10 18:27:05.959  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:93 - 远程回调开始:1234
接收到信息: 1234-result
2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:73 - 监听到远程的信息: 1234-result
2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:75 - 监听到远程消息后: 1234-result
2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:106 - 已经触发event
2018-08-10 18:27:07.964  INFO  [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:67 - 查询结果: 1234-result
2018-08-10 18:27:07.968  INFO  [Thread-1] org.springframework.context.support.GenericApplicationContext:993 - Closing [email protected]ee5251: startup date [Fri Aug 10 18:27:05 CST 2018]; root of context hierarchy

超时和空循环

空循环

空循环会导致 cpu 飙升

while(true) {
}
  • 解决方式
while(true) {
    // 小睡即可
    TimeUnit.sleep(1);
}

超时编写

不可能一直等待反馈,可以设置超时时间。

/**
 * 循环等待直到获取结果
 * @param key key
 * @param timeoutInSeconds 超时时间
 * @param <T> 泛型
 * @return 结果。如果超时则抛出异常
 */
public <T> T loopWaitForValue(final String key, long timeoutInSeconds) {
    long startTime = System.nanoTime();
    long deadline = startTime + TimeUnit.SECONDS.toNanos(timeoutInSeconds);
    //1. 如果没有新回调,或者 key 对应元素不存在。则一直循环
    while(ObjectUtil.isNull(map.get(key))) {
        try {
            TimeUnit.MILLISECONDS.sleep(5);
        } catch (InterruptedException e) {
            LOGGER.warn("Loop meet InterruptedException, just ignore it.", e);
        }
        // 超时判断
        long currentTime = System.nanoTime();
        if(currentTime >= deadline) {
            throw new BussinessException(ErrorCode.READ_TIME_OUT);
        }
    }
    final T target = (T) map.get(key);
    LOGGER.debug("loopWaitForValue get value:{} for key:{}", JSON.toJSON(target), key);
    //2. 获取到元素之后,需要移除掉对应的值
    map.remove(key);
    return target;
}

代码地址

loop

countdownlatch

spring-event-listener

原文地址:http://blog.51cto.com/9250070/2157446

时间: 2024-10-13 02:10:57

java 异步查询转同步多种实现方式:循环等待,CountDownLatch,Spring Even的相关文章

Java异步调用转同步的5种方式

1.异步和同步的概念 同步调用:调用方在调用过程中,持续等待返回结果. 异步调用:调用方在调用过程中,不直接等待返回结果,而是执行其他任务,结果返回形式通常为回调函数. 2 .异步转为同步的概率 需要在异步调用过程中,持续阻塞至获得调用结果. 3.异步调用转同步的5种方式 1.使用wait和notify方法 2.使用条件锁 3.Future 4.使用CountDownLatch 5.使用CyclicBarrier 4.构造一个异步调用模型. 我们主要关心call方法,这个方法接收了一个demo参

转载:java 异步机制与同步机制的区别

出自:http://blog.itpub.net/17074730/viewspace-563262/ 所谓异步输入输出机制,是指在进行输入输出处理时,不必等到输入输出处理完毕才返回.所以异步的同义语是非阻塞(None Blocking). 网上有很多网友用很通俗的比喻  把同步和异步讲解的很透彻 转过来 举个例子:普通B/S模式(同步)AJAX技术(异步)                同步:提交请求->等待服务器处理->处理完毕返回   这个期间客户端浏览器不能干任何事          

总结“异步复位,同步释放”

复位的功能是很必要的,让一切正在处于工作状态的器件的状态恢复到初始态,可以达到重新开始工作的作用.复位有上电复位和按键复位两种常见方式. 先说一下按键复位. 一开始,我们在设计按键复位的逻辑功能时,第一反应就是利用D触发器的异步清零端(clr端),这种方式称为异步复位,代码和RTL图如下: 1 [email protected](posedge clk or negedge rst_n) 2 begin 3 if(rst_n == 1'b0) 4 q <= 1'b0; 5 else 6 q <

单例模式,多种实现方式JAVA

转载请注明出处:http://cantellow.iteye.com/blog/838473 第一种(懒汉,线程不安全): Java代码 public class Singleton { private static Singleton instance; private Singleton (){} public static Singleton getInstance() { if (instance == null) { instance = new Singleton(); } retu

异步上传文件多种方式归纳

最近在做异步上传文件的工作,用到了一些库,这里归纳下,暂且不考虑异常处理,仅作为demo. 1.不用任何插件,利用iframe,将form的taget设为iframe的name,注意设为iframe的id是没用的,跟网上很多说的不太一致 iframe_upload.htm <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtm

Jquery_异步上传文件多种方式归纳

1.不用任何插件,利用iframe,将form的taget设为iframe的name,注意设为iframe的id是没用的,跟网上很多说的不太一致 iframe_upload.htm <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns=&

c# socket 通信 同步的实现方式和异步实现方式的思考

socket通信的套路,无论是同步的方式或者异步的实现方式,套路总是不变的,那就是服务端开启一个线程监听客户端socket,客户端创建一个socket去连接服务端的监听端口,服务端接收这个客户端socket,在开辟一个线程负责与客户端线程通信(send receive 数据),这里有个误区,并不是监听socket与客户端socket进行通信,监听socket只是负责接收客户端连接请求,最后接收到的socket与客户端socket进行通信. 关于同步和异步的主要区别在哪里? Accept Rece

Java高并发之同步异步

1.概念理解: 2.同步的解决方案: 1).基于代码 synchronized 关键字 修饰普通方法:作用于当前实例加锁,进入同步代码前要获得当前实例的锁. 修饰静态方法:作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁. 修饰代码块:指定加锁对象,对给定对象加锁,进入同步代码块前要获得给定对象的锁. code1 package com.thread; import java.util.concurrent.ExecutorService; import java.util.concur

Java异步NIO框架Netty实现高性能高并发

1. 背景 1.1. 惊人的性能数据 近期一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用.相比于传统基于Java序列化+BIO(同步堵塞IO)的通信框架.性能提升了8倍多. 其实,我对这个数据并不感到吃惊,依据我5年多的NIO编程经验.通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是全然有可能的. 以下我们就一起来看下Ne