stm.go

package concurrency

import (
    v3 "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
)

// STM is an interface for software transactional memory.
type STM interface {
    // Get returns the value for a key and inserts the key in the txn‘s read set.
    // If Get fails, it aborts the transaction with an error, never returning.
    Get(key string) string
    // Put adds a value for a key to the write set.
    Put(key, val string, opts ...v3.OpOption)
    // Rev returns the revision of a key in the read set.
    Rev(key string) int64
    // Del deletes a key.
    Del(key string)

    // commit attempts to apply the txn‘s changes to the server.
    commit() *v3.TxnResponse
    reset()
}

// stmError safely passes STM errors through panic to the STM error channel.
type stmError struct{ err error }

// NewSTMRepeatable initiates new repeatable read transaction; reads within
// the same transaction attempt always return the same data.
func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
    s := &stm{client: c, ctx: ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
    return runSTM(s, apply)
}

// NewSTMSerializable initiates a new serialized transaction; reads within the
// same transactiona attempt return data from the revision of the first read.
func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
    s := &stmSerializable{
        stm:      stm{client: c, ctx: ctx},
        prefetch: make(map[string]*v3.GetResponse),
    }
    return runSTM(s, apply)
}

// NewSTMReadCommitted initiates a new read committed transaction.
func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
    s := &stmReadCommitted{stm{client: c, ctx: ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}}
    return runSTM(s, apply)
}

type stmResponse struct {
    resp *v3.TxnResponse
    err  error
}

func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) {
    outc := make(chan stmResponse, 1)
    go func() {
        defer func() {
            if r := recover(); r != nil {
                e, ok := r.(stmError)
                if !ok {
                    // client apply panicked
                    panic(r)
                }
                outc <- stmResponse{nil, e.err}
            }
        }()
        var out stmResponse
        for {
            s.reset()
            if out.err = apply(s); out.err != nil {
                break
            }
            if out.resp = s.commit(); out.resp != nil {
                break
            }
        }
        outc <- out
    }()
    r := <-outc
    return r.resp, r.err
}

// stm implements repeatable-read software transactional memory over etcd
type stm struct {
    client *v3.Client
    ctx    context.Context
    // rset holds read key values and revisions
    rset map[string]*v3.GetResponse
    // wset holds overwritten keys and their values
    wset map[string]stmPut
    // getOpts are the opts used for gets
    getOpts []v3.OpOption
}

type stmPut struct {
    val string
    op  v3.Op
}

func (s *stm) Get(key string) string {
    if wv, ok := s.wset[key]; ok {
        return wv.val
    }
    return respToValue(s.fetch(key))
}

func (s *stm) Put(key, val string, opts ...v3.OpOption) {
    s.wset[key] = stmPut{val, v3.OpPut(key, val, opts...)}
}

func (s *stm) Del(key string) { s.wset[key] = stmPut{"", v3.OpDelete(key)} }

func (s *stm) Rev(key string) int64 {
    if resp := s.fetch(key); resp != nil && len(resp.Kvs) != 0 {
        return resp.Kvs[0].ModRevision
    }
    return 0
}

func (s *stm) commit() *v3.TxnResponse {
    txnresp, err := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.puts()...).Commit()
    if err != nil {
        panic(stmError{err})
    }
    if txnresp.Succeeded {
        return txnresp
    }
    return nil
}

// cmps guards the txn from updates to read set
func (s *stm) cmps() []v3.Cmp {
    cmps := make([]v3.Cmp, 0, len(s.rset))
    for k, rk := range s.rset {
        cmps = append(cmps, isKeyCurrent(k, rk))
    }
    return cmps
}

func (s *stm) fetch(key string) *v3.GetResponse {
    if resp, ok := s.rset[key]; ok {
        return resp
    }
    resp, err := s.client.Get(s.ctx, key, s.getOpts...)
    if err != nil {
        panic(stmError{err})
    }
    s.rset[key] = resp
    return resp
}

// puts is the list of ops for all pending writes
func (s *stm) puts() []v3.Op {
    puts := make([]v3.Op, 0, len(s.wset))
    for _, v := range s.wset {
        puts = append(puts, v.op)
    }
    return puts
}

func (s *stm) reset() {
    s.rset = make(map[string]*v3.GetResponse)
    s.wset = make(map[string]stmPut)
}

type stmSerializable struct {
    stm
    prefetch map[string]*v3.GetResponse
}

func (s *stmSerializable) Get(key string) string {
    if wv, ok := s.wset[key]; ok {
        return wv.val
    }
    firstRead := len(s.rset) == 0
    if resp, ok := s.prefetch[key]; ok {
        delete(s.prefetch, key)
        s.rset[key] = resp
    }
    resp := s.stm.fetch(key)
    if firstRead {
        // txn‘s base revision is defined by the first read
        s.getOpts = []v3.OpOption{
            v3.WithRev(resp.Header.Revision),
            v3.WithSerializable(),
        }
    }
    return respToValue(resp)
}

func (s *stmSerializable) Rev(key string) int64 {
    s.Get(key)
    return s.stm.Rev(key)
}

func (s *stmSerializable) gets() ([]string, []v3.Op) {
    keys := make([]string, 0, len(s.rset))
    ops := make([]v3.Op, 0, len(s.rset))
    for k := range s.rset {
        keys = append(keys, k)
        ops = append(ops, v3.OpGet(k))
    }
    return keys, ops
}

func (s *stmSerializable) commit() *v3.TxnResponse {
    keys, getops := s.gets()
    txn := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.puts()...)
    // use Else to prefetch keys in case of conflict to save a round trip
    txnresp, err := txn.Else(getops...).Commit()
    if err != nil {
        panic(stmError{err})
    }
    if txnresp.Succeeded {
        return txnresp
    }
    // load prefetch with Else data
    for i := range keys {
        resp := txnresp.Responses[i].GetResponseRange()
        s.rset[keys[i]] = (*v3.GetResponse)(resp)
    }
    s.prefetch = s.rset
    s.getOpts = nil
    return nil
}

type stmReadCommitted struct{ stm }

// commit always goes through when read committed
func (s *stmReadCommitted) commit() *v3.TxnResponse {
    s.rset = nil
    return s.stm.commit()
}

func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
    rev := r.Header.Revision + 1
    if len(r.Kvs) != 0 {
        rev = r.Kvs[0].ModRevision + 1
    }
    return v3.Compare(v3.ModRevision(k), "<", rev)
}

func respToValue(resp *v3.GetResponse) string {
    if len(resp.Kvs) == 0 {
        return ""
    }
    return string(resp.Kvs[0].Value)
}
				
时间: 2024-11-11 13:13:18

stm.go的相关文章

STM 软件事务内存——本质是为提高并发,通过事务来管理内存的读写访问以避免锁的使用

对Java程序员来说,我们对面向对象的编程(OOP)自然都是烂熟于胸的,但语言也极大地影响了我们构建面向对象应用程序的方式.(现在的OOP已经和Alan Kay当初创造这个词时候的初衷大不相同了,他的主要思想是采用消息传递并消灭所有状态数据(他认为,系统是由一些类似于生物细胞那样的对象构成的,这些对象通过消息传递进行通信,且无需持有任何状态)--go语言) 对于Java程序员来说,当我们顺着指针或引用找到某个实例的时候,实际上是登录到了持有其状态的一块内存上,于是在那个位置上操纵数据也就成了自然

理解Clojure STM 软件事务性内存

翻译说明: 英文原文来自:http://java.ociweb.com/mark/stm/article.html 原文包含了一些非STM的知识,也包括STM底层实现的内容,这里只是翻译了STM抽象层的内容,自认为这部分比较重要. 翻译是基于自己能够理解的方式翻译的,并非逐句翻译,目的是理解STM,理解如何调优STM,有逐句翻译强迫症的同学请不要喷我! 本人是在学习<Clojure编程乐趣>的"压力之下的 Ref"章节,遇到无法理解minHistory和maxHistory

arm汇编:ldr,str,ldm,stm,伪指令ldr

ldr,str,ldm,stm的命名规律: 这几个指令命名看起来不易记住,现在找找规律. 指令 样本 效果 归纳名称解释 ldr Rd,addressing ldr r1,[r0] addressing to Rd [mem to reg] load to register str Rd,addressing str r1,[r0] Rd ro addressing [reg to mem] store register ldm Rn,reglist ldmfd sp!,{r0-r7,pc} *

ARM LDR/STR, LDM/STM 指令

这里比较下容易混淆的四条指令,已经在这4条指令的混淆上花费了很多精力,现在做个小结,LDR,STR,LDM,STM这四条指令, 关于LDM和STM的说明,见另外一个说明文件,说明了这两个文件用于栈操作时的注意事项. (1)LDR:L表示LOAD,LOAD的含义应该理解为:Load from memory into register.下面这条语句就说明的很清楚: LDR   R1,     [R2] ; R1<——[R2] 就是把R2所指向的存储单元的内容的值(一个memory地址内的值),读取到

STM LDM汇编批量加载指令的错误用法

一.背景 在调试程序过程中,遇到一个问题,程序入栈出栈后,总是跑飞,用gdb调试,发现入栈操作后,查看入栈后的数据在出栈时不对应. 二.分析内容 1. STMFD   SP!,    {R0-R12, R1} 以前理解是,先入栈R1,然后在依次递减入栈R12--->R0, 理论上 一共入栈14个数据 反编译后,发现竟然少了一个,入栈13个数据 反编译的汇编如下: 23e062fc: e92d1fff push {r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, sl

【STM】IO口配置库函数、寄存器、位操作方式

库函数函数: 一个初始化函数:初始化一个或者多个IO口(同一组)的工作模式.输出类型.速度以及上下拉方式 1 void GPIO_Init(GPIO_TypeDef* GPIOx, GPIO_InitTypeDef* GPIO_InitStruct); 2 typedef struct 3 { 4 uint32_t GPIO_Pin//指定要初始化的端口 5 GPIOMode_TypeDef GPIO_Mode;//端口模式 6 GPIOSpeed_TypeDef GPIO_Speed;//速度

【STM】GPIO引脚配置方式

配置方式: 普通 GPIO 输入:根据需要配置该引脚为浮空输入.带弱上拉输入或带弱下拉输入,同时不要使能该引脚对应的所有复用功能模块 普通 GPIO 输出:根据需要配置该引脚为推挽输出或开漏输出,同时不要使能该引脚对应的所有复用功能模块 普通模拟输入:配置该引脚为模拟输入模式,同时不要使能该引脚对应的所有复用功能模块 内置外设的输入:根据需要配置该引脚为浮空输入. 带弱上拉输入或带弱下拉输入,同时使能该引脚对应的某个复用功能模块 内置外设的输出:根据需要配置该引脚为复用推挽输出或复用开漏输出,同

stm之中断系统

概述: 提供中断控制器,用于总体管理异常,称之为“嵌套向量中断控制器:Nested Vectored Interrupt Controller (NVIC) VIC:中断管理器: NVIC:内嵌中断管理器,将中断嵌套进入内核: 带来的优势:1.响应速度提高: 2.标准化,统一管理: stm32创新:所以IO口都可以中断: stm32的中断向量表:一个中断源,对应的地址.优先级等信息: 具体参见stm32的文档: NVIC中断优先级: 中断优先级高的中断可以抢占中断优先级低的中断,从而实现了中断嵌

stm之SPI通信协议

SPI (Serial Peripheral interface),顾名思义就是串行外围设备接口.SPI是一种高速的,全双工,同步的通信总线,并且在芯片的管脚上只占用四根线,节约了芯片的管脚,同时为PCB的布局上节省空间,提供方便,主要应用在 EEPROM,FLASH,实时时钟,AD转换器,还有数字信号处理器和数字信号解码器之间 SPI内部简明结构图 关于SPI传输过程(下面由灵魂画家作图) SPI包含四根线: 1.SS(Slave Select):片选信号线,当有多个SPI设备与MCU相连时,