lease.go

package clientv3

import (
    "sync"
    "time"

    "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
    pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

type (
    LeaseRevokeResponse pb.LeaseRevokeResponse
    LeaseID             int64
)

// LeaseGrantResponse is used to convert the protobuf grant response.
type LeaseGrantResponse struct {
    *pb.ResponseHeader
    ID    LeaseID
    TTL   int64
    Error string
}

// LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
type LeaseKeepAliveResponse struct {
    *pb.ResponseHeader
    ID  LeaseID
    TTL int64
}

// LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response.
type LeaseTimeToLiveResponse struct {
    *pb.ResponseHeader
    ID LeaseID `json:"id"`

    // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
    TTL int64 `json:"ttl"`

    // GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
    GrantedTTL int64 `json:"granted-ttl"`

    // Keys is the list of keys attached to this lease.
    Keys [][]byte `json:"keys"`
}

const (
    // defaultTTL is the assumed lease TTL used for the first keepalive
    // deadline before the actual TTL is known to the client.
    defaultTTL = 5 * time.Second
    // a small buffer to store unsent lease responses.
    leaseResponseChSize = 16
    // NoLease is a lease ID for the absence of a lease.
    NoLease LeaseID = 0
)

type Lease interface {
    // Grant creates a new lease.
    Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

    // Revoke revokes the given lease.
    Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

    // TimeToLive retrieves the lease information of the given lease ID.
    TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

    // KeepAlive keeps the given lease alive forever.
    KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

    // KeepAliveOnce renews the lease once. In most of the cases, Keepalive
    // should be used instead of KeepAliveOnce.
    KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

    // Close releases all resources Lease keeps for efficient communication
    // with the etcd server.
    Close() error
}

type lessor struct {
    mu sync.Mutex // guards all fields

    // donec is closed when recvKeepAliveLoop stops
    donec chan struct{}

    remote pb.LeaseClient

    stream       pb.Lease_LeaseKeepAliveClient
    streamCancel context.CancelFunc

    stopCtx    context.Context
    stopCancel context.CancelFunc

    keepAlives map[LeaseID]*keepAlive

    // firstKeepAliveTimeout is the timeout for the first keepalive request
    // before the actual TTL is known to the lease client
    firstKeepAliveTimeout time.Duration
}

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
    chs  []chan<- *LeaseKeepAliveResponse
    ctxs []context.Context
    // deadline is the time the keep alive channels close if no response
    deadline time.Time
    // nextKeepAlive is when to send the next keep alive message
    nextKeepAlive time.Time
    // donec is closed on lease revoke, expiration, or cancel.
    donec chan struct{}
}

func NewLease(c *Client) Lease {
    l := &lessor{
        donec:                 make(chan struct{}),
        keepAlives:            make(map[LeaseID]*keepAlive),
        remote:                RetryLeaseClient(c),
        firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
    }
    if l.firstKeepAliveTimeout == time.Second {
        l.firstKeepAliveTimeout = defaultTTL
    }

    l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
    go l.recvKeepAliveLoop()
    go l.deadlineLoop()
    return l
}

func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
    cctx, cancel := context.WithCancel(ctx)
    done := cancelWhenStop(cancel, l.stopCtx.Done())
    defer close(done)

    for {
        r := &pb.LeaseGrantRequest{TTL: ttl}
        resp, err := l.remote.LeaseGrant(cctx, r)
        if err == nil {
            gresp := &LeaseGrantResponse{
                ResponseHeader: resp.GetHeader(),
                ID:             LeaseID(resp.ID),
                TTL:            resp.TTL,
                Error:          resp.Error,
            }
            return gresp, nil
        }
        if isHaltErr(cctx, err) {
            return nil, toErr(cctx, err)
        }
        if nerr := l.newStream(); nerr != nil {
            return nil, nerr
        }
    }
}

func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
    cctx, cancel := context.WithCancel(ctx)
    done := cancelWhenStop(cancel, l.stopCtx.Done())
    defer close(done)

    for {
        r := &pb.LeaseRevokeRequest{ID: int64(id)}
        resp, err := l.remote.LeaseRevoke(cctx, r)

        if err == nil {
            return (*LeaseRevokeResponse)(resp), nil
        }
        if isHaltErr(ctx, err) {
            return nil, toErr(ctx, err)
        }
        if nerr := l.newStream(); nerr != nil {
            return nil, nerr
        }
    }
}

func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
    cctx, cancel := context.WithCancel(ctx)
    done := cancelWhenStop(cancel, l.stopCtx.Done())
    defer close(done)

    for {
        r := toLeaseTimeToLiveRequest(id, opts...)
        resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false))
        if err == nil {
            gresp := &LeaseTimeToLiveResponse{
                ResponseHeader: resp.GetHeader(),
                ID:             LeaseID(resp.ID),
                TTL:            resp.TTL,
                GrantedTTL:     resp.GrantedTTL,
                Keys:           resp.Keys,
            }
            return gresp, nil
        }
        if isHaltErr(cctx, err) {
            return nil, toErr(cctx, err)
        }
    }
}

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
    ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)

    l.mu.Lock()
    ka, ok := l.keepAlives[id]
    if !ok {
        // create fresh keep alive
        ka = &keepAlive{
            chs:           []chan<- *LeaseKeepAliveResponse{ch},
            ctxs:          []context.Context{ctx},
            deadline:      time.Now().Add(l.firstKeepAliveTimeout),
            nextKeepAlive: time.Now(),
            donec:         make(chan struct{}),
        }
        l.keepAlives[id] = ka
    } else {
        // add channel and context to existing keep alive
        ka.ctxs = append(ka.ctxs, ctx)
        ka.chs = append(ka.chs, ch)
    }
    l.mu.Unlock()

    go l.keepAliveCtxCloser(id, ctx, ka.donec)

    return ch, nil
}

func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
    cctx, cancel := context.WithCancel(ctx)
    done := cancelWhenStop(cancel, l.stopCtx.Done())
    defer close(done)

    for {
        resp, err := l.keepAliveOnce(cctx, id)
        if err == nil {
            if resp.TTL == 0 {
                err = rpctypes.ErrLeaseNotFound
            }
            return resp, err
        }
        if isHaltErr(ctx, err) {
            return nil, toErr(ctx, err)
        }

        if nerr := l.newStream(); nerr != nil {
            return nil, nerr
        }
    }
}

func (l *lessor) Close() error {
    l.stopCancel()
    <-l.donec
    return nil
}

func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
    select {
    case <-donec:
        return
    case <-l.donec:
        return
    case <-ctx.Done():
    }

    l.mu.Lock()
    defer l.mu.Unlock()

    ka, ok := l.keepAlives[id]
    if !ok {
        return
    }

    // close channel and remove context if still associated with keep alive
    for i, c := range ka.ctxs {
        if c == ctx {
            close(ka.chs[i])
            ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
            ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
            break
        }
    }
    // remove if no one more listeners
    if len(ka.chs) == 0 {
        delete(l.keepAlives, id)
    }
}

func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()

    stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
    if err != nil {
        return nil, toErr(ctx, err)
    }

    err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
    if err != nil {
        return nil, toErr(ctx, err)
    }

    resp, rerr := stream.Recv()
    if rerr != nil {
        return nil, toErr(ctx, rerr)
    }

    karesp := &LeaseKeepAliveResponse{
        ResponseHeader: resp.GetHeader(),
        ID:             LeaseID(resp.ID),
        TTL:            resp.TTL,
    }
    return karesp, nil
}

func (l *lessor) recvKeepAliveLoop() {
    defer func() {
        l.mu.Lock()
        close(l.donec)
        for _, ka := range l.keepAlives {
            ka.Close()
        }
        l.keepAlives = make(map[LeaseID]*keepAlive)
        l.mu.Unlock()
    }()

    stream, serr := l.resetRecv()
    for serr == nil {
        resp, err := stream.Recv()
        if err != nil {
            if isHaltErr(l.stopCtx, err) {
                return
            }
            stream, serr = l.resetRecv()
            continue
        }
        l.recvKeepAlive(resp)
    }
}

// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
    if err := l.newStream(); err != nil {
        return nil, err
    }
    stream := l.getKeepAliveStream()
    go l.sendKeepAliveLoop(stream)
    return stream, nil
}

// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
    karesp := &LeaseKeepAliveResponse{
        ResponseHeader: resp.GetHeader(),
        ID:             LeaseID(resp.ID),
        TTL:            resp.TTL,
    }

    l.mu.Lock()
    defer l.mu.Unlock()

    ka, ok := l.keepAlives[karesp.ID]
    if !ok {
        return
    }

    if karesp.TTL <= 0 {
        // lease expired; close all keep alive channels
        delete(l.keepAlives, karesp.ID)
        ka.Close()
        return
    }

    // send update to all channels
    nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
    ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
    for _, ch := range ka.chs {
        select {
        case ch <- karesp:
            ka.nextKeepAlive = nextKeepAlive
        default:
        }
    }
}

// deadlineLoop reaps any keep alive channels that have not received a response
// within the lease TTL
func (l *lessor) deadlineLoop() {
    for {
        select {
        case <-time.After(time.Second):
        case <-l.donec:
            return
        }
        now := time.Now()
        l.mu.Lock()
        for id, ka := range l.keepAlives {
            if ka.deadline.Before(now) {
                // waited too long for response; lease may be expired
                ka.Close()
                delete(l.keepAlives, id)
            }
        }
        l.mu.Unlock()
    }
}

// sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
    for {
        select {
        case <-time.After(500 * time.Millisecond):
        case <-stream.Context().Done():
            return
        case <-l.donec:
            return
        case <-l.stopCtx.Done():
            return
        }

        var tosend []LeaseID

        now := time.Now()
        l.mu.Lock()
        for id, ka := range l.keepAlives {
            if ka.nextKeepAlive.Before(now) {
                tosend = append(tosend, id)
            }
        }
        l.mu.Unlock()

        for _, id := range tosend {
            r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
            if err := stream.Send(r); err != nil {
                // TODO do something with this error?
                return
            }
        }
    }
}

func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
    l.mu.Lock()
    defer l.mu.Unlock()
    return l.stream
}

func (l *lessor) newStream() error {
    sctx, cancel := context.WithCancel(l.stopCtx)
    stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
    if err != nil {
        cancel()
        return toErr(sctx, err)
    }

    l.mu.Lock()
    defer l.mu.Unlock()
    if l.stream != nil && l.streamCancel != nil {
        l.stream.CloseSend()
        l.streamCancel()
    }

    l.streamCancel = cancel
    l.stream = stream
    return nil
}

func (ka *keepAlive) Close() {
    close(ka.donec)
    for _, ch := range ka.chs {
        close(ch)
    }
}

// cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done
// should be closed when the work is finished. When done fires, cancelWhenStop will release
// its internal resource.
func cancelWhenStop(cancel context.CancelFunc, stopc <-chan struct{}) chan<- struct{} {
    done := make(chan struct{}, 1)

    go func() {
        select {
        case <-stopc:
        case <-done:
        }
        cancel()
    }()

    return done
}
				
时间: 2024-10-13 23:25:08

lease.go的相关文章

Configure Windows Server 2008 based DHCP database cleanup interval & lease grace period

My Windows Server 2008 based DHCP server related settings: 这里,LeaseExtension设置为10 minutes,意味着默认4个小时的grace period缩短到10分钟.DatabaseCleanupInterval默认为60 minutes,即默认每小时执行一次DHCP database cleanup. Configure Windows Server 2008 based DHCP database cleanup in

sudo -u hdfs hdfs balancer出现异常 No lease on /system/balancer.id

16/06/02 20:34:05 INFO balancer.Balancer: namenodes = [hdfs://dlhtHadoop101:8022, hdfs://dlhtHadoop101:8020] 16/06/02 20:34:05 INFO balancer.Balancer: parameters = Balancer.Parameters[BalancingPolicy.Node, threshold=10.0, number of nodes to be exclud

统计dhcpd.lease IP 地址

#!/usr/bin/env python # coding=utf-8 import string alist=[] lease_IP=' ' lease_start=' ' lease_end=' ' istatus=' ' MAC=' ' client_Hostname=' '  f=open('/var/lib/dhcpd/dhcpd.leases') lines = f.readlines() f.close() for line in lines:     if line.find(

Centos 6.5 启动DHCP报错 Can&#39;t chown new lease file: Operation not permitted

在使用centos 6.5安装DHCP时,配置已经全部ok,但在启动服务时报错 message" Can't chown new lease file: Operation not permitted" 服务无法正常启动, 在6.5中多了一个配置要修改,如下: 编辑 /etc/rc.d/init.d/dhcpd 文件,将其中的 user=dhcpd group=dhcpd 改为 user=root group=root 注: 如果不做此修改,启动DHCP时在 "/var/lo

No lease on /目录: File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-2059237550_1, pendingcreates: 8]错误及解决方法

感觉程序员的世界真是一个变幻无常且精彩绝伦的世界,每次跑程序都会发现不一样的问题.今天跑MapReduce程序来统计邮箱次数时遇到了一个问题,明明一样的代码,别人能跑,我却跑不了.我相信,基本做这行的都遇到过这种问题.好了,话不多说,来聊聊今天的错误吧. 根据日志查看到报的错误为:No lease on /目录: File does not exist. [Lease.  Holder: DFSClient_NONMAPREDUCE_-2059237550_1, pendingcreates:

Powershell function to get all dhcp lease

前提是安装RSAT tool for win10,这样才有相应的powershell module 来管理DHCP 另外要有权限来查询DHCP服务器 加入下面代码到$profile 中 function get-dhcplease{ $dhcpservers=Get-DhcpServerInDC $dhcpscopes=$dhcpservers|%{$computername=$_.dnsname;Get-DhcpServerv4Scope -ComputerName $computername

CentOS系统搭建OpenVPN远程访问

修改2处(有效文件再此目录/usr/share/easy-rsa/2.0/:/etc/openvpn/checkpsw.sh 需要加X权限) openvpn是一个vpn工具,用于创建虚拟专用网络(Virtual Private Network)加密通道的免费开源软件,提供证书验证功能,也支持用户名密码认证登录方式,当然也支持两者合一,为服务器登录和连接提供更加安全的方式,可以在不同网络访问场所之间搭建类似于局域网的专用网络通道,配合特定的代理服务器,可用于访问特定受限网站(你懂得)或者突破内部网

Centos 7下IPV6 有状态DHCPV6配置

yum源安装kea yum install epel-release yum install kea DEMON ps aux| grep kea rpm -qa kea 查看dhcpv6默认配置 cat /etc/kea/kea.conf 启动kea-dhcpv6并查看kea启动过程 systemctl start kea-dhcp6 systemctl status kea-dhcp6 systemctl -l status kea-dhcp6 通过tcpdump查看服务器数据包交互过程 t

Java_Hbase Timeout issue

设置参数hbase.rpc.timeout <property><name>hbase.regionserver.lease.period</name><value>180000</value></property> <property> <name>zookeeper.session.timeout</name> <value>120000</value> </pro