syncer.go

package mirror

import (
    "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
)

const (
    batchLimit = 1000
)

// Syncer syncs with the key-value state of an etcd cluster.
type Syncer interface {
    // SyncBase syncs the base state of the key-value state.
    // The key-value state are sent through the returned chan.
    SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error)
    // SyncUpdates syncs the updates of the key-value state.
    // The update events are sent through the returned chan.
    SyncUpdates(ctx context.Context) clientv3.WatchChan
}

// NewSyncer creates a Syncer.
func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer {
    return &syncer{c: c, prefix: prefix, rev: rev}
}

type syncer struct {
    c      *clientv3.Client
    rev    int64
    prefix string
}

func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) {
    respchan := make(chan clientv3.GetResponse, 1024)
    errchan := make(chan error, 1)

    // if rev is not specified, we will choose the most recent revision.
    if s.rev == 0 {
        resp, err := s.c.Get(ctx, "foo")
        if err != nil {
            errchan <- err
            close(respchan)
            close(errchan)
            return respchan, errchan
        }
        s.rev = resp.Header.Revision
    }

    go func() {
        defer close(respchan)
        defer close(errchan)

        var key string

        opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)}

        if len(s.prefix) == 0 {
            // If len(s.prefix) == 0, we will sync the entire key-value space.
            // We then range from the smallest key (0x00) to the end.
            opts = append(opts, clientv3.WithFromKey())
            key = "\x00"
        } else {
            // If len(s.prefix) != 0, we will sync key-value space with given prefix.
            // We then range from the prefix to the next prefix if exists. Or we will
            // range from the prefix to the end if the next prefix does not exists.
            opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
            key = s.prefix
        }

        for {
            resp, err := s.c.Get(ctx, key, opts...)
            if err != nil {
                errchan <- err
                return
            }

            respchan <- (clientv3.GetResponse)(*resp)

            if !resp.More {
                return
            }
            // move to next key
            key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
        }
    }()

    return respchan, errchan
}

func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
    if s.rev == 0 {
        panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
    }
    return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev+1))
}
				
时间: 2024-11-12 04:09:15

syncer.go的相关文章

事件驱动的HLog写入模型

HLog的作用: HBase写入数据时会同时写入到WAL和Memstore中,其中Memstore是位于内存中的store,类似于写缓存,当Memstore的大小超过限定的阈值时会触发flush行为,将内存中的数据刷写到磁盘做持久化.其中的wal也称为hlog,作用类似于mysql中的binlog,记录了客户端的每次update动作,只有当wal写入成功之后,本次写事务才会返回. 我们知道内存中的数据是易失的,当regionserver宕机时,HMaster会切割按region切割宕机regio

LVS高性能集群

LVS高性能集群  ====负载均衡硬件设备 1.什么是LVS?   linux virtual service,linux虚拟服务,使用多台服务器一起工作来提高服务的访问和处理性能   2.lvs的工作模式   (1)nat:通过地址转换访问服务   (2)tun:通过IP隧道访问服务   (3)dr:直接调度访问服务  ====直接路由调度 1.轮寻    2.加权,根据硬件的好坏来设置权值   3.最小连接4. 加权最小连接5. 基于地址的最小连接调度6. 目标7. 源        3.

CoroSync + Drbd + MySQL 实现MySQL的高可用集群

Corosync + DRBD + MySQL 构建高可用MySQL集群 节点规划: node1.huhu.com172.16.100.103 node2.huhu.com172.16.100.104 资源名称规划 资源名称:可以是除了空白字符外的任意ACSII码字符 DRBD设备:在双节点上,此DRBD设备文件,一般为/dev/drbdN,主设备号147 磁盘:在双方节点上,各自提供存储设备 网络配置:双方数据同步所使用的网络属性 DRBD从Linux内核2.6.33起已经整合进内核 1.配置

Mysql+DRBD+Heartbeat 实现mysql高可用的双击热备(DRBD篇)

DRBD官方tar包下载地址:   http://oss.linbit.com/drbd/ 环境介绍: 系统版本:CentOS 6.4 (64位) 内核版本  2.6.32-358.el6.x86_64 软件版本:drbd-8.4.3.tar.gz 主:10.0.0.1   从:10.0.0.2 两台机器上的hosts都需要修改: [[email protected] ~]# vim /etc/hosts 10.0.0.1    node1 10.0.0.2    node2 两台服务器双网卡,

HA集群之DRBD实现MySQL高可用

一.前言 本篇博文只是实现Corosync + Pacemaker + DRBD + MySQL,实现MySQL的高可用.更多的基础知识在前几篇博文中已有涉猎,故更多的理论细节将不再此篇中详述. 若想了解高可用基础知识,请参考:http://hoolee.blog.51cto.com/7934938/1406951 若想了解Corosync + Pacemaker,请参考:http://hoolee.blog.51cto.com/7934938/1409395 若想了解DRBD,请参考:http

利用lamp架构搭建Discuz论坛,并实现对数据库的高可用

lamp架构=LAMP指的Linux(操作系统).Apache(HTTP 服务器),MySQL(数据库软件) 和PHP(有时也是指Perl或Python) 的第一个字母,一般用来建立web 服务器. #############源码安装php########### tar jxf php-5.6.20.tar.bz2 cd php-5.6.20 解决依赖性: yum install freetype-devel libmcrypt-2.5.8-9.el6.x86_64.rpm net-snmp-d

34补2 HA Cluster与Corosync、pacemaker、drbd

HA Cluster基础及heartbeat实现HA 配置环境 node1:192.168.1.121 CentOS6.7 node2:192.168.1.122 CentOS6.7 node3:192.168.1.123 CentOS6.7 vip 192.168.1.88 配置前准备    # cat /etc/hosts 127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1  

项目:一次测试环境下的高可用NFS文件服务器(DRBD+heartbeat+NFS)

什么是DRBD? 分布式复制块设备.当你将数据写入本地的DRBD设备上的文件系统时,数据同时会被发送到远程端的另一边的DRBD设备文件系统里,保障数据实时同步,当本地的DRBD设备突然故障,远程节点还保留一份一模一样的数据,根据这个特性,可以结合heatbeat的ha开源软件,实现高可用! 我们都把DRBD理解是网络raid1磁盘阵列. DRBD底层设备可以是 1)一块磁盘,或者一个分区. 2)raid设备. 3)逻辑卷lvm. 4)任何块设备. DRBD支持三种不同的复制协议.协议A,协议B,

linux下高可用集群之DRBD详解

1.DRBD:Disrtributed Replicated Block Device,分布式复制块设备 DRBD:主要是在不同服务器之间硬盘或分区同步数据,通过网络,按位同步,即镜像! Raid1主要是将同一服务器硬盘或分区同步数据,通过主板总线,按位同步.即镜像! DRBD区别于DAS,NAS,SAN,也区别于Raid1 DRBD是primary/secondary,主从设备,主从角色可以互换 primary:可读写执行操作,但secondary不能挂载文件系统 DRBD也是Dual pri