JGroups 入门实践

前言

JGroups是一个开源的纯java编写的可靠的群组通讯工具。其工作模式基于IP多播,但可以在可靠性和群组成员管理上进行扩展。其结构上设计灵活,提供了一种灵活兼容多种协议的协议栈。

JGroups 多线程的方式实现了多个协议之间的协同工作,常见工作线程有心跳检测,诊断等等。

JGroups实现多机器之间的通信一般都会包含维护群组状态、群组通信协议、群组数据可靠性传输这样的一些主题。

JGroups群组的各个节点是存在"管理节点"的,至少可以说某个节点提供了在一段时间内维护状态信息和消息可靠性检测的功能(一般是最先启动的节点)。

目前Jboss、Ecache的分布式缓存是基于Groups通信。

若JGroups通信基于Udp,则可能需要开启机器上UDP相关的设置,比如Open udp。

温馨提示:JGroups各个协议相关的配置文件都可以从JGroups-x.x.x.Final.jar中找到。

JGroups 资料

http://www.jgroups.org/tutorial/index.html(官网)

http://sourceforge.net/projects/javagroups/(JGroups工程&讨论组(Discussion))

JGroups 入门示例

1,节点通信(tcp/ip,udp)方式.

2,通道和消息传送.

3,节点状态同步.

tcp/ip与udp协议

通常我们都知道tcp和udp最大的区别在于可靠性,tcp是基于可靠连接的传输,udp则属非连接,具体可参考百度百科(http://baike.baidu.com/view/1161229.htm?fr=aladdin)。

JGroups当中,udp是比较推荐的通信方式,其特点是不需要知道另一个节点的ip,通过多播网络发现就可以“找到”相应的节点,而tcp则需要在配置文件中固定配置。

示例代码(之后的测试基于tcp,因为不同机器的测试由于udp端口的问题未成功)

tcp配置文件network-tcp.xml

<!--
    TCP based stack, with flow control and message bundling. This is usually used when IP
    multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast).
    Note that TCP.bind_addr and TCPPING.initial_hosts should be set, possibly via system properties, e.g.
    -Djgroups.bind_addr=192.168.5.2 and -Djgroups.tcpping.initial_hosts=192.168.5.2[7800]
    author: Bela Ban
-->
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.3.xsd">
    <TCP bind_addr="192.168.19.112"
         bind_port="7800"
         loopback="false"
         recv_buf_size="${tcp.recv_buf_size:5M}"
         send_buf_size="${tcp.send_buf_size:640K}"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         use_send_queues="true"
         sock_conn_timeout="300"

         timer_type="new3"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="1"
         thread_pool.max_threads="10"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="false"
         thread_pool.queue_max_size="100"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="discard"/>

    <TCPPING timeout="3000"
             initial_hosts="${jgroups.tcpping.initial_hosts:192.168.19.112[7800],192.168.19.112[7801]}"
             port_range="1"
             num_initial_members="10"/>
    <MERGE2  min_interval="10000"
             max_interval="30000"/>
    <FD_SOCK/>
    <FD timeout="3000" max_tries="3" />
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK2 use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST3 />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"

                view_bundling="true"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <!--RSVP resend_interval="2000" timeout="10000"/-->
    <pbcast.STATE_TRANSFER/>
</config>

udp配置文件network-udp.xml

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.5.xsd">

    <UDP
         mcast_addr="${jgroups.udp.mcast_addr:235.5.5.5}"
         mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20M"
         ucast_send_buf_size="640K"
         mcast_recv_buf_size="25M"
         mcast_send_buf_size="640K"
         loopback="true"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:2}"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         timer_type="new"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000" num_initial_members="3"/>
    <MERGE2 max_interval="30000" min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK use_mcast_xmit="true"
                   retransmit_timeout="300,600,1200"
                   discard_delivered_msgs="true"/>

    <pbcast.STABLE stability_delay="1000"
                   desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true"
                print_physical_addrs="true"
                join_timeout="3000"
                view_bundling="true"
                max_join_attempts="3"/>

    <UFC max_credits="2M" min_threshold="0.4"/>
    <MFC max_credits="2M" min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <pbcast.STATE_TRANSFER />

</config>

数据节点Node.java

package org.wit.ff;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.util.Util;

/**
 *
 * <pre>
 * 节点.
 * </pre>
 *
 * @author F.Fang
 * @version $Id: CacheNode.java, v 0.1 2014年10月17日 上午5:27:11 F.Fang Exp $
 */
public class Node extends ReceiverAdapter {

    private final static Logger LOG = Logger.getLogger(Node.class);

    /**
     * 配置文件.
     */
    private static final String CONFIG_XML = "network-tcp.xml";

    /**
     * 集群名称.
     */
    private static final String CLUSTER_NAME = "FF";

    /**
     * 节点通道.
     */
    private JChannel channel = null;

    /**
     * 以此作为节点间初始化的同步数据.
     */
    private Map<String, String> cacheData = new HashMap<String, String>();

    private ReentrantLock lock = new ReentrantLock();

    public Node() {
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
        try {
            channel = new JChannel(is);
            channel.setReceiver(this);
            channel.connect(CLUSTER_NAME);            channel.getState(null,50000);
        } catch (Exception e) {
            LOG.error("启动节点异常!", e);
            // 最好是自定义RuntimeException!
            throw new RuntimeException("启动节点异常!", e);
        }
    }

    /**
     *
     * <pre>
     * 发送消息给目标地址.
     * </pre>
     *
     * @param dest
     *            为空表示发给所有节点.
     * @param textMsg
     *            消息.
     */
    public void sendMsg(Address dest, Object textMsg) {
        Message msg = new Message(dest, null, textMsg);
        try {
            channel.send(msg);
        } catch (Exception e) {
            LOG.error("消息发送失败!", e);
            // 应自定异常,最好是自定义Exception类型!
            throw new RuntimeException("消息发送失败!", e);
        }
    }

    @Override
    public void getState(OutputStream output) throws Exception {
        //cacheData过大可能会造成节点的状态同步时间过长.     lock.lock();        try {             Util.objectToStream(state, new DataOutputStream(output));        }catch(Exception e){             throw e;        }finally{             lock.unlock();        }
    }

    @Override
    public void receive(Message msg) {
        //当前节点不接收自己发送到通道当中的消息.
        if (msg.getSrc().equals(channel.getAddress())) {
            return;
        }
        LOG.info(msg.getObject());
    }

    @Override
    public void setState(InputStream input) throws Exception {
        lock.lock();
        try {
            @SuppressWarnings("unchecked")
            Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(input));
            this.cacheData.putAll(cacheData);
        } catch (Exception e) {
            LOG.error("从主节点同步状态到当前节点发生异常!", e);
        } finally {
           lock.unlock();
        }

    }

    @Override
    public void viewAccepted(View view) {
        LOG.info("当前成员[" + this.channel.getAddressAsString() + "]");
        LOG.info(view.getCreator());
        LOG.info(view.getMembers());        LOG.info("当前节点数据:" + cacheData);    }
/**
     *
     * <pre>
     * 提供一个简单的初始化数据的方法.
     * </pre>
     *
     */
    public void addData(String key,String val){
        if(key!=null&&!key.isEmpty()){
            cacheData.put(key, val);
        }
    }
}

实例节点1 Node1.java

package org.wit.ff;

import java.util.Scanner;
import java.util.concurrent.TimeUnit;

import org.wit.ff.Node;

/**
 *
 * <pre>
 * tcp模式下:
 * 如果是同一台机器测试,请注意在
 * TCPPING 元素下修改 initial_hosts的配置端口:
 * 例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.100[7801]}
 * 如果是多台机器测试,请注意在
 * TCPPING 元素下修改 initial_hosts的ip,端口随意:
 * 例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.178[7800]}
 *
 * udp模式下:
 * 同一台机器的不同端口(端口是动态的)可通信.
 * 不同机器之间的ip多播可能会受到一些因素限制而造成节点之间无法彼此发现.
 * </pre>
 *
 * @author F.Fang
 * @version $Id: Node1.java, v 0.1 2014年10月15日 上午5:31:32 F.Fang Exp $
 */
public class Node1 {

    public static void main(String[] args) {
        Node node = new Node();
        node.addData("hello", "world");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 使用控制台发送消息给Node2.
        Scanner scanner = new Scanner(System.in);
        while(true){
            String text = scanner.next();
            if("exit".equals(text)){
                break;
            }
            node.sendMsg(null,"hello "+text+",node2!");
        }

    }

}

实例节点2 Node2.java

package org.wit.ff;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;

/**
 *
 * <pre>
 * tcp模式下:
 * 如果是同一台机器测试,请注意在
 * TCPPING 元素下修改 initial_hosts的配置端口:
 * 例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.100[7801]}
 * 如果是多台机器测试,请注意在
 * TCPPING 元素下修改 initial_hosts的ip,端口随意:
 * 例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.178[7800]}
 *
 * @author F.Fang
 * @version $Id: Node2.java, v 0.1 2014年10月15日 上午5:31:44 F.Fang Exp $
 */
public class Node2 {

    public static void main(String[] args) {
        Node node = new Node();
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 使用控制台发送消息给Node1.
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String text = scanner.next();
            if ("exit".equals(text)) {
                break;
            }
            node.sendMsg(null,"hello " + text + ",node1!");
        }

    }

}

测试Case

启动Node1,Node1平稳后启动Node2。

Node1运行信息如下:

DEBUG Configurator                   - set property TCP.diagnostics_addr to default value /ff0e:0:0:0:0:0:75:75

-------------------------------------------------------------------
GMS: address=DSH07fFang-18185, cluster=FF, physical address=192.168.19.112:7800
-------------------------------------------------------------------
DEBUG NAKACK2                        -
[DSH07fFang-18185 setDigest()]
existing digest:  []
new digest:       DSH07fFang-18185: [0 (0)]
resulting digest: DSH07fFang-18185: [0 (0)]
DEBUG GMS                            - DSH07fFang-18185: installing view [DSH07fFang-18185|0] (1) [DSH07fFang-18185]
DEBUG STABLE                         - resuming message garbage collection
DEBUG FD_SOCK                        - VIEW_CHANGE received: [DSH07fFang-18185]
INFO  Node                           - 当前成员[DSH07fFang-18185]
INFO  Node                           - DSH07fFang-18185
INFO  Node                           - [DSH07fFang-18185]
INFO  Node                           - 当前节点数据:{}
DEBUG STABLE                         - resuming message garbage collection
DEBUG GMS                            - created cluster (first member). My view is [DSH07fFang-18185|0], impl is org.jgroups.protocols.pbcast.CoordGmsImpl
DEBUG STABLE                         - suspending message garbage collection
DEBUG STABLE                         - DSH07fFang-18185: resume task started, max_suspend_time=33000
DEBUG GMS                            - DSH07fFang-18185: installing view [DSH07fFang-18185|1] (2) [DSH07fFang-18185, DSH07fFang-2882]
DEBUG FD_SOCK                        - VIEW_CHANGE received: [DSH07fFang-18185, DSH07fFang-2882]
INFO  Node                           - 当前成员[DSH07fFang-18185]
INFO  Node                           - DSH07fFang-18185
INFO  Node                           - [DSH07fFang-18185, DSH07fFang-2882]
INFO  Node                           - 当前节点数据:{hello=world}
DEBUG FD_SOCK                        - ping_dest is DSH07fFang-2882, pingable_mbrs=[DSH07fFang-18185, DSH07fFang-2882]
DEBUG STABLE                         - resuming message garbage collection
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882

主要包括ip通信信息、状态、心跳等等。

Node2运行消息如下:

DEBUG Configurator                   - set property TCP.diagnostics_addr to default value /ff0e:0:0:0:0:0:75:75

-------------------------------------------------------------------
GMS: address=DSH07fFang-2882, cluster=FF, physical address=192.168.19.112:7801
-------------------------------------------------------------------
DEBUG GMS                            - DSH07fFang-2882: sending JOIN(DSH07fFang-2882) to DSH07fFang-18185
DEBUG NAKACK2                        -
[DSH07fFang-2882 setDigest()]
existing digest:  []
new digest:       DSH07fFang-18185: [0 (0)], DSH07fFang-2882: [0 (0)]
resulting digest: DSH07fFang-18185: [0 (0)], DSH07fFang-2882: [0 (0)]
DEBUG GMS                            - DSH07fFang-2882: installing view [DSH07fFang-18185|1] (2) [DSH07fFang-18185, DSH07fFang-2882]
DEBUG FD_SOCK                        - VIEW_CHANGE received: [DSH07fFang-18185, DSH07fFang-2882]
INFO  Node                           - 当前成员[DSH07fFang-2882]
INFO  Node                           - DSH07fFang-18185
INFO  Node                           - [DSH07fFang-18185, DSH07fFang-2882]
INFO  Node                           - 当前节点数据:{hello=world}
DEBUG FD_SOCK - ping_dest is DSH07fFang-18185, pingable_mbrs=[DSH07fFang-18185, DSH07fFang-2882] DEBUG FD - DSH07fFang-2882: sending are-you-alive msg to DSH07fFang-18185 DEBUG FD - DSH07fFang-2882: sending are-you-alive msg to DSH07fFang-18185

节点之间存在通信和状态同步,可以通过控制台输入发送消息的命令观察节点变化。

时间: 2024-10-18 05:53:13

JGroups 入门实践的相关文章

JGroups 入门实践(转)

前言 JGroups是一个开源的纯java编写的可靠的群组通讯工具.其工作模式基于IP多播,但可以在可靠性和群组成员管理上进行扩展.其结构上设计灵活,提供了一种灵活兼容多种协议的协议栈. JGroups 多线程的方式实现了多个协议之间的协同工作,常见工作线程有心跳检测,诊断等等. JGroups实现多机器之间的通信一般都会包含维护群组状态.群组通信协议.群组数据可靠性传输这样的一些主题. JGroups群组的各个节点是存在"管理节点"的,至少可以说某个节点提供了在一段时间内维护状态信息

scrapy入门实践1

Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架. 可以应用在包括数据挖掘,信息处理或存储历史数据等一系列的程序中. 这就是整个Scrapy的架构图了: 各部件职能: Scrapy Engine: 这是引擎,负责Spiders.ItemPipeline.Downloader.Scheduler中间的通讯,信号.数据传递等 Scheduler(调度器): 它负责接受引擎发送过来的requests请求,并按照一定的方式进行整理排列,入队.并等待Scrapy Engine(引擎)来请

VS单元测试入门实践教程

摘要:本教程不会介绍单元测试的基本理论知识,也不会和大家讨论在实际项目中是否需要写单元测试代码的问题.但是如果你此时想使用VS中的单元测试的工具来测试某个方法是否正确,可你又从来没真正实践过,那么本教程将带你一步一步使用VS2010集成的Unit Test进行断言(Assert)式验证数据的正确性,及代码覆盖率的查看. 关键词:Unit Test.单元测试.代码覆盖率.Assert.Twifly 正文: 在本入门教程中我们假设来测试一个简单的加法运算方法是否正确.该方法能够接受任意个参数,参数类

Docker快速入门实践-纯干货文章

Docker快速入门实践-纯干货文章,如果细看还能发现讲解视频呦!小伙伴们赶紧猛戳吧! Docker快速入门实践-纯干货文章 老男孩教育2016启用最新的官方博文地址: http://blog.oldboyedu.com/ 欢迎小伙伴们收藏关注,干货连连!

sass、less和stylus的安装使用和入门实践

刚 开始的时候,说实话,我很反感使用css预处理器这种新玩意的,因为其中涉及到了编程的东西,私以为很复杂,而且考虑到项目不是一天能够完成的,也很少是 一个人完成的,对于这种团队的项目开发,前端实践用css预处理器来合作,是一种很痛苦,即使不痛苦那也是需要花费非常多的时间来协调合作上的.对于预处 理器的态度,目前是本着学习新技术和推动css向前进的思想来学习新玩意.下面这篇文章来自w3cplus,这是一篇非常强大的文章,私以为互联网上介绍这方面知识的就属这篇文章是鼻祖了. 经过了这篇文章的学习,我

【实战】Docker入门实践二:Docker服务基本操作 和 测试Hello World

操作环境 操作系统:CentOS7.2 内存:1GB CPU:2核 Docker服务常用命令 docker服务操作命令如下 service docker start #启动服务 service docker stop  #停止服务 service docker restart #重启服务 service docker status   #查看服务状态 启动Docker服务 docker是一个CS模型,需要先启动服务端,直接执行 sudo service docker start 启动docker

docker入门实践教程 -date: 20191108

docker入门实践教程 -date: 2019108 1.docker架构 Docker 是一个开源的应用容器引擎,基于 Go 语言 并遵从 Apache2.0 协议开源. Docker 从 17.03 版本之后分为 CE(Community Edition: 社区版) 和 EE(Enterprise Edition: 企业版) 容器非常适合持续集成和持续交付(CI / CD)工作流程.Docker 的可移植性和轻量级的特性,还可以使您轻松地完成动态管理的工作负担,并根据业务需求指示,实时扩展

机器学习入门实践——线性回归&非线性回归&mnist手写体识别

把一本<白话深度学习与tensorflow>给啃完了,了解了一下基本的BP网络,CNN,RNN这些.感觉实际上算法本身不是特别的深奥难懂,最简单的BP网络基本上学完微积分和概率论就能搞懂,CNN引入的卷积,池化等也是数字图像处理中比较成熟的理论,RNN使用的数学工具相对而言比较高深一些,需要再深入消化消化,最近也在啃白皮书,争取从数学上把这些理论吃透 当然光学理论不太行,还是得要有一些实践的,下面是三个入门级别的,可以用来辅助对BP网络的理解 环境:win10 WSL ubuntu 18.04

Docker入门实践小结

Docker是近来大热的技术,似乎没有用过Docker都要落伍了,于是我也在一个个人的项目中用上了Docker,也算有了一些实践经验,与大家分享交流一下. 首先是安装和配置本地的Docker环境,因为Docker是Linux容器引擎,所以宿主机必须是Linux,Windows和Mac系统使用Docker必须先安装Boot2Docker运行一个包含Docker依赖的最小化Linux虚拟机作为宿主机,当然也可以用虚拟机运行一个完整Linux桌面系统然后再在里面安装Docker,个人更推荐后面一种方法