分布式异步消息框架构建笔记4——分布是消息路由

上一篇实现了消息的自动路由,这边写了一个小测试,大家可以猜一下运行输出结果是什么?

 public class RouterTest
    {
        public static void DoRouterTest()
        {
            var contextA = Context.Creat("A");
            var contextB = Context.Creat("B");
            contextA.RegServiceClass(typeof (MyClassA));
            contextB.RegServiceClass(typeof (MyClassB));
            Context.CreatProxyAndRun(MyClassB.FunB, (r) =>
            {

                Console.WriteLine("FunB CallBack Run in " + Thread.CurrentThread.ManagedThreadId);
            },100);

        }

        public class MyClassA
        {
            public static int FunA(int x)
            {
                Console.WriteLine("FunA Run in " + Thread.CurrentThread.ManagedThreadId);

                return x + 10;
            }
        }

        public class MyClassB
        {
            public static IEnumerable<int> FunB(int x)
            {
                Console.WriteLine("FunB Run in " + Thread.CurrentThread.ManagedThreadId);

                Context.Wating(MyClassA.FunA, (r) =>
                {
                    Console.WriteLine("FunA CallBackRun in " + Thread.CurrentThread.ManagedThreadId);

                    x = r;
                }, x);
                yield return -1;

                yield return x - 1;
            }
        }
    }

以上是跨线程的一个分布式的实现,那么我们如何做到跨域、跨进程和跨服务器呢。

从以上代码可以看出,传递进去的参数有回调函数,这个是不可序列化传递的,参数和调用方法之类的只要可序列化都是没问题的。

先看一下基础的消息处理状态:(晚点可能上个流程图)

1.创建一个Request请求,包含方法,参数,以及回调,这事消息状态为Wating,丢入当前队列

2.事件处理器收到请求后,执行对应的目标方法,把结果包装成一个WatingCallback状态的待处理对象

2.1如果对象是Yield对像,则包装成Yield状态的待处理对象

3.当消息处理器发现这个待处理对象是Yield状态,执行MoveNext

4.如果采用Wating方法则把当前句柄的待处理对象变成YieldWating状态

5.YieldWating会不断的把自己丢入队列,直到当前句柄被释放,重新变成Yield状态

6.当MoveNext不可用,也就是执行完毕,那么这个结果状态会变成WatingCallback

7.WatingCallback状态的对象,执行回调后,完成生命周期,状态变成Dead

然后是跨线程的流程与基本流程的区别:(红色表示)

1.创建一个Request请求,包含方法,参数,以及回调,这事消息状态为Wating,丢入事件路由,时间路由会寻找指定的Context发布

2.事件处理器收到请求后,执行对应的目标方法,把结果包装成一个WatingCallback状态的待处理对象

2.1如果对象是Yield对像,则包装成Yield状态的待处理对象

3.当消息处理器发现这个待处理对象是Yield状态,执行MoveNext

4.如果采用Wating方法则把当前句柄的待处理对象变成YieldWating状态

5.YieldWating会不断的把自己丢入队列,直到当前句柄被释放,重新变成Yield状态

6.当MoveNext不可用,也就是执行完毕,那么这个结果状态会变成WatingCallback

7.WatingCallback状态的对象,检查事件id,如果不为本Context处理,则丢入事件路由,路由会寻找指定的Context发布

7.1当本事件ID的
WatingCallback状态的对象,执行回调后,完成生命周期,状态变成Dead

调用区别并不是很大,那么用网络传递也变得比较简单了

(由于用到了YieldWating状态,远程传递只能用IEnumerable接口,这个有点不满!!)

1.创建一个Request请求,包含方法,参数,以及回调,这事消息状态为Wating,丢入事件路由,时间路由会寻找指定的Context发布

1.1本地创建一个YieldWating状态,并且把Callback保存起来.

1.2当收到事件RemoteWatingCallBack,移除YieldWating句柄,并把结果传入YieldWating回调;

1.3接着执行YieldWating.

2.远程事件处理器收到请求后,执行对应的目标方法,把结果包装成一个WatingCallback状态的待处理对象

2.1如果对象是Yield对像,则包装成Yield状态的待处理对象

3.当消息处理器发现这个待处理对象是Yield状态,执行MoveNext

4.如果采用Wating方法则把当前句柄的待处理对象变成YieldWating状态

5.YieldWating会不断的把自己丢入队列,直到当前句柄被释放,重新变成Yield状态

6.当MoveNext不可用,也就是执行完毕,那么这个结果状态会变成WatingCallback

7.WatingCallback状态的对象,检查事件id,如果不为本Context处理,则丢入事件路由,路由会寻找指定的Context发布

7.1 当WatingCallBack为远程调用时,把结果包装成RemoteWatingCallBack事件,发布.

7.2当本事件ID的
WatingCallback状态的对象,执行回调后,完成生命周期,状态变成Dead

大致流程就是:

1.本地创建等待回调句柄,等待远程返回.

2.远程执行完成后,发布远程回调事件.

最后,看起来和实现起来都不怎么复杂的样子.

时间: 2024-10-07 03:31:50

分布式异步消息框架构建笔记4——分布是消息路由的相关文章

分布式异步消息框架构建笔记5——如何避开并行编程中的数据共享陷阱

任何多线程/并行/分布式都会面临一个问题,"数据状态共享". 有经验的开发者会说,要想正确有效的避开避开状态共享,那么就应该别用任何状态共享. 虽然不得不说,这是一个不错的建议,但是没有状态共享,你需要如何才能知道非本地数据的状态? 也许你会说使用消息,使用消息来处理,那么我们丑陋的回调金字塔应该叠的更高了. 不得不说这是一个解决办法,但是为了保持状态不被修改,那么我们还得在远程申请一个写入锁,防止数据被别的任务所修改. 那么流程就是 申请锁->请求某个消息状态->释放锁

基于Dubbo框架构建分布式服务 (二)

Dubbo是Alibaba开源的分布式服务框架,我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现分布式服务调用,也就是说服务提供方(Provider)发布的服务可以天然就是集群服务,比如,在实时性要求很高的应用场景下,可能希望来自消费方(Consumer)的调用响应时间最短,只需要选择Dubbo的Forking Cluster模式配置,就可以对一个调用请求并行发送到多台对等的提供方

[转载] 基于Dubbo框架构建分布式服务

转载自http://shiyanjun.cn/archives/1075.html Dubbo是Alibaba开源的分布式服务框架,我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现分布式服务调用,也就是说服务提供方(Provider)发布的服务可以天然就是集群服务,比如,在实时性要求很高的应用场景下,可能希望来自消费方(Consumer)的调用响应时间最短,只需要选择Dubbo的F

基于Dubbo框架构建分布式服务 【转】

Dubbo是Alibaba开源的分布式服务框架,我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现分布式服务调用,也就是说服务提供方(Provider)发布的服务可以天然就是集群服务,比如,在实时性要求很高的应用场景下,可能希望来自消费方(Consumer)的调用响应时间最短,只需要选择Dubbo的Forking Cluster模式配置,就可以对一个调用请求并行发送到多台对等的提供方

基于Dubbo框架构建分布式服务

Dubbo是Alibaba开源的分布式服务框架,我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现分布式服务调用,也就是说服务提供方(Provider)发布的服务可以天然就是集群服务,比如,在实时性要求很高的应用场景下,可能希望来自消费方(Consumer)的调用响应时间最短,只需要选择Dubbo的Forking Cluster模式配置,就可以对一个调用请求并行发送到多台对等的提供方

Python开发【模块】:Celery 分布式异步消息任务队列

Celery 前言: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们所有客户的

spring框架搭建笔记

◆简介 目的:解决企业应用开发的复杂性 功能:使用基本的JavaBean代替EJB,并提供了更多的企业应用功能 范围:任何Java应用 Spring 框架是一个分层架构,由 7 个定义良好的模块组成.Spring 模块构建在核心容器之上,核心容器定义了创建.配置和管理 bean 的方式. 组成 Spring 框架的每个模块(或组件)都可以单独存在,或者与其他一个或多个模块联合实现.每个模块的功能如下: ? 核心容器:核心容器提供 Spring 框架的基本功能.核心容器的主要组件是 BeanFac

Java异步NIO框架Netty实现高性能高并发

1. 背景 1.1. 惊人的性能数据 近期一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用.相比于传统基于Java序列化+BIO(同步堵塞IO)的通信框架.性能提升了8倍多. 其实,我对这个数据并不感到吃惊,依据我5年多的NIO编程经验.通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是全然有可能的. 以下我们就一起来看下Ne

RSF 分布式 RPC 服务框架的分层设计

RSF 是个什么东西? 一个高可用.高性能.轻量级的分布式服务框架.支持容灾.负载均衡.集群.一个典型的应用场景是,将同一个服务部署在多个Server上提供 request.response 消息通知.使用RSF可以点对点调用,也可以分布式调用.部署方式上:可以搭配注册中心,也可以独立使用. 渊源 RSF 的核心思想参考了淘宝HSF.Dubbo 等优秀框架.功能上大体相似,但是实现逻辑完全不同.因此没有什么历史包袱.总的来说对比淘宝HSF少了历史包袱,相比Dubbo更加轻量化.而且还支持了虚拟机