分布式内存网格Hazelcast源码导读

去年项目需要看了hazelcast源码,当时记录的笔记。

Node是节点的抽象,里面包含节点引擎、客户端引擎、分区服务、集群服务、组播服务、连接管理、命令管理、组播属性、节点配置、本地成员、tcp地址、组播地址、连接者、节点初始化器、管理中心、安全上下文、

Config类,包含GroupConfig、NetworkConfig、MapConfig、TopicConfig、QueueConfig、MultiMapConfig、ListConfig、SetConfig、ExecutorConfig、SemaphoreConfig、WanReplicationConfig、ServicesConfig、SecurityConfig、ListenerConfig、PartitionGroupConfig、ManagementCenterConfig、SerializationConfig。

GroupConfig集群用户名及密码DEFAULT_GROUP_PASSWORD = "dev-pass";DEFAULT_GROUP_NAME = "dev";
NetworkConfig网络相关配置,包括InterfacesConfig、JoinConfig、SSLConfig等,其中JoinConfig包括MulticastConfig、TcpIpConfig、AwsConfig。
SecurityConfig客户端登陆相关配置。
WanReplicationConfig集群复制配置,包括WanTargetClusterConfig配置,所有目标节点相关配置。

ClusterServiceImpl用于维护集群各个成员节点,实例化时把本地节点加入成员Map中,MulticastService

Node实例化时根据MulticastConfig使用组播加入一个组,MulticastService提供节点基于组播传递的,使用监听模式每当接收到其他节点传播的消息调用监听器的onMessage,传递的参数为JoinMessage,它由序列化模块提供转化,默认NodeMulticastListener监听器,会对集群的节点校验,是否是同个集群用户名密码。

MulticastJoiner用于加入集群节点,创建JoinRequest对象用MulticastService发送给其他成员,不断发送加入集群请求直到node里面的masteraddress,如果配置里面指定了targetAddress就不用使用这种不断发送的方式选举主节点。MulticastService负责发送申请加入的组播消息和组播消息接收及处理的工作。

找主节点方式:循环发送JoinRequest消息向组内发送,如果已加入状态且是master的节点接收到后会向外组播JoinMessage,告诉其他节点我的组成员信息,还没加入集群的成员则将自己节点的master地址设置为master发出的JoinMessage中的地址。所以JoinMessage只有master才会发出。其他已加入组的成员节点接收到JoinMessage类型消息则直接忽略,

JoinMessage包含包版本、地址等信息。

SerializationService序列化转化模块,SerializationServiceBuilder生产者,默认字节存放顺序序列为ByteOrder.BIG_ENDIAN,即使用ByteArrayInputOutputFactory,

Packet为数据包结构,第一字节为版本号,第二字节表示为长度,第三字节表示类型。

SocketConnector会执行连接操作,使用的是阻塞读写。

TcpIpConnectionManager集成ConnectionManager,用于管理TCPIP连接

JoinMessage{
protected byte packetVersion;
    protected int buildNumber;
    protected Address address;
    protected String uuid;
    protected ConfigCheck configCheck;
    protected int memberCount;
}

JoinRequest继承JoinMessage,并添加credentials、tryCount属性。

默认5701作为每个节点对外暴露的端口。

ReadHandler的handle方法会初始化socketReader,集群成员维护的话使用SocketPacketReader读取报文,

OperationServiceImpl类内部类RemoteOperationProcessor用于ResponseOperation对象
response.beforeRun();
response.run();
response.afterRun();
即会调用JoinRequestOperation的run方法,调用ClusterServiceImpl里面的handleJoinRequest方法,完成成员更新工作,接收成功会向源节点返回SetMasterOperation、MemberInfoUpdateOperation等等更新源节点成员及节点joined状态。

里面一切需要处理的都用run方法,会统一处理。

Node.start--TcpIpConnectionManager.start--InSelectorImpl.run(输入即读取选择器,run不断选择可读的对象并获取attachment,调用它的handle方法,这里的attachment是在TcpIpConnection实例化的时候创建readHandle对象并调用start方法把readHandler当做attachment注册到socketChannel里面的,readHandler调用handle方法,如果是集群协议CLUSTER则实例化SocketPacketReader,SocketPacketReader包含一个PacketReader,默认是DefaultPacketReader,用于将套接字读出来的字节转化成Packet包并调用handlePacket方法处理消息包,如果是集群成员消息包则调用handleMemberPacket方法处理,间接调用NodeEngineImpl的handlePacket方法,有三种不同头部类型:Packet.HEADER_OP、Packet.HEADER_EVENT、Packet.HEADER_WAN_REPLICATION,HEADER_OP表示操作类型,会调用operationService的handleOperation方法处理包消息,OperationThread会一直跑调用OperationRunner处理需要执行的Operation,OperationRunner线程数由hazelcast.operation.thread.count设置,如果-1则为机器cpu个数,接着会执行operation的beforeRun、run、operation如果设置了响应的话还要执行产生响应内容在返回给调用者、afterRun方法。)--OutSelectorImpl.run(输出即写入选择器)--SocketAcceptor.run--TcpIpConnection.start(TcpIpConnection包括WriteHandler和ReadHandler)--

private void process(Object task) {
        processedCount++;

if (task instanceof Operation) {
            processOperation((Operation) task);
            return;
        }

if (task instanceof Packet) {
            processPacket((Packet) task);
            return;
        }

if (task instanceof PartitionSpecificRunnable) {
            processPartitionSpecificRunnable((PartitionSpecificRunnable) task);
            return;
        }

throw new IllegalStateException("Unhandled task type for task:" + task);
    }

设计得不错的是它直接支持各种Operation的传递,并执行,本质也是序列化反序列化,然后调用Operation的beforerun、run、afterrun方法,后面还会自动执行handleResponse方法,此方法用于向其他备份节点同步数据,这部分操作是在operation的afterrun之前完成备份。备份工作由OperationBackupHandler完成,backup方法,备份又分为同步备份和异步备份,相加等于总的需要备份数。map操作默认备份一份数据且是同步的,异步的默认为0.异步备份不会阻塞操作。备份的operation是Backup,它的run会执行PutOperation的run方法,即把数据放到缓存中并修改版本,这里的run不会再执行复制操作。
<hazelcast>
  <map name="default">
    <backup-count>1</backup-count>
    <async-backup-count>1</async-backup-count>
  </map>
</hazelcast>
operation的run同样会在备份节点上执行,putoperation其实就是在本地缓存更新值。备份的过程没有一个ack机制,信息传输的可靠性如何保证?

一个节点调用map的put操作时,会在本节点上缓存这个结果,再把operation传输到对应partition的第一个备份节点(这个节点可能就是自己本地节点)上,第一个节点接收到后备份到第二个节点上,所以默认就只有两个备份数据。所以nearcache缓存是可能存在每个节点上的。

PutOperation的afterRun方法主要是触发一些拦截器,触发各个节点的事件监听器什么的、更新各个备份节点缓存等等。

IOBalancer平衡io读取写入。

packet报文结构,1byte版本+2byte头部+4byte分区+4byte长度+nbyte消息。

主节点一个一个发给其他成员节点关于成员的消息,其他节点进行更新。

ServiceManagerImpl用于管理所有的远程,启动时会注册常用的一些服务,包括如下的服务,什么map、queue什么的
    private void registerCoreServices() {
        Node node = nodeEngine.getNode();
        registerService(ClusterServiceImpl.SERVICE_NAME, node.getClusterService());
        registerService(InternalPartitionService.SERVICE_NAME, node.getPartitionService());
        registerService(ProxyServiceImpl.SERVICE_NAME, nodeEngine.getProxyService());
        registerService(TransactionManagerServiceImpl.SERVICE_NAME, nodeEngine.getTransactionManagerService());
        registerService(ClientEngineImpl.SERVICE_NAME, node.clientEngine);
        registerService(QuorumServiceImpl.SERVICE_NAME, nodeEngine.getQuorumService());
    }

private void registerDefaultServices(ServicesConfig servicesConfig) {
        registerService(MapService.SERVICE_NAME, createService(MapService.class));
        registerService(LockService.SERVICE_NAME, new LockServiceImpl(nodeEngine));
        registerService(QueueService.SERVICE_NAME, new QueueService(nodeEngine));
        registerService(TopicService.SERVICE_NAME, new TopicService());
        registerService(ReliableTopicService.SERVICE_NAME, new ReliableTopicService(nodeEngine));
        registerService(MultiMapService.SERVICE_NAME, new MultiMapService(nodeEngine));
        registerService(ListService.SERVICE_NAME, new ListService(nodeEngine));
        registerService(SetService.SERVICE_NAME, new SetService(nodeEngine));
        registerService(DistributedExecutorService.SERVICE_NAME, new DistributedExecutorService());
        registerService(AtomicLongService.SERVICE_NAME, new AtomicLongService());
        registerService(AtomicReferenceService.SERVICE_NAME, new AtomicReferenceService());
        registerService(CountDownLatchService.SERVICE_NAME, new CountDownLatchService());
        registerService(SemaphoreService.SERVICE_NAME, new SemaphoreService(nodeEngine));
        registerService(IdGeneratorService.SERVICE_NAME, new IdGeneratorService(nodeEngine));
        registerService(MapReduceService.SERVICE_NAME, new MapReduceService(nodeEngine));
        registerService(ReplicatedMapService.SERVICE_NAME, new ReplicatedMapService(nodeEngine));
        registerService(RingbufferService.SERVICE_NAME, new RingbufferService(nodeEngine));
        registerService(XAService.SERVICE_NAME, new XAService(nodeEngine));
        registerCacheServiceIfAvailable();
    }

Map通过AbstractMapServiceFactory创建,使用MapRemoteService处理远程操作,RemoteService服务有两个方法createDistributedObject和destroyDistributedObject方法,最终是通过MapProxyImpl。Map的partition策略在MapContainer里面。

PutOperation用于执行集群节点的put操作。一致性哈希根据key使用MurmurHash哈希计算出结果,再根据分区数(默认271)取余。每个节点都是使用new ConcurrentHashMap<Data, Record>(1000, 0.75f, 1)存放记录。

成员组MemberGroup一共有若干个组,多少个成员就多少个组,最大的复制节点数为7,如果成员组小于7则使用成员组数量。

address[271][4]

Address[271][7] 
1、addr0 addr9 addr3 addr4 null null null
2、.
.
.
.
.
271、addr1 addr2 addr3 addr4 null null null

对分配的结果尝试重新分配,把过载的组分配一些成员给不足的组,并检测每个成员组内的节点数相差不会超过系数1.1,否则重新分组,尽可能达到均匀。

最终形成一张表,271个分区每个分区都对应着若干个复制的成员地址。

int avgPartitionPerGroup = partitionCount / groupSize;
就是说最多4个组,假如5个结点,分组情况为2,1,1,1,则每个组分到的partition个数为271/4,可能有些组多1个partition。

一共有271条线程处理operation,OperationThread,里面有队列ScheduleQueue,线程会不断处理,ScheduleQueue用于operation执行缓冲队列,里面的有两种队列,normalQueue和priorityQueue,一种用于正常的排队,一种用于设置优先级的队列,take方法会优先从priorityQueue中获取需要优先处理的operation。

有个同步复制、异步复制。

OperationServiceImpl.createInvocationBuilder(此方法有两个,一个用于partition、一个用于指定的address),->  实例化一个InvocationBuilderImpl对象,调用invoke方法会创建Invocation,Invocation包含PartitionInvocation(针对分区)和TargetInvocation(针对指定节点)两种。

在集群中传输一切以Data形式传输。

Map的服务名为hz:impl:mapService。
假如HazelcastInstance执行instance.getMap("customers")则,通过HazelcastInstanceProxy的getMap方法,代理是调HazelcastInstanceImpl的getMap方法,调用getDistributedObject方法,它会通过ProxyService(它代理了所有服务)代理找到MapService,调用mapservice的createDistributedObject方法创建DistributedObject,间接调用MapRemoteService的createDistributedObject方法创建MapProxyImpl,调用MapProxyImpl的put方法把数据放到集群,主要如下操作
final Data key = toData(k, partitionStrategy);
final Data value = toData(v);
final Data result = putInternal(key, value, ttl, timeunit);
return (V) toObject(result);
分别将key和value转化为Data,其实是序列化,方便网络传输。putInternal方法使用PutOperation,并且要根据key计算出partitionId,接着再完成operation的调用。

SerializationServiceImpl提供各种类型的序列化支持,toData提供由object到Data的转化,toObject提供由Data到object的转化,Data默认是使用DefaultData,DefaultData里面其实就是包含了一个字节数组还有不同的偏移量,例如从几位到几位表示类型,序列化工作其实也跟这种类似,把某一对象的相关信息转化为字节数组,传递到目的地后再根据约定反向组装成指定对象。

OperationThread专门用于执行接收到的请求,process方法,可以有三种请求,包括Operation、Packet、PartitionSpecificRunnable,分别不同的处理逻辑,Operation则直接反序列化后调用beforeRun、run、afterRun等方法。

HealthMonitor是独立一条线程用于监控健康,包括
private class HealthMetrics {
        private final long memoryFree;
        private final long memoryTotal;
        private final long memoryUsed;
        private final long memoryMax;
        private final double memoryUsedOfTotalPercentage;
        private final double memoryUsedOfMaxPercentage;
        //following three load variables are always between 0 and 100.
        private final double processCpuLoad;
        private final double systemLoadAverage;
        private final double systemCpuLoad;
        private final int threadCount;
        private final int peakThreadCount;
        private final long clusterTimeDiff;
        private final int asyncExecutorQueueSize;
        private final int clientExecutorQueueSize;
        private final int queryExecutorQueueSize;
        private final int scheduledExecutorQueueSize;
        private final int systemExecutorQueueSize;
        private final int eventQueueSize;
        private final int pendingInvocationsCount;
        private final double pendingInvocationsPercentage;
        private final int operationServiceOperationExecutorQueueSize;
        private final int operationServiceOperationPriorityExecutorQueueSize;
        private final int operationServiceOperationResponseQueueSize;
        private final int runningOperationsCount;
        private final int remoteOperationsCount;
        private final int proxyCount;
        private final int clientEndpointCount;
        private final int activeConnectionCount;
        private final int currentClientConnectionCount;
        private final int connectionCount;
        private final int ioExecutorQueueSize;
}

PerformanceMonitor表示性能监控,监控的参数包括inSelector的已读取数量,outSelect的已写入事件数量,OperationService相关的性能参数,例如挂起的调用比例、总体调用比例、最大调用数量、271个分区线程已执行数量、271个分区operation线程正在挂起线程(即任务排队队列中的数量),常规operation线程的排队队列数量、常规operation线程已执行数量、响应线程已执行数量、响应线程排队队列数量。

hazelcast核心——Node,包含各种各样重要的基础服务,日志、节点关闭钩子、序列化服务、节点引擎、客户端引擎、分区服务、集群服务、广播服务、连接管理服务、命令服务、配置文件服务、群组属性服务、本节点地址、本地集群成员对象、主节点地址、hazelcast实例引用、日志服务、集群节点加入服务、节点扩展服务、管理中心服务、安全上下文、创建信息服务、版本校对服务、hazelcast线程组。
public class Node {
    private final ILogger logger;
    private final NodeShutdownHookThread shutdownHookThread = new NodeShutdownHookThread("hz.ShutdownThread");
    private final SerializationService serializationService;
    public final NodeEngineImpl nodeEngine;
    public final ClientEngineImpl clientEngine;
    public final InternalPartitionService partitionService;
    public final ClusterServiceImpl clusterService;
    public final MulticastService multicastService;
    public final ConnectionManager connectionManager;
    public final TextCommandServiceImpl textCommandService;
    public final Config config;
    public final GroupProperties groupProperties;
    public final Address address;
    public final MemberImpl localMember;
    private volatile Address masterAddress = null;
    public final HazelcastInstanceImpl hazelcastInstance;
    public final LoggingServiceImpl loggingService;
    private final Joiner joiner;
    private final NodeExtension nodeExtension;
    private ManagementCenterService managementCenterService;
    public final SecurityContext securityContext;
    private final ClassLoader configClassLoader;
    private final BuildInfo buildInfo;
    private final VersionCheck versionCheck = new VersionCheck();
    private final HazelcastThreadGroup hazelcastThreadGroup;
}

NodeEngineImpl作为节点引擎包含了许多服务,重要的例如,事件服务、operation服务、执行服务、等待通知服务、service(内置许多service,例如Map、Queue,用户自定义的服务可配置到hazelcast.xml,启动时会加载进来)管理服务、事务管理服务、代理服务、wan复制服务、包传输服务、证明人服务。
public class NodeEngineImpl implements NodeEngine {

private final Node node;
    private final ILogger logger;
    private final EventServiceImpl eventService;
    private final OperationServiceImpl operationService;
    private final ExecutionServiceImpl executionService;
    private final WaitNotifyServiceImpl waitNotifyService;
    private final ServiceManagerImpl serviceManager;
    private final TransactionManagerServiceImpl transactionManagerService;
    private final ProxyServiceImpl proxyService;
    private final WanReplicationService wanReplicationService;
    private final PacketTransceiver packetTransceiver;
    private final QuorumServiceImpl quorumService;

}

ServiceManagerImpl用于管理所有服务,启动时默认会实例化核心的service、默认的service,如果用户通过配置文件配置了自定义service则也会实例化。启动时注册即将service实例put到ConcurrentMap中,核心service包括ClusterServiceImpl、InternalPartitionService、ProxyServiceImpl、TransactionManagerServiceImpl、ClientEngineImpl、QuorumServiceImpl。默认service包括MapService、LockService、QueueService、TopicService、ReliableTopicService、MultiMapService、ListService、SetService、DistributedExecutorService、AtomicLongService、AtomicReferenceService、CountDownLatchService、SemaphoreService、IdGeneratorService、MapReduceService、ReplicatedMapService、RingbufferService、XAService。如果允许还将把缓存服务CacheService添加进来。
public final class ServiceManagerImpl implements ServiceManager {

private final ConcurrentMap<String, ServiceInfo> services = new ConcurrentHashMap<String, ServiceInfo>(20, .75f, 1);

}

节点加入,Joiner负责加入工作,例如广播则使用MulticastJoiner、单播则使用TcpIpJoiner、AWS则使用TcpIpJoinerOverAWS。Node启动时会根据情况启动个线程,
multicast只是做节点发现工作,真正的节点加入工作是交由tcpip做,向主节点发送加入请求,主节点把请求节点添加到成员列表中,然后返回请求节点让它把主节点地址设置为本人。

DefaultSerializers包括DateSerializer、ObjectSerializer、ClassSerializer等等序列化器,实现StreamSerializer的read和write方法完成序列化和反序列化处理。

Date.class, new DateSerializer());
        BigInteger.class, new BigIntegerSerializer());
        BigDecimal.class, new BigDecimalSerializer());
        Externalizable.class, new Externalizer(enableCompression));
        Serializable.class, new ObjectSerializer(enableSharedObject, enableCompression));
        Class.class, new ClassSerializer());
        Enum.class, new EnumSerializer());

DataSerializable.class, dataSerializerAdapter);
        Portable.class, portableSerializerAdapter);
        Byte.class, new ByteSerializer());
        Boolean.class, new BooleanSerializer());
        Character.class, new CharSerializer());
        Short.class, new ShortSerializer());
        Integer.class, new IntegerSerializer());
        Long.class, new LongSerializer());
        Float.class, new FloatSerializer());
        Double.class, new DoubleSerializer());
        byte[].class, new TheByteArraySerializer());
        char[].class, new CharArraySerializer());
        short[].class, new ShortArraySerializer());
        int[].class, new IntegerArraySerializer());
        long[].class, new LongArraySerializer());
        float[].class, new FloatArraySerializer());
        double[].class, new DoubleArraySerializer());
        String.class, new StringSerializer());

每个service都有自己的context,例如mapservice的MapServiceContext,它里面是保留了一份partition映射表的副本,在底层完成迁移之前并不会更新,当然底层的map数据也不会一边迁移一边删除,而是复制一份进行删除

喜欢java的同学可以交个朋友:

时间: 2024-10-12 09:27:34

分布式内存网格Hazelcast源码导读的相关文章

spring 事件模式 源码导读

一,jdk 事件对象基类 package java.util; import java.io.Serializable; public class EventObject implements Serializable { protected transient Object source; public Object getSource() { return this.source; } public EventObject(Object paramObject) { if (paramObj

Hazelcast源码剖析之Eviction

v\:* {behavior:url(#default#VML);} o\:* {behavior:url(#default#VML);} w\:* {behavior:url(#default#VML);} .shape {behavior:url(#default#VML);}/* Style Definitions */ table.MsoNormalTable {mso-style-name:普通表格; mso-tstyle-rowband-size:0; mso-tstyle-colb

Memcache分布式实现原理---Java_Memcache 源码分析

 Memcache本身只是一个内存缓存服务器,用来缓存数据以缓解数据库压力,但是我们经常会听到分布工Memcache,那么它是如何实现的呢?在使用Java操作Memcache时,我们通常会借助Java_Memcache来帮助我们完成各项操作, get/set/delete等.下面我们阅读一下Java_Memcache的源码来窥探一二.(注:网上很难找到Java_Memcache最新的源代码,但是可以利用Java 反编译工具来查看其源码,具体操作步骤可以参见我的另一篇博客--Eclipse设置

从使用Handler致内存泄漏角度源码追踪Handler工作机制

使用Handler时内存泄漏分析 在Android中,处理完异步任务后常常会在主线程进行一些操作,所以我们可能会使用到Handler,下面是Handler的常见使用方法: public class MainActivity extends AppCompatActivity { private Handler mHanlder = new Handler() { @Override public void handleMessage(Message msg) { //TODO } }; } 但是

内存管理初始化源码4:add_active_range

我们在阅读源码时,函数功能可以分为两类:1. bootmem.c 2. page_alloc.c. 1. bootmem.c是关于bootmem allocator的,上篇文章已经简述过. 2. page_alloc.c是关于Memory Management subsystem的. 关于内存管理子系统的初始化调用了多个函数,我们首先分析在bootmem_init中调用的add_active_range函数. start_kernel --> setup_arch ---> arch_mem_

内存管理初始化源码1:setup_arch

源码声明:基于Linux kernel 3.08 1. 在kernel/arch/mips/kernel/head.S中会做一些特定硬件相关的初始化,然后会调用内核启动函数:start_kernel: 2. start_kernel是通用的内核启动函数,但是在初始化内核过程中,必然有一些参数是特定于硬件体系结构的,这些特定于硬件体系结构的设置通过调用函数setup_arch函数: 3. 我们看看MIPS架构的setup_arch函数做了哪些特定于MIPS的设置: /* kernel/arch/m

详细讲解MFS分布式文件系统搭建(内含源码包)

初步了解分布式原理: 分布式文件系统(Distributed File Systemm)是指文件系统管理的物理存储资源不一定直接连接在本地节点上,而是通过计算机网络与节点相连.简单来说,就是把一些分散的(分布在局域网内各个计算机上)共享文件夹,集合到一个文件夹内(虚拟共享文件夹).对于用户来说,要访问这些共享文件夹时,只要打开这个虚拟共享文件夹,就可以看到所有链接到虚拟共享文件夹内的共享文件夹,用户感觉不到这些共享文件是分散在各个计算机上的.分布式文件系统的好处是集中访问.简化操作.数据容灾,以

百度开源分布式id生成器uid-generator源码剖析

百度uid-generator源码 https://github.com/baidu/uid-generator snowflake算法 uid-generator是基于Twitter开源的snowflake算法实现的. snowflake将long的64位分为了3部分,时间戳.工作机器id和序列号,位数分配如下. 其中,时间戳部分的时间单位一般为毫秒.也就是说1台工作机器1毫秒可产生4096个id(2的12次方). 源码实现分析 与原始的snowflake算法不同,uid-generator支

内存管理初始化源码3:bootmem

start_kernel ——> setup_arch ——> arch_mem_init ——> bootmem_init ——> init_bootmem_node: 此时,不得不说的就是 bootmem . 1. 什么是bootmem: 我们都知道,所有的物理内存是交给内核管理的,或者说是交给内存管理子系统管理的.那么,从内核启动到内核管理子系统启动之间,是否需要内存呢?答案是肯定的,该时间段内是需要物理内存的. 那么bootmem就是负责该时间段的物理内存的分配. 2. 特