ZBUS--轻量级MQ、RPC、服务总线

ZBUS = MQ + RPC + PROXY

  • 支持消息队列, 发布订阅, RPC, 代理(TCP/DMZ)
  • 亿级消息堆积能力、支持HA高可用
  • 单个Jar包无依赖 ~300K
  • 服务代理 -- 适配改造已有业务系统,使之具备跨平台与语言
  • 丰富的API--JAVA/C/C++/C#/Python/Node.JS多语言接入

zbus-dist选择zbus.sh或者zbus.bat直接执行

总线默认占用 15555 端口, http://localhost:15555 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口

ZBUS 角色概要

ZBUS 消息通讯基础(NET模块)

ZBUS项目不依赖其他第三方库,消息通讯基于NIO完成(NET子项目)。NET包对NIO做了简洁的封装,相对Netty而言,学习成本低几个量级,模型简单,但不失扩展性。

框架结构保持 Dispatcher + N SelectorThread + IoAdaptor

Dispatcher 负责管理N个Selector线程

SelectorThread 负责NIO读写事件分发

IoAdaptor 个性化读写事件

基于NET的服务器程序基本只要关心IoAdaptor的个性化,比如ZBUS入口就是MqAdaptor

ZBUS API

ZBUS PROXY

ZBUS 示例

Java Maven 依赖

<dependency>
    <groupId>org.zbus</groupId>
    <artifactId>zbus</artifactId>
    <version>6.2.6</version>
</dependency>

生产者

public static void main(String[] args) throws Exception {
    //创建Broker代理
    BrokerConfig config = new BrokerConfig();
    config.setServerAddress("127.0.0.1:15555");
    final Broker broker = new SingleBroker(config);

    Producer producer = new Producer(broker, "MyMQ");
    producer.createMQ(); // 如果已经确定存在,不需要创建

    //创建消息,消息体可以是任意binary,应用协议交给使用者
    Message msg = new Message();
    msg.setBody("hello world");
    producer.sendSync(msg);  

    broker.close();
}

消费者

public static void main(String[] args) throws Exception{
    //创建Broker代表
    BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setServerAddress("127.0.0.1:15555");
    Broker broker = new SingleBroker(brokerConfig);

    MqConfig config = new MqConfig();
    config.setBroker(broker);
    config.setMq("MyMQ");

    //创建消费者
    @SuppressWarnings("resource")
    Consumer c = new Consumer(config);  

    c.onMessage(new MessageHandler() {
        @Override
        public void handle(Message msg, Session sess) throws IOException {
            System.out.println(msg);
        }
    });

    //启动消费线程
    c.start();   

}

RPC动态代理【各类复杂类型】

参考源码test目下的rpc部分

    //1)创建Broker代表(可用高可用替代)
    BrokerConfig config = new BrokerConfig();
    config.setServerAddress("127.0.0.1:15555");
    Broker broker = new SingleBroker(config);

    //2)创建基于MQ的Invoker以及Rpc工厂,指定RPC采用的MQ为MyRpc
    MqInvoker invoker = new MqInvoker(broker, "MyRpc");
    RpcFactory factory = new RpcFactory(invoker); 

    //3) 动态代理出实现类
    Interface hello = factory.getService(Interface.class);

    test(hello);  

    broker.close();

Spring集成--服务端(RPC示例)

无任何代码侵入使得你已有的业务接口接入到zbus,获得跨平台和多语言支持

<!-- 暴露的的接口实现示例 -->
<bean id="interface" class="org.zbus.rpc.biz.InterfaceImpl"></bean>

<bean id="serviceProcessor" class="org.zbus.rpc.RpcProcessor">
    <constructor-arg>
        <list>
            <!-- 放入你需要的暴露的的接口 -->
            <ref bean="interface"/>
        </list>
    </constructor-arg>
</bean>

<bean id="broker" class="org.zbus.broker.SingleBroker">
    <constructor-arg>
        <bean class="org.zbus.broker.BrokerConfig">
            <property name="serverAddress" value="127.0.0.1:15555" />
            <property name="maxTotal" value="20"/>
            <!-- 这里可以增加连接池参数配置,不配置使用默认值(参考commons-pool2) -->
        </bean>
    </constructor-arg>
</bean>

<!-- 默认调用了start方法,由Spring容器直接带起来注册到zbus总线上 -->
<bean id="myrpcService" class="org.zbus.rpc.mq.Service" init-method="start">
    <constructor-arg>
        <bean class="org.zbus.rpc.mq.ServiceConfig">
            <!-- 支持多总线注册 -->
            <constructor-arg>
                <list>
                    <ref bean="broker"/>
                </list>
            </constructor-arg>
            <property name="mq" value="MyRpc"/>
            <property name="consumerCount" value="2"/>
            <property name="messageProcessor" ref="serviceProcessor"/>
        </bean>
    </constructor-arg>
</bean>

Spring集成--客户端

<bean id="broker" class="org.zbus.broker.SingleBroker">
    <constructor-arg>
        <bean class="org.zbus.broker.BrokerConfig">
            <property name="serverAddress" value="127.0.0.1:15555" />
        </bean>
    </constructor-arg>
</bean>

<bean id="myrpc" class="org.zbus.rpc.RpcFactory">
    <constructor-arg>
        <bean class="org.zbus.rpc.mq.MqInvoker">
            <constructor-arg ref="broker"/>
            <constructor-arg value="MyRpc"/>
        </bean>
    </constructor-arg>
</bean>

<bean id="interface" factory-bean="myrpc" factory-method="getService">
    <constructor-arg type="java.lang.Class" value="org.zbus.rpc.biz.Interface"/>
</bean>

Spring完成zbus代理透明化,zbus设施从你的应用逻辑中彻底消失

public static void main(String[] args) {
    ApplicationContext context = new ClassPathXmlApplicationContext("SpringRpcClient.xml");

    Interface intf = (Interface) context.getBean("interface");
    for(int i=0;i<100;i++){
        System.out.println(intf.listMap());
    }
}

ZBUS消息协议

时间: 2024-11-04 16:47:07

ZBUS--轻量级MQ、RPC、服务总线的相关文章

【转】Spring mvc集成ZBUS--轻量级MQ、RPC、服务总线

本文转自:http://www.cnblogs.com/top15from/p/4899954.html ZBUS = MQ + RPC + PROXY 支持消息队列, 发布订阅, RPC, 代理(TCP/DMZ) 亿级消息堆积能力.支持HA高可用 单个Jar包无依赖 ~300K 服务代理 -- 适配改造已有业务系统,使之具备跨平台与语言 丰富的API--JAVA/C/C++/C#/Python/Node.JS多语言接入 zbus-dist选择zbus.sh或者zbus.bat直接执行 总线默认

消息队列、服务总线 zbus

轻量级服务总线/消息队列 1)多种消息模式--支持生产者/消费者,发布订阅,RPC 2)丰富的API--C/C++/C#/JAVA/Python/Node.JS跨平台.多语言支持 3)开放协议标准--原生兼容HTTP协议(长连接),头部动态扩展 4)支持TrackServer与ZbusServer高可用横向动态扩容机制 ZBUS追求极度轻量级,<200K 发行jar包(从早期的基于ZeroMQ C实现演化为JAVA NIO实现),不依赖任何其他包:高度可扩展(异步通讯NIO,Remoting,日

SOA实践之基于服务总线的设计

在上文中,主要介绍了SOA的概念,什么叫做“服务”,“服务”应该具备哪些特性.本篇中,我将介绍SOA的一种很常见的设计实践--基于服务总线的设计. 基于服务总线的设计 基于总线的设计,借鉴了计算机内部硬件组成的设计思想(通过总线传输数据).在分布式系统中,不同子系统之间需要实现相互通信和远程调用,比较直接的方式就是“点对点”的通信方式,但是这样会暴露出一些很明显的问题:系统之间紧密耦合.配置和引用混乱.服务调用关系错综复杂.难以统一管理.异构系统之间存在不兼容等.而基于总线的设计,正是为了解决上

服务总线

丁码农 SOA实践之基于服务总线的设计 在上文中,主要介绍了SOA的概念,什么叫做“服务”,“服务”应该具备哪些特性.本篇中,我将介绍SOA的一种很常见的设计实践--基于服务总线的设计. 基于服务总线的设计 基于总线的设计,借鉴了计算机内部硬件组成的设计思想(通过总线传输数据).在分布式系统中,不同子系统之间需要实现相互通信和远程调用,比较直接的方式就是“点对点”的通信方式,但是这样会暴露出一些很明显的问题:系统之间紧密耦合.配置和引用混乱.服务调用关系错综复杂.难以统一管理.异构系统之间存在不

【转】轻量级分布式 RPC 框架

第一步:编写服务接口 第二步:编写服务接口的实现类 第三步:配置服务端 第四步:启动服务器并发布服务 第五步:实现服务注册 第六步:实现 RPC 服务器 第七步:配置客户端 第八步:实现服务发现 第九步:实现 RPC 代理 第十步:发送 RPC 请求 总结 附录:Maven 依赖 RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样. RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 H

轻量级分布式 RPC 框架

原文出处: 黄勇 RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样. RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC.会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化. 众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,

一个轻量级分布式RPC框架--NettyRpc

1.背景 最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章<轻量级分布式 RPC 框架>,作者用Zookeeper.Netty和Spring写了一个轻量级的分布式RPC框架.花了一些时间看了下他的代码,写的干净简单,写的RPC框架可以算是一个简易版的dubbo.这个RPC框架虽小,但是麻雀虽小,五脏俱全,有兴趣的可以学习一下. 项目地址:https://github.com/luxiaoxun/NettyRpc 自己花了点时间整理了下代码,并修改一些问题,以下是自己学习的一

RSF 分布式 RPC 服务框架的分层设计

RSF 是个什么东西? 一个高可用.高性能.轻量级的分布式服务框架.支持容灾.负载均衡.集群.一个典型的应用场景是,将同一个服务部署在多个Server上提供 request.response 消息通知.使用RSF可以点对点调用,也可以分布式调用.部署方式上:可以搭配注册中心,也可以独立使用. 渊源 RSF 的核心思想参考了淘宝HSF.Dubbo 等优秀框架.功能上大体相似,但是实现逻辑完全不同.因此没有什么历史包袱.总的来说对比淘宝HSF少了历史包袱,相比Dubbo更加轻量化.而且还支持了虚拟机

SOA之基于服务总线的设计

在上文中,主要介绍了SOA的概念,什么叫做"服务","服务"应该具备哪些特性.本篇中,我将介绍SOA的一种很常见的设计实践--基于服务总线的设计. 基于服务总线的设计 基于总线的设计,借鉴了计算机内部硬件组成的设计思想(通过总线传输数据).在分布式系统中,不同子系统之间需要实现相互通信和远程调用,比较直接的方式就是"点对点"的通信方式,但是这样会暴露出一些很明显的问题:系统之间紧密耦合.配置和引用混乱.服务调用关系错综复杂.难以统一管理.异构系统