基于2PC和延迟队列的分布式事务实现

背景

分布式多消息事务问题

 在消息队列使用场景中,有时需要同时下发多条消息,但现在的消息队列比如kafka只支持单条消息的事务保证,不能保证多条消息,今天说的这个方案就时kafka内部的一个子项目中基于2PC和延迟更新来实现分布式事务

2PC

 2PC俗称两阶段提交,通过将一个操作分为两个阶段:准备阶段和提交阶段来尽可能保证操作的原子执行(实际上不可能,大家有个概念先)

延迟更新

 延迟更新其实是一个很常用的技术手段,简单来说,当某个操作条件不满足时,通过一定手段将数据暂存,等条件满足时在进行执行

基于2PC和延迟队列的分布式事务实现

系统架构

 实现也蛮简单的, 在原来的业务消息之后再添加一条事务消息(事务消息可以通过类似唯一ID来关联到之前提交的消息), worker未消费到事物提交的消息,就会一直将消息放在本地延迟存储中,只有当接收到事物提交消息,才会进行业务逻辑处理

业务流程

生产者

  1. 逐条发送业务消息组
  2. 发送事务提交消息

消费者

  1. 消费消息队列,将业务消息存放本地延迟存储
  2. 接收提交事务消息,从本地延迟存储获取所有数据,然后从延迟存储中删除该消息

代码实现

核心组件

 MemoryQuue: 用于模拟消息队列,接收事件分发事件 Worker: 模拟具体业务服务,接收消息,存入本地延迟更新存储,或者提交事务触发业务回调

Event与EventListener

Event: 用于标识事件,用户将业务数据封装成事件存入到MemoryQueue中 EventListener: 事件回调接口,用于MemoryQueue接收到数据后的回调 事件在发送的时候,需要通过一个前缀来进行事件类型标识,这里有三种TaskPrefix、CommitTaskPrefix、ClearTaskPrefix

const (
	// TaskPrefix 任务key前缀
	TaskPrefix string = "task-"
	// CommitTaskPrefix 提交任务key前缀
	CommitTaskPrefix string =www.qlincheng.cn    "commit-"
	// ClearTaskPrefix 清除任务
	ClearTaskPrefix string = "clear-"
)

// Event 事件类型
type Event struct {
	Key   string
	Name  string
	Value interface{}
}

// EventListener 用于接收消息回调
type EventListener interface {
	onEvent(event www.shengrenyp.cn*Event)
}

MemoryQueue

MemoryQueue内存消息队列,通过Push接口接收用户数据,通过AddListener来注册EventListener, 同时内部通过poll来从chan event取出数据分发给所有的Listener

// MemoryQueue 内存消息队列
type MemoryQueue struct {
	done      chan struct{www.baichuangyul.com}
	queue     chan Event
	listeners []EventListener
	wg        sync.WaitGroup
}

// Push 添加数据
func (mq *MemoryQueue) Push(eventType, name string, value interface{}) {
	mq.queue <- Event{Key: eventType + name, Name: name, Value: value}
	mq.wg.Add(1)
}

// AddListener 添加监听器
func (mq *MemoryQueue) AddListener(listener EventListener) bool {
	for _, item := range mq.listeners {
		if item == listener www.chengmingyuLe.com{
			return false
		}
	}
	mq.listeners = append(mq.www.shengrenpt.com listeners, listener)
	return true
}

// Notify 分发消息
func (mq *MemoryQueue) Notify(event *Event) {
	defer mq.wg.Done()
	for _, listener := range mq.listeners {
		listener.onEvent(event)
	}
}

func (mq *MemoryQueue) poll() {
	for {
		select {
		case <-mq.done:
			break
		case event := <-mq.queue:
			mq.Notify(&event)
		}
	}
}

// Start 启动内存队列
func (mq *MemoryQueue) Start() {
	go mq.poll()
}

// Stop 停止内存队列
func (mq *MemoryQueue) Stop() {
	mq.wg.Wait()
	close(mq.done)
}
 

Worker

Worker接收MemoryQueue里面的数据,然后在本地根据不同类型来进行对应事件事件类型处理, 主要是通过事件的前缀来进行对应事件回调函数的选择


// Worker 工作进程
type Worker struct {
	name                string
	deferredTaskUpdates map[string][]Task
	onCommit            ConfigUpdateCallback
}

func (w *Worker) onEvent(event *Event) {
	switch {
	// 获取任务事件
	case strings.Contains(event.Key, TaskPrefix):
		w.onTaskEvent(event)
		// 清除本地延迟队列里面的任务
	case strings.Contains(event.Key, ClearTaskPrefix):
		w.onTaskClear(event)
		// 获取commit事件
	case strings.Contains(event.Key, CommitTaskPrefix):
		w.onTaskCommit(event)
	}
}

事件处理任务

事件处理任务主要分为:onTaskClear(从本地清楚该数据)、onTaskEvent(数据存储本地延迟存储进行暂存)、onTaskCommit(事务提交)

func (w *Worker) onTaskClear(event *Event) {
	task, err := event.Value.(Task)
	if !err {
		// log
		return
	}
	_, found := w.deferredTaskUpdates[task.Group]
	if !found {
		return
	}
	delete(w.deferredTaskUpdates, task.Group)
	// 还可以继续停止本地已经启动的任务
}

// onTaskCommit 接收任务提交, 从延迟队列中取出数据然后进行业务逻辑处理
func (w *Worker) onTaskCommit(event *Event) {
	// 获取之前本地接收的所有任务
	tasks, found := w.deferredTaskUpdates[event.Name]
	if !found {
		return
	}

	// 获取配置
	config := w.getTasksConfig(tasks)
	if w.onCommit != nil {
		w.onCommit(www.shengryll.com config)
	}
	delete(w.deferredTaskUpdates, event.Name)
}

// onTaskEvent 接收任务数据,此时需要丢到本地暂存不能进行应用
func (w *Worker) onTaskEvent(event *Event) {
	task, err := event.Value.(Task)
	if !err {
		// log
		return
	}

	// 保存任务到延迟更新map
	configs, found := w.deferredTaskUpdates[task.Group]
	if !found {
		configs = make([]Task, 0)
	}
	configs = append(configs, task)
	w.deferredTaskUpdates[task.Group] = configs
}

// getTasksConfig 获取task任务列表
func (w *Worker) getTasksConfig(tasks []Task) map[string]string {
	config := make(map[string]string)
	for _, t := range tasks {
		config = t.updateConfig(config)
	}
	return config
}
 

主流程

unc main() {

	// 生成一个内存队列启动
	queue := NewMemoryQueue(10)
	queue.Start()

	// 生成一个worker
	name := "test"
	worker := NewWorker(name, func(data map[string]string) {
		for key, value := range data {
			println("worker get task key: " + key + " value: " + value)
		}
	})
	// 注册到队列中
	queue.AddListener(worker)

	taskName := "test"
	// events 发送的任务事件
	configs := []map[string]string{
		map[string]string{"task1": "SendEmail", "params1": "Hello world"},
		map[string]string{"task2": "SendMQ", "params2": "Hello world"},
	}

	// 分发任务
	queue.Push(ClearTaskPrefix, taskName, nil)
	for _, conf := range configs {
		queue.Push(TaskPrefix, taskName, Task{Name: taskName, Group: taskName, Config: conf})
	}
	queue.Push(CommitTaskPrefix, taskName, nil)
	// 停止队列
	queue.Stop()
}

输出

# go run main.go
worker get task key: params1 value: Hello world
worker get task key: task1 value: SendEmail
worker get task key: params2 value: Hello world
worker get task key: task2 value: SendMQ

总结

在分布式环境中,很多时候并不需要使用CP模型,更多时候是满足最终一致性即可

基于2PC和延迟队列的这种设计,主要是依赖于事件驱动的架构

在kafka connect中, 每次节点变化都会触发一次任务的重分配,所以延迟存储直接用的就是内存中的HashMap, 因为即使分配消息的主节点挂了,那就再触发一次事件,直接将HashMap里面的数据清掉,进行下一次事务即可,并不需要保证延迟存储里面的数据不丢,

所以方案因环境、需求不同,可以做一些取舍,没必要什么东西都去加一个CP模型的中间件进来,当然其实那样更简单

未完待续!

原文地址:https://www.cnblogs.com/qwangxiao/p/10888066.html

时间: 2024-08-02 01:28:11

基于2PC和延迟队列的分布式事务实现的相关文章

使用kafka消息队列解决分布式事务

微服务框架Spring Cloud介绍 Part1: 使用事件和消息队列实现分布式事务 本文转自:http://skaka.me/blog/2016/04/21/springcloud1/ 不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行

基于spring+mybatis+atomikos+jta实现分布式事务(2)-动态切换数据源

本文介绍基于spring+mybatis+atomikos+jta实现分布式事务,由程序动态切换数据源,通过atomikos可实现分布式事务一致性. 版本:spring-3.2.9.RELEASE.mybatis-3.4.4.atomikos-4.0.5.jdk1.8 1,maven配置文件pom.xml如下: <!-- test --> <dependency> <groupId>junit</groupId> <artifactId>juni

使用消息队列规避分布式事务问题

前阵子从支付宝转账10000元到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加10000,数据就会出现不一致状况了.这样的场景在各个类型的系统中都能找到相似的影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量也必须减1:在搜索广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外,还得去商家账户表中找到这个商家并扣除广告费等等,相信大家或多或多少都能碰到

使用事件和消息队列实现分布式事务(转+补充)

虽然本文并非笔者原创,但是我们在非强依赖的事务中原理上也是采用这种方式处理的,不过因为没有仔细去总结,最近在整理和总结时看到了,故转载并做部分根据我们实际情况的完善和补充. 不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行多次等待. 有一

使用事件和消息队列实现分布式事务

原文:http://skaka.me/blog/2016/04/21/springcloud1/ 不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行多次等待. 有一种方法同样可以解决分布式事务问题, 并且性能较好, 这就是我这篇文章要介绍的

微服务框架Spring Cloud之使用事件和消息队列实现分布式事务

不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行多次等待. 有一种方法同样可以解决分布式事务问题, 并且性能较好, 这就是我这篇文章要介绍的使用事件,本地事务以及消息队列来实现分布式事务. 我们从一个简单的实例入手. 基本所有互联网应用都

【分布式事务】ACID/BASE/CAP + TCC/2PC/Soga/....

事务的具体定义 事务提供一种机制将一个活动涉及的所有操作纳入到一个不可分割的执行单元,组成事务的所有操作只有在所有操作均能正常执行的情况下方能提交,只要其中任一操作执行失败,都将导致整个事务的回滚.简单地说,事务提供一种“要么什么都不做,要么做全套(All or Nothing)”机制. 数据库本地事务 ACID 数据库事务中的四大特性 A:原子性(Atomicity) 一个事务(transaction)中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节.事务在执行过程中发生错误

Percolator:基于BigTable的分布式事务实现

Google为了解决网页索引的增量处理,以及维护数据表和索引表的一致性问题,基于BigTable实现了一个支持分布式事务的存储系统.这里重点讨论这个系统的分布式事务实现,不讨论percolator中为了支持增量计算而实现的Notifications机制. 该系统基于BigTable,支持snapshot isolation隔离级别,这个隔离级别不在ANSI定义的隔离级别范围内.简单来说,就是一个事务看到的是一个stable的数据库的快照.快照隔离相对于可串行化隔离级别的优点是更高的读性能,不需要

分布式事务(1)---2PC和3PC理论

分布式事务(1)---2PC和3PC理论 分布式事物基本理论:基本遵循CPA理论,采用柔性事物特征,软状态或者最终一致性特点保证分布式事物一致性问题. 分布式事物常见解决方案: 2PC两段提交协议 3PC三段提交协议(弥补两端提交协议缺点) TCC或者GTS(阿里) 消息中间件最终一致性 使用LCN解决分布式事物,理念"LCN并不生产事务,LCN只是本地事务的搬运工". 一.两阶段提交(2PC) 两阶段提交又称2PC,2PC是一个非常经典的强一致.中心化的原子提交协议. 这里所说的中心