使用Golang利用ectd实现一个分布式锁

http://blog.codeg.cn/post/blog/2016-02-24-distrubute-lock-over-etcd/

By zieckey · 2016年02月24日 · 1205 Words · ~3min reading time | 编辑这个页面 | Tags: Golang etcd 分布式

本文 http://blog.codeg.cn/post/blog/2016-02-24-distrubute-lock-over-etcd/ 是作者zieckey在研究和学习相关内容时所做的笔记,欢迎广大朋友指正和交流! 版权所有,欢迎转载和分享,但请保留此段声明。

etcd是随着CoreOS项目一起成长起来的,随着Golang和CoreOS等项目在开源社区日益火热, etcd作为一个高可用、强一致性的分布式Key-Value存储系统被越来越多的开发人员关注和使用。

这篇文章全方位介绍了etcd的应用场景,这里简单摘要如下:

  • 服务发现(Service Discovery)
  • 消息发布与订阅
  • 负载均衡
  • 分布式通知与协调
  • 分布式锁
  • 分布式队列
  • 集群监控与Leader竞选
  • 为什么用etcd而不用ZooKeeper

本文重点介绍如何利用ectd实现一个分布式锁。 锁的概念大家都熟悉,当我们希望某一事件在同一时间点只有一个线程(goroutine)在做,或者某一个资源在同一时间点只有一个服务能访问,这个时候我们就需要用到锁。 例如我们要实现一个分布式的id生成器,多台服务器之间的协调就非常麻烦。分布式锁就正好派上用场。

其基本实现原理为:

  1. 在ectd系统里创建一个key
  2. 如果创建失败,key存在,则监听该key的变化事件,直到该key被删除,回到1
  3. 如果创建成功,则认为我获得了锁

具体代码如下:

package etcdsync

import (
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"github.com/coreos/etcd/client"
	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)

const (
	defaultTTL = 60
	defaultTry = 3
	deleteAction = "delete"
	expireAction = "expire"
)

// A Mutex is a mutual exclusion lock which is distributed across a cluster.
type Mutex struct {
	key    string
	id     string // The identity of the caller
	client client.Client
	kapi   client.KeysAPI
	ctx    context.Context
	ttl    time.Duration
	mutex  *sync.Mutex
	logger io.Writer
}

// New creates a Mutex with the given key which must be the same
// across the cluster nodes.
// machines are the ectd cluster addresses
func New(key string, ttl int, machines []string) *Mutex {
	cfg := client.Config{
		Endpoints:               machines,
		Transport:               client.DefaultTransport,
		HeaderTimeoutPerRequest: time.Second,
	}

	c, err := client.New(cfg)
	if err != nil {
		return nil
	}

	hostname, err := os.Hostname()
	if err != nil {
		return nil
	}

	if len(key) == 0 || len(machines) == 0 {
		return nil
	}

	if key[0] != ‘/‘ {
		key = "/" + key
	}

	if ttl < 1 {
		ttl = defaultTTL
	}

	return &Mutex{
		key:    key,
		id:     fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), time.Now().Format("20060102-15:04:05.999999999")),
		client: c,
		kapi:   client.NewKeysAPI(c),
		ctx: context.TODO(),
		ttl: time.Second * time.Duration(ttl),
		mutex:  new(sync.Mutex),
	}
}

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() (err error) {
	m.mutex.Lock()
	for try := 1; try <= defaultTry; try++ {
		if m.lock() == nil {
			return nil
		}

		m.debug("Lock node %v ERROR %v", m.key, err)
		if try < defaultTry {
			m.debug("Try to lock node %v again", m.key, err)
		}
	}
	return err
}

func (m *Mutex) lock() (err error) {
	m.debug("Trying to create a node : key=%v", m.key)
	setOptions := &client.SetOptions{
		PrevExist:client.PrevNoExist,
		TTL:      m.ttl,
	}
	resp, err := m.kapi.Set(m.ctx, m.key, m.id, setOptions)
	if err == nil {
		m.debug("Create node %v OK [%q]", m.key, resp)
		return nil
	}
	m.debug("Create node %v failed [%v]", m.key, err)
	e, ok := err.(client.Error)
	if !ok {
		return err
	}

	if e.Code != client.ErrorCodeNodeExist {
		return err
	}

	// Get the already node‘s value.
	resp, err = m.kapi.Get(m.ctx, m.key, nil)
	if err != nil {
		return err
	}
	m.debug("Get node %v OK", m.key)
	watcherOptions := &client.WatcherOptions{
		AfterIndex : resp.Index,
		Recursive:false,
	}
	watcher := m.kapi.Watcher(m.key, watcherOptions)
	for {
		m.debug("Watching %v ...", m.key)
		resp, err = watcher.Next(m.ctx)
		if err != nil {
			return err
		}

		m.debug("Received an event : %q", resp)
		if resp.Action == deleteAction || resp.Action == expireAction {
			return nil
		}
	}

}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() (err error) {
	defer m.mutex.Unlock()
	for i := 1; i <= defaultTry; i++ {
		var resp *client.Response
		resp, err = m.kapi.Delete(m.ctx, m.key, nil)
		if err == nil {
			m.debug("Delete %v OK", m.key)
			return nil
		}
		m.debug("Delete %v falied: %q", m.key, resp)
		e, ok := err.(client.Error)
		if ok && e.Code == client.ErrorCodeKeyNotFound {
			return nil
		}
	}
	return err
}

func (m *Mutex) debug(format string, v ...interface{}) {
	if m.logger != nil {
		m.logger.Write([]byte(m.id))
		m.logger.Write([]byte(" "))
		m.logger.Write([]byte(fmt.Sprintf(format, v...)))
		m.logger.Write([]byte("\n"))
	}
}

func (m *Mutex) SetDebugLogger(w io.Writer) {
	m.logger = w
}

其实类似的实现有很多,但目前都已经过时,使用的都是被官方标记为deprecated的项目。且大部分接口都不如上述代码简单。 使用上,跟Golang官方sync包的Mutex接口非常类似,先New(),然后调用Lock(),使用完后调用Unlock(),就三个接口,就是这么简单。示例代码如下:

package main

import (
	"github.com/zieckey/etcdsync"
	"log"
)

func main() {
	//etcdsync.SetDebug(true)
	log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile)
	m := etcdsync.New("/etcdsync", "123", []string{"http://127.0.0.1:2379"})
	if m == nil {
		log.Printf("etcdsync.NewMutex failed")
	}
	err := m.Lock()
	if err != nil {
		log.Printf("etcdsync.Lock failed")
	} else {
		log.Printf("etcdsync.Lock OK")
	}

	log.Printf("Get the lock. Do something here.")

	err = m.Unlock()
	if err != nil {
		log.Printf("etcdsync.Unlock failed")
	} else {
		log.Printf("etcdsync.Unlock OK")
	}
}

参考

  1. etcdsync项目地址
  2. ectd项目官方地址
时间: 2024-10-28 06:06:29

使用Golang利用ectd实现一个分布式锁的相关文章

自己实现的一个分布式锁的工具以及后面的一些计划

https://github.com/ruanjianlxm/distributedLock 顶上的链接是我自己简单实现的一个分布式锁的工具,目前只支持基于zookeeper.功能也不太完善,准备后期优化下. 借着各个工具的代码梳理下一些简单的架构应该如何去设计,在哪些位置应该捕获异常,哪些异常应该抛出.以及如何去封装与继承. 同时: 1.1个版本将会优化各个地方的异常处理情况,以及加上一些抽象与继承. 1.2版本加上基于redis的分布式锁的实现. 1.3版本打包成工具类.并且完善各种异常情况

spring boot 利用redisson实现redis的分布式锁

原文:http://liaoke0123.iteye.com/blog/2375469 利用redis实现分布式锁,网上搜索的大部分是使用java jedis实现的. redis官方推荐的分布式锁实现为redisson http://ifeve.com/redis-lock/ 以下为spring boot实现分布式锁的步骤 项目pom中需要添加官方依赖 我是1.8JDK固为 Pom代码   <!-- redisson --> <dependency> <groupId>

利用Redisson实现分布式锁及其底层原理解析

Redis介绍 参考地址:https://blog.csdn.net/turbo_zone/article/details/83422215 redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此

聊一聊分布式锁的设计

起因 前段时间,看到redis作者发布的一篇文章<Is Redlock safe?>,Redlock是redis作者基于redis设计的分布式锁的算法.文章起因是有一位分布式的专家写了一篇文章<How to do distributed locking>,质疑Redlock的正确性.redis作者则在<Is Redlock safe?>文章中给予回应,一来一回甚是精彩.文本就为读者一一解析两位专家的争论. 在了解两位专家的争论前,让我先从我了解的分布式锁一一道来.文章中

zookeeper 分布式锁原理:

1 大家也许都很熟悉了多个线程或者多个进程间的共享锁的实现方式了,但是在分布式场景中我们会面临多个Server之间的锁的问题,实现的复杂度比较高.利用基于google chubby原理开发的开源的zookeeper,可以使得这个问题变得简单很多.下面介绍几种可能的实现方式,并且对比每种实现方式的优缺点. 1. 利用节点名称的唯一性来实现共享锁 ZooKeeper抽象出来的节点结构是一个和unix文件系统类似的小型的树状的目录结构.ZooKeeper机制规定:同一个目录下只能有一个唯一的文件名.例

基于Redis实现分布式锁-Redisson使用及源码分析

在分布式场景下,有很多种情况都需要实现最终一致性.在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等).通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性. 本文主要探讨另外一种实现分布式最终一致性的解决方案--采用分布式锁.基于分布式锁的解决

分布式锁的实现方式

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题.分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency).可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项.”所以,很多系统在设计之初就要对这三者做出取舍.在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即

分布式锁的实现概述

做惯了讲究响应速度的微小化web服务,当有人给我讲分布式锁时深刻怀疑说这个名词的哥们要么准备给我挖坑,要么自己把架构玩脱了已经掉进了坑里.这个东西虽然常见,但是稍有不慎就会掉坑里出不来. 系统做的越多现在越来越害怕那种千钧一发的系统,动辄每秒单例服务响应web业务请求百万上下,这样的实现功力确实佩服,就是不知道服务宕机时的瞬时数据在恢复时能不能达到同样的速度了.从概率上来讲,服务器越多,发生问题的概率越小,再加上自己常人天资,最希望的就是一台服务器只处理一个请求-------当然这种开发富贵病也

从构建分布式秒杀系统聊聊分布式锁

前言 最近懒成一坨屎,学不动系列一波接一波,大多还都是底层原理相关的.上周末抽时间重读了周志明大湿的 JVM 高效并发部分,每读一遍都有不同的感悟.路漫漫,借此,把前段时间搞着玩的秒杀案例中的分布式锁深入了解一下. 案例介绍 在尝试了解分布式锁之前,大家可以想象一下,什么场景下会使用分布式锁? 单机应用架构中,秒杀案例使用ReentrantLcok或者synchronized来达到秒杀商品互斥的目的.然而在分布式系统中,会存在多台机器并行去实现同一个功能.也就是说,在多进程中,如果还使用以上JD