Apache Ignite 改装(一) -- 服务异步化支持

本文假设读者了解Apache Ignite,阅读过ignite service grid的官方文档,或使用过ignite的service grid,本文同样假设读者了解 java的CompletionStage的相关用法。本文涉及的ignite版本为2.4.0。

使用Apache Ignite的Service grid作为微服务开发框架, 通常是如下定义和实现Service的:

服务接口:
public interface MyService {
    public String sayHello(String to);
}

本文将实现如下样式的Service,使其异步化:

异步化的服务接口:
public interface MyServiceAsync {
    public CompletionStage<String> sayHello(String to);
}

当前ignite对上边这样的异步的service方法并没有remote支持。当调用端与服务部署再同一节点时,ignite会发起一个本地方法调用,这样是没有问题的,但是当服务部署端与调用端在不同节点时,ignite通过发起一个distributed task,将调用通过消息方式发布到服务部署节点,由于服务实现是异步的,通常来说,会返回一个未完成状态的CompletionStage,后续当真正complete的时候,调用端的CompletionStage并不会被notify,即调用端永远无法得到真正的调用结果。
为了能够支持CompletionStage的远程状态专递,我们需要对ignite进行如下改动:

org/apache/ignite/internal/processors/service/GridServiceProxy.java
...
// line 192
if(CompletionStage.class.isAssignableFrom(mtd.getReturnType())) {
    CompletableFuture<Object> cs = new CompletableFuture<>();
    //call async and notify completion stage
    ctx.closure().callAsyncNoFailover(
        GridClosureCallMode.BROADCAST,
        new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args),
            Collections.singleton(node),
            false,
            waitTimeout,
            true).listen(f -> {
                if(f.error() != null) {
                    cs.completeExceptionally(f.error());
                }else if(f.isCancelled()) {
                    cs.cancel(false);
                }
                if(f.isDone()) {
                    try {
                        Object result = f.get();
                        if(result != null && IgniteException.class.isAssignableFrom(result.getClass())) {
                            cs.completeExceptionally((IgniteException)result);
                        }else {
                            cs.complete(f.get());
                        }
                    } catch (IgniteCheckedException e) {
                        cs.completeExceptionally(e);
                    }
                }
            });
    return cs;
}
...

这段代码做了如下的事情:检测当服务方法返回值是一个CompletionStage的时候,则创建一个CompletableFuture作为代理对象返回给调用端。随后监听服务的远程调用的结果,并且用这个结果来更新这个CompletableFuture。到这里,调用端的service proxy的改造就完成了。接下来,我们还需要改造服务节点这一端:

org/apache/ignite/internal/processors/job/GridJobWorker.java(line 618起的这个finally块),改造前:
finally {
    // Finish here only if not held by this thread.
  if (!HOLD.get())
        finishJob(res, ex, sndRes);
    else
    // Make sure flag is not set for current thread.
    // This may happen in case of nested internal task call with continuation.
    HOLD.set(false);
    ctx.job().currentTaskSession(null);
    if (reqTopVer != null)
        GridQueryProcessor.setRequestAffinityTopologyVersion(null);
    }
}
改造后:
finally {
    if(res != null && CompletionStage.class.isAssignableFrom(res.getClass())) {
        final boolean sendResult = sndRes;
        final IgniteException igException = ex;
        @SuppressWarnings("unchecked")
        CompletionStage<Object> cs = (CompletionStage<Object>)res;
        cs.exceptionally(t->{
            return new IgniteException(t);
        }).thenAccept(r->{
            if (!HOLD.get()) {
                IgniteException e = igException;
                finishJob(r, e, sendResult);
            } else
            // Make sure flag is not set for current thread.
            // This may happen in case of nested internal task call with continuation.
                HOLD.set(false);
            ctx.job().currentTaskSession(null);
            if (reqTopVer != null)
                GridQueryProcessor.setRequestAffinityTopologyVersion(null);
        });
    } else {
        // Finish here only if not held by this thread.
        if (!HOLD.get())
            finishJob(res, ex, sndRes);
        else
        // Make sure flag is not set for current thread.
        // This may happen in case of nested internal task call with continuation.
            HOLD.set(false);
        ctx.job().currentTaskSession(null);
        if (reqTopVer != null)
            GridQueryProcessor.setRequestAffinityTopologyVersion(null);
    }
}

这里做的事情是:当在服务部署节点上拿到执行结果的时候,如果发现服务返回结果是一个CompletionStage,那么处理这个CompletionStage的exceptionally和thenAccept, 把结果发送给remote的调用端。

就这样,通过简单的改装,我们使ignite有了处理异步服务方法调用的能力。下边我们实现一个服务来看看改装结果:

服务定义与实现:
import java.util.concurrent.CompletionStage;
import org.apache.ignite.services.Service;
public interface MyService extends Service {
    public CompletionStage<String> sayHelloAsync(String to);
    public String sayHelloSync(String to);
}
import java.util.concurrent.CompletionStage;
public class MyServiceImpl implements MyService {
    private ScheduledExecutorService es;
    @Override public void init(ServiceContext ctx) throws Exception {
        es = Executors.newSingleThreadScheduledExecutor();
    }
    @Override public CompletionStage<String> sayHelloAsync(String to){
      CompletableFuture<String> ret = new CompletableFuture<>();
        //return "async hello $to" after 3 secs
        es.schedule(()->ret.complete("async hello " + to), 3, TimeUnit.SECONDS);
        return ret;
    }
    @Override public String sayHelloSync(String to){
        return "sync hello " + to;
    }
    ...
}

然后将服务部署在Service grid中:

...
ServiceConfiguration sConf = new ServiceConfiguration();
sConf.setName("myservice.version.1");
sConf.setService(new MyServiceImpl());
sConf.setMaxPerNodeCount(2);
sConf.setTotalCount(4);
ignite.services().deploy(sConf);
...

然后启动一个客户端节点进行服务调用:

MyService service = ignite.services().serviceProxy("myservice.version.1",  MyService.class, false);
//test async service
service.sayHelloAsync("nathan").thenAccept(r->{
    System.out.println(r);
});
//test sync service
System.out.println(service.sayHelloSync("nathan"));
...

输出结果:

sync hello nathan
async hello nathan

可以看到先输出了sync的结果,大约3秒后输出async的结果。

原文地址:http://blog.51cto.com/12274728/2097094

时间: 2024-11-01 13:42:09

Apache Ignite 改装(一) -- 服务异步化支持的相关文章

使用消息队列异步化系统

使用消息队列异步化系统 基于Spring与ActiveMQ的配置实现方案 前言 前期为了快速开发,项目结构较为混乱,代码维护与功能扩展都比较困难,为了方便后续功能开发,最近对项目进行的重构,顺便在重构的过程中将之前的部分操作进行了异步处理,也第一次实际接触了JMS与消息队列.项目中采用的消息中间件为ActiveMQ. 什么是JMS Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分

分布式数据库缓存系统Apache Ignite

Apache Ignite内存数据组织是高性能的.集成化的以及分布式的内存平台,他可以实时地在大数据集中执行事务和计算,和传统的基于磁盘或者闪存的技术相比,性能有数量级的提升. 将数据存储在缓存中能够显著地提高应用的速度,因为缓存能够降低数据在应用和数据库中的传输频率.Apache Ignite允许用户将常用的热数据储存在内存中,它支持分片和复制两种方式,让开发者可以均匀地将数据分布式到整个集群的主机上.同时,Ignite还支撑任何底层存储平台,不管是RDBMS.NoSQL,又或是HDFS. 在

apache ignite系列(一): 简介

apache-ignite简介(一) 1,简介 ? ignite是分布式内存网格的一种实现,其基于java平台,具有可持久化,分布式事务,分布式计算等特点,此外还支持丰富的键值存储以及SQL语法(基于h2引擎),可以看成是一个分布式内存数据库. 与ignite类似的产品有gemfire(12306目前正在使用),其开源版为geode.与gemfire相比,ignite对sql的支持比较完善,提供了数据并置来提升性能,还有对分布式事物的支持以及对spring的集成都比较友好,很方便进行嵌入式集成进

异步化,高并发大杀器

聊聊如何让项目异步化的一些事. 1.同步和异步,阻塞和非阻塞 同步和异步,阻塞和非阻塞, 这个几个词已经是老生常谈,当时常常还是有很多同学分不清楚,以为同步肯定就是阻塞,异步肯定就是非阻塞,其他他们不是一回事. 同步和异步关注的是结果消息的通信机制 同步:同步的意思就是调用方需要主动等待结果的返回异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等.阻塞和非阻塞主要关注的是等待结果返回调用方的状态 阻塞:是指结果返回之前,当前线程被挂起,不做任何事非阻塞:是指

Dubbo 2.7新特性之异步化改造

这是why技术的第1篇原创文章 我与Dubbo的二三事 我是2016年毕业的,在我毕业之前,我在学校里面学到的框架都是SSH,即struts+spring+hibernate,是的你没有看错,在大学里面的课本里面学的是strusts,这个还没毕业就被基本抛弃的框架.然而我大四出去实习,用的技术是SSM,即Spring,SpringMVC,Mybatis.实习的时候做的项目都是外包项目,非常传统的单体大项目,和学校里面做课程设计一样,所有的功能包括前后端都糅合在一个项目里面,根本不知道什么是分布式

Apache Ignite剖析

1.概述 Apache Ignite和Apache Arrow很类似,属于大数据范畴中的内存分布式管理系统.在<Apache Arrow 内存数据>中介绍了Arrow的相关内容,它统一了大数据领域各个生态系统的数据格式,避免了序列化和反序列化所带来的资源开销(能够节省80%左右的CPU资源).今天来给大家剖析下Apache Ignite的相关内容. 2.内容 Apache Ignite是一个以内存为中心的数据平台,具有强一致性.高可用.强大的SQL.K/V以及其所对应的应用接口(API).结构

Sentinel 发布0.2.0,异步调用支持、热点参数限流等成产品新亮点

Sentinel 是阿里中间件团队开源的,面向分布式服务架构的轻量级流量控制组件,主要以流量为切入点,从流量控制.熔断降级.系统负载保护等多个维度来帮助用户保护服务的稳定性. 近日,Sentinel 0.2.0 正式发布.作为一个重要的里程碑版本,Sentinel 0.2.0 释放了多项产品新特性,如 异步调用支持.热点参数限流 等,并包括了大量的体验优化与 bug 修复.下面我们来看一下 Sentinel 0.2.0 的重要新特性. 异步调用支持 未来各种 RPC 框架.Web 框架都朝着异步

Apache Ignite上的TensorFlow

任何深度学习都是从数据开始的,这是关键点.没有数据,就无法训练模型,也无法评估模型质量,更无法做出预测,因此,数据源非常重要.在做研究.构建新的神经网络架构.以及做实验时,会习惯于使用最简单的本地数据源,通常是不同格式的文件,这种方法确实非常有效.但有时需要更加接近于生产环境,那么简化和加速生产数据的反馈,以及能够处理大数据就变得非常重要,这时就需要Apache Ignite大展身手了. Apache Ignite是以内存为中心的分布式数据库.缓存,也是事务性.分析性和流式负载的处理平台,可以实

Apache Ignite——新一代数据库缓存系统

Apache Ignite是一个通用的数据库缓存系统,它不仅支持所有的底层数据库系统,比如RDBMS.NoSQL和HDFS,还支持Write-Through和Read-Through.Write-Behind Caching等可选功能. Apache Ignite是一个聚焦分布式内存计算的开源项目,它在内存中储存数据,并分布在多个节点上以提供快速数据访问.此外,可选地将数据同步到缓存层同样是一大优势.最后,可以支持任何底层数据库存储同样让 Ignite成为数据库缓存的首先.