golang的多协程实践

go语言以优异的并发特性而闻名,刚好手上有个小项目比较适合。

项目背景:

公司播控平台的数据存储包括MySQL和ElasticSearch(ES)两个部分,编辑、运营的数据首先保存在MySQL中,为了实现模糊搜索和产品关联推荐,特别增加了ES,ES中保存的是节目集的基本信息。

本项目是为了防止实时同步数据出现问题或者系统重新初始化时的全量数据同步而做的。项目主要是从MySQL读取所有的节目集数据写入到ES中。

项目特点:

因为节目集数量较大,不能一次性的读入内存,因此每次读出一部分记录写入ES。ORM使用的是beego。为了提高性能使用了协程,其中读MySQL的部分最大开启20个协程,ES写入部分开启了15个协程(因为ES分片设置的问题,5个协程和15个协程性能映像不大)。

项目主要包括三个文件:

1、PrdES_v3.go,项目的入口,负责协调MySQL读取和ES写入。

package main

import (
    "./db"
    "./es"
    //"encoding/json"
    "fmt"
    "reflect"
    "time"
)

type PrdES struct {
    DB *prd.Mysql
    ES *es.Elastic
}

// func (this *PrdES) Handle(result []*prd.Series) {
//     // for _, value := range result {
//     //     this.DB.FormatData(value)
//     //     //json, _ := json.Marshal(value)
//     //     //fmt.Println(string(json))
//     // }
//     //写入ES,以多线程的方式执行,最多保持5个线程
//     this.ES.DoBulk(result)
// }
func (this *PrdES) Run() {
    count := 50
    offset := 0
    maxCount := 20
    //create channel
    chs := make([]chan []*prd.Series, maxCount)
    selectCase := make([]reflect.SelectCase, maxCount)
    for i := 0; i < maxCount; i++ {
        offset = count * i
        fmt.Println("offset:", offset)
        //init channel
        chs[i] = make(chan []*prd.Series)
        //set select case
        selectCase[i].Dir = reflect.SelectRecv
        selectCase[i].Chan = reflect.ValueOf(chs[i])
        //运行
        go this.DB.GetData(offset, count, chs[i])
    }
    var result []*prd.Series
    for {
        //wait data return
        chosen, recv, ok := reflect.Select(selectCase)
        if ok {
            fmt.Println("channel id:", chosen)
            result = recv.Interface().([]*prd.Series)

            //读取数据从mysql
            go this.DB.GetData(offset, count, chs[chosen])

            //写入ES,以多线程的方式执行,最多保持15个线程
            this.ES.DoBulk(result)
            //update offset
            offset = offset + len(result)
            //判断是否到达数据尾部,最后一次查询
            if len(result) < count {
                fmt.Println("read end of DB")
                //等所有的任务执行完毕
                this.ES.Over()
                fmt.Println("MySQL Total:", this.DB.GetTotal(), ",Elastic Total:", this.ES.GetTotal())
                return

            }
        }
    }

}

func main() {
    s := time.Now()
    fmt.Println("start")
    pe := new(PrdES)

    pe.DB = prd.NewDB()
    pe.ES = es.NewES()
    //fmt.Println("mysql info:")
    //fmt.Println("ES info:")
    pe.Run()

    fmt.Println("time out:", time.Since(s).Seconds(), "(s)")
    fmt.Println("Over!")

}

在run函数里可以看到使用了reflect.SelectCase,使用reflect.SelectCase的原因是读MySQL数据是多个协程,不可预计哪个会首先返回,selectCase是任何一个处理完毕reflect.Select函数就会返回,MySQL读取的数据放在channel中宕Select函数返回时chosen, recv, ok := reflect.Select(selectCase)判断ok是否未true            chosen代表的是协程id通过result = recv.Interface().([]*prd.Series)获得返回的数据,因为MySQL读取的数据是对象的结果集,因次使用recv.Interface函数,如果是简单类型可以使用recv.recvInt(),recv.recvString()等函数直接获取channel返回数据。 

这里通过counter控制协程的数量,也可以通过channel,用select的方式控制协程的数量,之所以用counter计数器的方式控制协程数量是我想知道同时有多少协程在运行。

注:此处channel可以不用创建数组形式,channel带回来的数据也没有顺序问题。

2、es.go,负责写入ES和es的写入协程调度

package es

import (
    "../db"
    //"encoding/json"
    "fmt"
    elastigo "github.com/mattbaird/elastigo/lib"
    //elastigo "github.com/Uncodin/elastigo/lib"
    //"github.com/Uncodin/elastigo/core"
    "time"
    //"bytes"
    "flag"
    "sync"
    //"github.com/fatih/structs"
)

var (
    //开发测试库
    //host = flag.String("host", "192.168.1.236", "Elasticsearch Host")
    //C平台线上
    host = flag.String("host", "192.168.100.23", "Elasticsearch Host")
    port = flag.String("port", "9200", "Elasticsearch port")
)

//indexor := core.NewBulkIndexorErrors(10, 60)
// func init() {
//     //connect to elasticsearch
//     fmt.Println("connecting  es")
//     //api.Domain = *host //"192.168.1.236"
//     //api.Port = "9300"

// }
//save thread count
var counter int

type Elastic struct {
    //Seq int64
    c         *elastigo.Conn
    lock      *sync.Mutex
    lockTotal *sync.Mutex
    wg        *sync.WaitGroup
    total     int64
}

func (this *Elastic) Conn() {
    this.c = elastigo.NewConn()
    this.c.Domain = *host
    this.c.Port = *port
    //NewClient(fmt.Sprintf("%s:%d", *host, *port))
}
func (this *Elastic) CreateLock() {
    this.lock = &sync.Mutex{}
    this.lockTotal = &sync.Mutex{}
    this.wg = &sync.WaitGroup{}
    counter = 0
    this.total = 0
}
func NewES() (es *Elastic) {
    //connect elastic
    es = new(Elastic)
    es.Conn()
    //create lock
    es.CreateLock()
    return es
}
func (this *Elastic) DoBulk(series []*prd.Series) {
    for true {
        this.lock.Lock()
        if counter < 25 {
            //跳出,执行任务
            break
        } else {
            this.lock.Unlock()
            //等待100毫秒
            //fmt.Println("wait counter less than 25, counter:", counter)
            time.Sleep(1e8)
        }
    }
    this.lock.Unlock()
    //执行任务
    go this.bulk(series, this.lock)
}
func (this *Elastic) Over() {
    this.wg.Wait()
    /*for {
        this.lock.Lock()
        if counter <= 0 {
            this.lock.Unlock()
            break
        }
        this.lock.Unlock()
    }
    */
}

func (this *Elastic) GetTotal() (t int64) {
    this.lockTotal.Lock()
    t = this.total
    this.lockTotal.Unlock()
    return t
}
func (this *Elastic) bulk(series []*prd.Series, lock *sync.Mutex) (succCount int64) {
    //增加计数器
    this.wg.Add(1)
    //减少计数器
    defer this.wg.Done()

    //加计数器
    lock.Lock()
    counter++
    fmt.Println("add task, coutner:", counter)
    lock.Unlock()

    //设置初始成功写入的数量
    succCount = 0

    for _, value := range series {
        //json, _ := json.Marshal(value)
        //fmt.Println(string(json))
        if value.ServiceGroup != nil {
            fmt.Println("series code:", value.Code, ",ServiceGroup:", value.ServiceGroup)

            resp, err := this.c.Index("guttv", "series", value.Code, nil, *value)

            if err != nil {
                panic(err)
            } else {
                //fmt.Println(value.Code + " write to ES succsessful!")
                fmt.Println(resp)
                succCount++
            }
        } else {
            fmt.Println("series code:", value.Code, "service group is null")
        }
    }

    //计数器减一
    lock.Lock()
    counter--
    fmt.Println("reduce task, coutner:", counter, ",success count:", succCount)
    lock.Unlock()

    this.lockTotal.Lock()
    this.total = this.total + succCount
    this.lockTotal.Unlock()
    return succCount
}

在es.go里有两把锁lock和lockTotal,前者是针对counter变量,记录es正在写入的协程数量的;后者为记录总共写入多少条记录到es而增加的。

这里必须要提到的是Over函数,初步使用协程的容易忽略。golang的原则是当main函数运行结束后,所有正在运行的协程都会终止,因袭在MySQL读取数据完毕必须调用Over函数,等待所有的协程结束。这里使用sync.waiGrooup。每次协程启动执行下面两个语句:

//增加计数器
this.wg.Add(1)
//减少计数器,函数结束时自动执行
defer this.wg.Done()Over函数中调用wg.Wait()等待计数器为0时返回,否则就一直阻塞。当然读者也可以看到通过检查counter是否小于等于0也可以判断协程是否都结束(Over函数被注释的部分),显然使用waitGroup更优雅和高效。

 

3、db.go,负责MySQL数据的读取

package prd

import (
    "fmt"
    "github.com/astaxie/beego/orm"
    _ "github.com/go-sql-driver/mysql" // import your used driver
    "strings"
    "sync"
    "time"
)

func init() {

    orm.RegisterDataBase("default", "mysql", "@tcp(192.168.100.3306)/guttv_vod?charset=utf8", 30)

    orm.RegisterModelWithPrefix("t_", new(Series), new(Product), new(ServiceGroup))
    orm.RunSyncdb("default", false, false)
}

type Mysql struct {
    sql   string
    total int64
    lock  *sync.Mutex
}

func (this *Mysql) New() {
    //this.sql = "SELECT s.*, p.code ProductCode, p.name pName  FROM guttv_vod.t_series s inner join guttv_vod.t_product p on p.itemcode=s.code  and p.isdelete=0 limit ?,?"
    this.sql = "SELECT s.*, p.code ProductCode, p.name pName  FROM guttv_vod.t_series s , guttv_vod.t_product p where p.itemcode=s.code  and p.isdelete=0 limit ?,?"
    this.total = 0
    this.lock = &sync.Mutex{}
}
func NewDB() (db *Mysql) {
    db = new(Mysql)
    db.New()
    return db
}
func (this *Mysql) GetTotal() (t int64) {
    t = 0
    this.lock.Lock()
    t = this.total
    this.lock.Unlock()
    return t
}
func (this *Mysql) toTime(toBeCharge string) int64 {
    timeLayout := "2006-01-02 15:04:05"
    loc, _ := time.LoadLocation("Local")
    theTime, _ := time.ParseInLocation(timeLayout, toBeCharge, loc)
    sr := theTime.Unix()
    if sr < 0 {
        sr = 0
    }
    return sr
}
func (this *Mysql) getSGCode(seriesCode string) (result []string, num int64) {
    sql := "select distinct ref.servicegroupcode code  from t_servicegroup_reference_category ref "
    sql = sql + "left join t_category_product cp on cp.categorycode=ref.categorycode "
    sql = sql + "left join t_package pkg on pkg.code = cp.assetcode "
    sql = sql + "left join t_package_product pp on pp.parentcode=pkg.code "
    sql = sql + "left join t_product prd on prd.code = pp.assetcode "
    sql = sql + "where   prd.itemcode=?"
    o := orm.NewOrm()
    var sg []*ServiceGroup
    num, err := o.Raw(sql, seriesCode).QueryRows(&sg)

    if err == nil {
        //fmt.Println(num)
        for _, value := range sg {
            //fmt.Println(value.Code)
            result = append(result, value.Code)
        }

    } else {
        fmt.Println(err)
    }
    //fmt.Println(result)
    return result, num
}

func (this *Mysql) formatData(value *Series) {
    //设置业务分组数据
    sg, _ := this.getSGCode(value.Code)
    //fmt.Println(sg)
    value.ServiceGroup = []string{}
    value.ServiceGroup = sg[0:]
    //更改OnlineTime为整数
    value.OnlineTimeInt = this.toTime(value.OnlineTime)
    //分解地区
    value.OriginalCountryArr = strings.Split(value.OriginalCountry, "|")
    //分解二级分类
    value.ProgramType2Arr = strings.Split(value.ProgramType2, "|")
    //写入记录内容
    value.Description = strings.Replace(value.Description, "\n", "", -1)
}
func (this *Mysql) GetData(offset int, size int, ch chan []*Series) {
    var result []*Series
    o := orm.NewOrm()
    num, err := o.Raw(this.sql, offset, size).
        QueryRows(&result)
    if err != nil {
        fmt.Println("read DB err")
        panic(err)
        //return //err, nil
    }
    for _, value := range result {
        this.formatData(value)
        //json, _ := json.Marshal(value)
        //fmt.Println(string(json))
        //fmt.Println(value.ServiceGroup)
    }
    this.lock.Lock()
    this.total += num
    this.lock.Unlock()

    fmt.Println("read count :", num) //, "Total:", Total)
    //return nil, result
    ch <- result
}

从项目上看。go语言开发还是比较简洁的,多协程实现也相对容易,但要求开发者必须对概念非常清晰,像select和selectCase理解和defer的理解要很到位,个人层经做过多年的C/C++程序员,从经验上看,C/C++的经验(多线程的理解)对运用go语言还是很有帮助的。

时间: 2024-10-11 09:26:16

golang的多协程实践的相关文章

关于协程:nodejs和golang协程的不同

nodejs和golang都是支持协程的,从表现上来看,nodejs对于协程的支持在于async/await,golang对协程的支持在于goroutine.关于协程的话题,简单来说,可以看作是非抢占式的轻量级线程. 协程本身 一句话概括,上面提到了 "可以看作是非抢占式的轻量级线程". 在多线程中,把一段代码放在一个线程中执行,cpu会自动将代码分成碎片,并在一定时间切换cpu控制权,线程通过锁机制确保自己使用的资源在cpu执行别的线程的代码时被修改(占用的内存堆栈.硬盘数据资源等)

golang协程——通道channel阻塞

新的一年开始了,不管今天以前发生了什么,向前看,就够了. 说到channel,就一定要说一说线程了.任何实际项目,无论大小,并发是必然存在的.并发的存在,就涉及到线程通信.在当下的开发语言中,线程通讯主要有两种,共享内存与消息传递.共享内存一定都很熟悉,通过共同操作同一对象,实现线程间通讯.消息传递即通过类似聊天的方式.golang对并发的处理采用了协程的技术.golang的goroutine就是协程的实现.协程的概念很早就有,简单的理解为轻量级线程,goroutine就是为了解决并发任务间的通

在C++中使用golang的协程

开源项目cpp_features提供了一个仿golang协程的stackful协程库. 可以在c++中使用golang的协程,大概语法是这样的: 1 #include <iostream> 2 3 void foo() 4 { 5 std::cout << "foo" << std::endl; 6 } 7 8 co_main() 9 { 10 go foo; 11 } 怎么样,语法是不是和golang很像? 以下是这个项目的ReadMe corou

GoLang之协程

GoLang之协程 目前,WebServer几种主流的并发模型: 多线程,每个线程一次处理一个请求,在当前请求处理完成之前不会接收其它请求:但在高并发环境下,多线程的开销比较大: 基于回调的异步IO,如Nginx服务器使用的epoll模型,这种模式通过事件驱动的方式使用异步IO,使服务器持续运转,但人的思维模式是串行的,大量回调函数会把流程分割,对于问题本身的反应不够自然: 协程,不需要抢占式调度,可以有效提高线程的任务并发性,而避免多线程的缺点:但原生支持协程的语言还很少. 协程(corout

golang中四种方式实现子goroutine与主协程的同步

如何实现子goroutine与主线程的同步 第一种方式:time.sleep(),这种方式很太死板,就不演示了. 第二种方式:使用channel机制,每个goroutine传一个channel进去然后往里写数据,在再主线程中读取这些channel,直到全部读到数据了子goroutine也就全部运行完了,那么主goroutine也就可以结束了.这种模式是子线程去通知主线程结束. package main import ( "fmt" ) func main() { var chanTes

golang 单协程和多协程的性能测试

测试数据:单协程操作1亿数据,以及多协程(10条协程)操作1亿数据(每条协程操作1kw数据) 废话少说,贴代码: 单协程测试运算: package main import ( "fmt" "time" ) func testNum(num int) { for i := 1; i <= 10000000; i++{ num = num + i num = num - i num = num * i num = num / i } } func main() {

今晚八点 golang 分享《如何在60分钟掌握 go 协程&amp;管道 &amp; socket 通信》

今晚八点 golang 分享<如何在60分钟掌握 go 协程&管道 & socket 通信> 内容如下: 功能演示 知识点学习 Golang 介绍 go 并发编程与通信 TCP/IP 协议族 socket 实战 使用 go net 模块开发 tcp 服务器与客户端 代码讲解 分享时间:2019.5.28——20:00-21:30 主讲人:kk 多语言混搭开发工程师,多年 PHP.Python 项目开发经验,带领团队完成多个中.小型项目开发.擅长于 Web 安全开发.性能优化.分

golang协程同步的几种方法

目录 golang协程同步的几种方法 协程概念简要理解 为什么要做同步 协程的几种同步方法 Mutex channel WaitGroup golang协程同步的几种方法 本文简要介绍下go中协程的几种同步方法. 协程概念简要理解 协程类似线程,是一种更为轻量级的调度单位,但协程还是不同于线程的,线程是系统级实现的,常见的调度方法是时间片轮转法,如每隔10ms切换一个线程执行. 协程则是应用软件级实现,它和线程的原理差不多,当一个协程调度到另一个协程时,将上一个协程的上下文信息压入堆栈,来回切换

golang 学习 (八)协程

一: 进程.线程 和 协程 之间概念的区别:        对于 进程.线程,都是有内核进行调度,有 CPU 时间片的概念,进行 抢占式调度(有多种调度算法)    (补充: 抢占式调度与非抢占(轮询任务调度)区别在于抢占式调度可以因为优先级高的任务抢占cpu,而轮询的不能) 对于 协程(用户级线程),这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的,因为是由用户程序自己控制,那么就很难像抢占式调度那样做到强制的 CPU 控制权切换到其他进程/线程,通常只能进行