txn.go

package clientv3

import (
    "sync"

    pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

// Txn is the interface that wraps mini-transactions.
//
//     Tx.If(
//      Compare(Value(k1), ">", v1),
//      Compare(Version(k1), "=", 2)
//     ).Then(
//      OpPut(k2,v2), OpPut(k3,v3)
//     ).Else(
//      OpPut(k4,v4), OpPut(k5,v5)
//     ).Commit()
//
type Txn interface {
    // If takes a list of comparison. If all comparisons passed in succeed,
    // the operations passed into Then() will be executed. Or the operations
    // passed into Else() will be executed.
    If(cs ...Cmp) Txn

    // Then takes a list of operations. The Ops list will be executed, if the
    // comparisons passed in If() succeed.
    Then(ops ...Op) Txn

    // Else takes a list of operations. The Ops list will be executed, if the
    // comparisons passed in If() fail.
    Else(ops ...Op) Txn

    // Commit tries to commit the transaction.
    Commit() (*TxnResponse, error)

    // TODO: add a Do for shortcut the txn without any condition?
}

type txn struct {
    kv  *kv
    ctx context.Context

    mu    sync.Mutex
    cif   bool
    cthen bool
    celse bool

    isWrite bool

    cmps []*pb.Compare

    sus []*pb.RequestOp
    fas []*pb.RequestOp
}

func (txn *txn) If(cs ...Cmp) Txn {
    txn.mu.Lock()
    defer txn.mu.Unlock()

    if txn.cif {
        panic("cannot call If twice!")
    }

    if txn.cthen {
        panic("cannot call If after Then!")
    }

    if txn.celse {
        panic("cannot call If after Else!")
    }

    txn.cif = true

    for i := range cs {
        txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
    }

    return txn
}

func (txn *txn) Then(ops ...Op) Txn {
    txn.mu.Lock()
    defer txn.mu.Unlock()

    if txn.cthen {
        panic("cannot call Then twice!")
    }
    if txn.celse {
        panic("cannot call Then after Else!")
    }

    txn.cthen = true

    for _, op := range ops {
        txn.isWrite = txn.isWrite || op.isWrite()
        txn.sus = append(txn.sus, op.toRequestOp())
    }

    return txn
}

func (txn *txn) Else(ops ...Op) Txn {
    txn.mu.Lock()
    defer txn.mu.Unlock()

    if txn.celse {
        panic("cannot call Else twice!")
    }

    txn.celse = true

    for _, op := range ops {
        txn.isWrite = txn.isWrite || op.isWrite()
        txn.fas = append(txn.fas, op.toRequestOp())
    }

    return txn
}

func (txn *txn) Commit() (*TxnResponse, error) {
    txn.mu.Lock()
    defer txn.mu.Unlock()
    for {
        resp, err := txn.commit()
        if err == nil {
            return resp, err
        }
        if isHaltErr(txn.ctx, err) {
            return nil, toErr(txn.ctx, err)
        }
        if txn.isWrite {
            return nil, toErr(txn.ctx, err)
        }
    }
}

func (txn *txn) commit() (*TxnResponse, error) {
    r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}

    var opts []grpc.CallOption
    if !txn.isWrite {
        opts = []grpc.CallOption{grpc.FailFast(false)}
    }
    resp, err := txn.kv.remote.Txn(txn.ctx, r, opts...)
    if err != nil {
        return nil, err
    }
    return (*TxnResponse)(resp), nil
}
				
时间: 2024-10-13 14:52:50

txn.go的相关文章

Oracle 11gR2上遇到blocking txn id for DDL等待事件

本文是原创文章,转载请注明出处:http://blog.csdn.net/msdnchina/article/details/44726875 在最近处理的一个案例(11.2.0.4版本的oracle db)中,遇到了blocking txn id for DDL 这个等待事件. 下面来说一下当时的操作: create index idx_tab_1 on table_name (column_name) tablespace xxx online; 如上建立索引的操作用了17分钟零7秒才完成.

Windows Pre-commit hook for comment length Subversion

@echo off :: Stops commits that have empty log messages. @echo off setlocal rem Subversion sends through the path to the repository and transaction id set REPOS=%1 set TXN=%2 svnlook log %REPOS% -t %TXN% | findstr . > nul if %errorlevel% gtr 0 (goto

GoldenGate 日常监控

正确启动数据库 源端启动数据库 SQL>  startup 源端启动goldengate GGSCI >  start mgr GGSCI >  start * 目标端启动数据库 SQL>  startup 目标端启动goldengate GGSCI >  start mgr GGSCI >  start * 正确关闭数据库 源端关闭GoldenGate和数据库 源端关闭GoldenGate GGSCI >  info all                   

自定义Flume Sink:ElasticSearch Sink

Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中.Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner的对象,SinkRunner.start()方法会启动一个新的线程去管理每一个Sink的生命周期.每一个Sink需要实现start().Stop()和process()方法.你可以在start方法中去初始化Sink的参数和状态,在stop方法中清理Sink的资源.最关键的是process方法,它将处

CloudStack核心类ApiServlet、ApiServer、ApiDispatcher、GenericDaoBase源码分析

ApiServlet 首先从整体上看下ApiServlet,Outline视图如下, 一.注意@Inject依赖的是javax.inject.jar,它和spring的@Autowired的区别在于使用它时变量不用生成相应的set方法. 二.CloudStack所有的请求都会被ApiSerlet拦截处理,进入到doGet()或者doPost()方法,然后统一交由processRequest()处理. 三.processRequestInContext()方法: 1.更多的是日志记录和异常信息处理

Oracle core02_数据块

数据更改 oracle core完成了oracle的核心功能,recovery,读一致性等. 深入的了解oracle的机制,就从一个最简单的更新开始.对于oracle来说,最大的一个特性就是写了两次数据: 写数据到数据文件中 写数据的变更日志到日志文件中 对于最常见的数据更新来说,oracle主要做了一下动作: 创建数据块变更的日志记录即 redo change vector 创建数据块的映像即undo record 创建undo数据块变更的日志记录 更新数据块 下面就以update为例:记录变

Oracle core03_ACID

ACID特性 oracle如何使用undo和redo来保证了关系数据库的ACID特性. ACID的特性简单描述为: Atomic:以事务为单位的原子性 Consistency:保证数据一致性 Isolation:不同事务之间的隔离性,未提交的事务对其它会话是不可见的 Durablity:提交的事务在系统失败的情况下是可恢复的 oracle使用了redo和undo的机制来完成ACID的特性. 1, Atomic: 当oracle更新数据的时候,会创建undo vector来保存数据的前映像.这样当

rtmplib rtmp协议过程分析

转自:http://chenzhenianqing.cn/articles/1009.html 写的很好,收藏如下,向作者致敬! 没事碰到了librtmp库,这个库是ffmpeg的依赖库,用来接收,发布RTMP协议格式的数据. 代码在这里:git clone git://git.ffmpeg.org/rtmpdump 先看一段通过librtmp.so库下载RTMP源发布的数据的例子,从rtmpdump中抽取出来.使用的大体流程如下: RTMP_Init主要就初始化了一下RTMP*rtmp变量的成

QBC查询、离线条件查询(DetachedCriteric)和分页查询模版

一.QBC检索步骤 QBC检索步骤: 1.调用Session的createCriteria()方法创建一个Criteria对象. 2.设定查询条件.Expression类提供了一系列用于设定查询条件的静态方法, 这些静态方法都返回Criterion实例,每个Criterion实例代表一个查询条件. Criteria的add()方法用于加入查询条件. 3.调用Criteria的list()方法执行查询语句.该方法返回List类型的查询结果,在 List集合中存放了符合查询条件的持久化对象. 比较运