go任务调度6(etcd租约机制/自动过期)

对于实现分布式乐观锁非常重要。如果锁了,突然宕机了,锁是需要自动释放的。所以这锁在etcd里是需要生命期的。
过期演示:

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main() {
    var (
        config  clientv3.Config
        client  *clientv3.Client
        err     error
        lease clientv3.Lease
        leaseGrantResp *clientv3.LeaseGrantResponse
        leaseId clientv3.LeaseID
        putResp *clientv3.PutResponse
        kv clientv3.KV
        getResp *clientv3.GetResponse
    )

    //客户端配置
    config = clientv3.Config{
        Endpoints:   []string{"0.0.0.0:2379"}, //集群列表
        DialTimeout: 5 * time.Second,
    }

    //建立客户端
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    //申请一个lease(租约)
    lease = clientv3.NewLease(client)

    //申请一个5秒的租约
    if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
        fmt.Println(err)
        return
    }

    //拿到租约的id
    leaseId = leaseGrantResp.ID

    //获得kv api子集
    kv = clientv3.NewKV(client)

    //put一个kv,让它与租约关联起来,从而实现10秒后自动过期
    if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("写入成功:", putResp.Header.Revision)

    //定时看key过期没
    for {
        if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
            fmt.Println(err)
            return
        }
        if getResp.Count == 0 {
            fmt.Println("kv过期了")
            break
        }
        fmt.Println("还没过期:", getResp.Kvs)
        time.Sleep(time.Second)
    }
}

[[email protected] etcd]# go run demo6.go
写入成功: 27
还没过期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
还没过期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
还没过期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
还没过期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
还没过期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
还没过期: [key:"/cron/lock/job1" create_revision:27 mod_revision:27 version:1 lease:7587837741646622005 ]
kv过期了
[[email protected] etcd]#

申请一把分布式锁的时候,是谁抢到了key就是抢到了锁,如果不主动释放这锁,按道理讲不应该让租约过期,租约过期主要是为了程序宕掉之后,锁自动释放,防止程序异常退出。如果程序抢到了这个锁,我们希望锁一直不失效,知道我们主动释放它:

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main() {
    var (
        config  clientv3.Config
        client  *clientv3.Client
        err     error
        lease clientv3.Lease
        leaseGrantResp *clientv3.LeaseGrantResponse
        leaseId clientv3.LeaseID
        putResp *clientv3.PutResponse
        kv clientv3.KV
        getResp *clientv3.GetResponse
        keepResp *clientv3.LeaseKeepAliveResponse
        keepRespChan <-chan *clientv3.LeaseKeepAliveResponse //只读channel
    )

    //客户端配置
    config = clientv3.Config{
        Endpoints:   []string{"0.0.0.0:2379"}, //集群列表
        DialTimeout: 5 * time.Second,
    }

    //建立客户端
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    //申请一个lease(租约)
    lease = clientv3.NewLease(client)

    //申请一个5秒的租约
    if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
        fmt.Println(err)
        return
    }

    //拿到租约的id
    leaseId = leaseGrantResp.ID

    //(自动续租)当我们申请了租约之后,我们就可以启动一个续租
    if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
        fmt.Println(err)
        return
    }

    //处理续租应答的协程
    go func() {
        for {
            select {
            case keepResp = <-keepRespChan:
                if keepRespChan == nil {
                    fmt.Println("租约已经失效")
                    goto END
                } else { //每秒会续租一次,所以就会受到一次应答
                    fmt.Println("收到自动续租应答:", keepResp.ID)
                }
            }
        }
        END:
    }()

    //获得kv api子集
    kv = clientv3.NewKV(client)

    //put一个kv,让它与租约关联起来,从而实现10秒后自动过期
    if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("写入成功:", putResp.Header.Revision)

    //定时看key过期没
    for {
        if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
            fmt.Println(err)
            return
        }
        if getResp.Count == 0 {
            fmt.Println("kv过期了")
            break
        }
        fmt.Println("还没过期:", getResp.Kvs)
        time.Sleep(time.Second)
    }
}

[[email protected] etcd]# go run demo7.go
写入成功: 30
收到自动续租应答: 7587837741646622039
还没过期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
还没过期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
收到自动续租应答: 7587837741646622039
还没过期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
还没过期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
还没过期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
收到自动续租应答: 7587837741646622039
还没过期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
还没过期: [key:"/cron/lock/job1" create_revision:29 mod_revision:30 version:2 lease:7587837741646622039 ]
收到自动续租应答: 7587837741646622039
......

原文地址:https://blog.51cto.com/5660061/2381931

时间: 2024-11-09 00:29:31

go任务调度6(etcd租约机制/自动过期)的相关文章

租约机制

背景和介绍 缓存是计算机里广泛使用的一种技术,对降低读取延迟.网络流量和服务器负载都非常有效,但也带来了一致性(Consistency)的问题.所谓一致就是客户端总能读到最新的数据,使用缓存后有可能服务器端的数据已经被修改,但客户端仍然从缓存中读取陈旧的数据.为了保证一致性,有两种常见的解决办法,第一种是轮询(Polling),即每次读取数据时都先询问服务器数据是不是最新的,如果不是就从服务器传输新数据,这种方法需要每次读取数据时都与服务器通信.另一种方法就是回调(Callback)或者无效化(

租约机制简介

背景和介绍 缓存是计算机里广泛使用的一种技术,对降低读取延迟.网络流量和服务器负载都非常有效,但也带来了一致性(Consistency)的问题.所谓一致就是客户端总能读到最新的数据,使用缓存后有可能服务器端的数据已经被修改,但客户端仍然从缓存中读取陈旧的数据.为了保证一致性,有两种常见的解决办法,第一种是轮询(Polling),即每次读取数据时都先询问服务器数据是不是最新的,如果不是就从服务器传输新数据,这种方法需要每次读取数据时都与服务器通信.另一种方法就是回调(Callback)或者无效化(

分布式系统理论之租约机制学习

一,租约机制介绍 在分布式系统中,往往会有一个中心服务器节点.该节点负责存储.维护系统中的元数据.如果系统中的各种操作都依赖于中心服务器上的元数据,那么中心服务器很容易成为性能瓶颈及存在单点故障.而通过租约机制,可以将中心服务器的“权力”下放给其他机器,就可以减轻中心服务器的压力.当然,租约机制还有许多其他的用途:比如,确定集群中结点的状态,还可以实现分布式下的读写锁…… 如下图,GFS master颁发租约给某个chunk server,让它成为Primary 副本,当有多个client并发更

etcd选举机制

etcd是一种先进的key-value的存储系统,本文主要是学习etcd的心得,如有误解,敬请拍砖 主要分成三种形式的选举,先说一下etcd节点的三种状态,分别为leader,candidate和follower 第一种:初始选举 A.B.C.D现在进场,那么谁当领导呢?A(变身candidatae)就分别找BCD谈话,"我来当,你没意见吧".B\C都没什么主见,就同意了,D虽然不同意,但是大家都这么说,只好也同意了.A就开始行使权利,定时从BCD同步日志,并发送心跳(heartbea

mongo设置自动过期时间

.fctbNone { color: #000000 } .fctbStyle5 { color: #0000ff } .fctbStyle7 { color: #4682b4 } .fctbStyle6 { color: #800000 } .fctbStyle2 { color: #ff0000 } .fctbStyle3 { color: #ff00ff } 执行脚本命令: db.collectionName.ensureIndex({"fieldName": 1},{expir

Yii2的深入学习--自动加载机制

Yii2 的自动加载分两部分,一部分是 Composer 的自动加载机制,另一部分是 Yii2 框架自身的自动加载机制. Composer自动加载 对于库的自动加载信息,Composer 生成了一个 vendor/autoload.php 文件.你可以简单的引入这个文件,你会得到一个自动加载的支持. 在之前的文章,入口文件的介绍中,我们可以看到如下内容: // 引入 vendor 中的 autoload.php 文件,会基于 composer 的机制自动加载类 require(__DIR__ .

Yii2的深入学习--自动加载机制(转)

Yii2 的自动加载分两部分,一部分是 Composer 的自动加载机制,另一部分是 Yii2 框架自身的自动加载机制. Composer自动加载 对于库的自动加载信息,Composer 生成了一个 vendor/autoload.php 文件.你可以简单的引入这个文件,你会得到一个自动加载的支持. 在之前的文章,入口文件的介绍中,我们可以看到如下内容: // 引入 vendor 中的 autoload.php 文件,会基于 composer 的机制自动加载类 require(__DIR__ .

go任务调度9(op实现分布式乐观锁)

package main import ( "go.etcd.io/etcd/clientv3" "time" "fmt" "context" ) func main() { var ( config clientv3.Config client *clientv3.Client err error lease clientv3.Lease leaseGrantResp *clientv3.LeaseGrantResponse

浅入深出ETCD之【集群部署与golang客户端使用】

前言 之前说了etcd的简介,命令行使用,一些基本原理.这次来说说现实一点的集群部署和golang版本的客户端使用.因为在实际使用过程中,etcd的节点肯定是需要2N+1个进行部署的,所以有必要说明一下集群的部署. 集群部署 网上有很多集群部署的教程,有的很复杂,其实对于我们实际使用来说,其实配置并不复杂,下面举例一种最简单的集群配置.(简单到你想不到~) 下载 https://github.com/etcd-io/etcd/releases 还是在github上面找到需要下载的版本 我使用的是