golang常见的几种并发模型框架

原文链接

package main

import (
	"fmt"
	"math/rand"
	"os"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

type Scenario struct {
	Name        string
	Description []string
	Examples    []string
	RunExample  func()
}
var s1 = &Scenario{
	Name: "s1",
	Description: []string{
		"简单并发执行任务",
	},
	Examples: []string{
		"比如并发的请求后端某个接口",
	},
	RunExample: RunScenario1,
}

var s2 = &Scenario{
	Name: "s2",
	Description: []string{
		"持续一定时间的高并发模型",
	},
	Examples: []string{
		"在规定时间内,持续的高并发请求后端服务, 防止服务死循环",
	},
	RunExample: RunScenario2,
}

var s3 = &Scenario{
	Name: "s3",
	Description: []string{
		"基于大数据量的并发任务模型, goroutine worker pool",
	},
	Examples: []string{
		"比如技术支持要给某个客户删除几个TB/GB的文件",
	},
	RunExample: RunScenario3,
}

var s4 = &Scenario{
	Name: "s4",
	Description: []string{
		"等待异步任务执行结果(goroutine+select+channel)",
	},
	Examples: []string{
		"",
	},
	RunExample: RunScenario4,
}

var s5 = &Scenario{
	Name: "s5",
	Description: []string{
		"定时的反馈结果(Ticker)",
	},
	Examples: []string{
		"比如测试上传接口的性能,要实时给出指标: 吞吐率,IOPS,成功率等",
	},
	RunExample: RunScenario5,
}

var Scenarios []*Scenario

func init() {
	Scenarios = append(Scenarios, s1)
	Scenarios = append(Scenarios, s2)
	Scenarios = append(Scenarios, s3)
	Scenarios = append(Scenarios, s4)
	Scenarios = append(Scenarios, s5)
}

// 常用的并发与同步场景
func main() {
	if len(os.Args) == 1 {
		fmt.Println("请选择使用场景 ==> ")
		for _, sc := range Scenarios {
			fmt.Printf("场景: %s ,", sc.Name)
			printDescription(sc.Description)
		}
		return
	}
	for _, arg := range os.Args[1:] {
		sc := matchScenario(arg)
		if sc != nil {
			printDescription(sc.Description)
			printExamples(sc.Examples)
			sc.RunExample()
		}
	}
}

func printDescription(str []string) {
	fmt.Printf("场景描述: %s \n", str)
}

func printExamples(str []string) {
	fmt.Printf("场景举例: %s \n", str)
}

func matchScenario(name string) *Scenario {
	for _, sc := range Scenarios {
		if sc.Name == name {
			return sc
		}
	}
	return nil
}

var doSomething = func(i int) string {
	time.Sleep(time.Millisecond * time.Duration(10))
	fmt.Printf("Goroutine %d do things .... \n", i)
	return fmt.Sprintf("Goroutine %d", i)
}

var takeSomthing = func(res string) string {
	time.Sleep(time.Millisecond * time.Duration(10))
	tmp := fmt.Sprintf("Take result from %s.... \n", res)
	fmt.Println(tmp)
	return tmp
}

// 场景1: 简单并发任务

func RunScenario1() {
	count := 10
	var wg sync.WaitGroup

	for i := 0; i < count; i++ {
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			doSomething(index)
		}(i)
	}

	wg.Wait()
}

// 场景2: 按时间来持续并发

func RunScenario2() {
	timeout := time.Now().Add(time.Second * time.Duration(10))
	n := runtime.NumCPU()

	waitForAll := make(chan struct{})
	done := make(chan struct{})
	concurrentCount := make(chan struct{}, n)

	for i := 0; i < n; i++ {
		concurrentCount <- struct{}{}
	}

	go func() {
		for time.Now().Before(timeout) {
			<-done
			concurrentCount <- struct{}{}
		}

		waitForAll <- struct{}{}
	}()

	go func() {
		for {
			<-concurrentCount
			go func() {
				doSomething(rand.Intn(n))
				done <- struct{}{}
			}()
		}
	}()

	<-waitForAll
}

// 场景3:以 worker pool 方式 并发做事/发送请求

func RunScenario3() {
	numOfConcurrency := runtime.NumCPU()
	taskTool := 10
	jobs := make(chan int, taskTool)
	results := make(chan int, taskTool)
	var wg sync.WaitGroup

	// workExample
	workExampleFunc := func(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
		defer wg.Done()
		for job := range jobs {
			res := job * 2
			fmt.Printf("Worker %d do things, produce result %d \n", id, res)
			time.Sleep(time.Millisecond * time.Duration(100))
			results <- res
		}
	}

	for i := 0; i < numOfConcurrency; i++ {
		wg.Add(1)
		go workExampleFunc(i, jobs, results, &wg)
	}
	totalTasks := 100

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < totalTasks; i++ {
			n := <-results
			fmt.Printf("Got results %d \n", n)
		}
		close(results)
	}()

	for i := 0; i < totalTasks; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
}

// 场景4: 等待异步任务执行结果(goroutine+select+channel)

func RunScenario4() {
	sth := make(chan string)
	result := make(chan string)
	go func() {
		id := rand.Intn(100)
		for {
			sth <- doSomething(id)
		}
	}()
	go func() {
		for {
			result <- takeSomthing(<-sth)
		}
	}()

	select {
	case c := <-result:
		fmt.Printf("Got result %s ", c)
	case <-time.After(time.Duration(30 * time.Second)):
		fmt.Errorf("指定时间内都没有得到结果")
	}
}

var doUploadMock = func() bool {
	time.Sleep(time.Millisecond * time.Duration(100))
	n := rand.Intn(100)
	if n > 50 {
		return true
	} else {
		return false
	}
}

// 场景5: 定时的反馈结果(Ticker)
// 测试上传接口的性能,要实时给出指标: 吞吐率,成功率等

func RunScenario5() {
	totalSize := int64(0)
	totalCount := int64(0)
	totalErr := int64(0)

	concurrencyCount := runtime.NumCPU()
	stop := make(chan struct{})
	fileSizeExample := int64(10)

	timeout := 10 // seconds to stop

	go func() {
		for i := 0; i < concurrencyCount; i++ {
			go func(index int) {
				for {
					select {
					case <-stop:
						return
					default:
						break
					}

					res := doUploadMock()
					if res {
						atomic.AddInt64(&totalCount, 1)
						atomic.AddInt64(&totalSize, fileSizeExample)
					} else {
						atomic.AddInt64(&totalErr, 1)
					}
				}
			}(i)
		}
	}()

	t := time.NewTicker(time.Second)
	index := 0
	for {
		select {
		case <-t.C:
			index++
			tmpCount := atomic.LoadInt64(&totalCount)
			tmpSize := atomic.LoadInt64(&totalSize)
			tmpErr := atomic.LoadInt64(&totalErr)
			fmt.Printf("吞吐率: %d,成功率: %d \n", tmpSize/int64(index), tmpCount*100/(tmpCount+tmpErr))
			if index > timeout {
				t.Stop()
				close(stop)
				return
			}
		}

	}
}

  

原文地址:https://www.cnblogs.com/-wenli/p/11206506.html

时间: 2024-08-11 03:26:41

golang常见的几种并发模型框架的相关文章

最常见的两种防御模型|安全千字文系列2

为了保证系统的机密性.可靠性.稳定性,我们要围绕系统的核心建立一些防御措施,最常见的防御措施模型有两种,分别被描述为棒棒糖和洋葱. 棒棒糖模型 最常见的防御模型被称为便捷安全,也就是围绕有价值的对象建立一个屏障,这个屏障可以是逻辑上的也可以是物理的.很多机构都会选择采用这样的防御模式,比如断绝企业内网与英特网的连接,或者在内外网的交界处放置防火墙,或者有些企业在网络边界上设置认证服务器等. 这样的防御方式,就像一个棒棒糖一样,外层是保护屏障,中心是被保护的信息. 这种模型的好处很明显,就是物料成

Java并发模型(一)

学习资料来自http://ifeve.com/java-concurrency-thread-directory/ 一.多线程 进程和线程的区别: 一个程序运行至少一个进程,一个进程至少包含一个线程. 多线程: 多线程使得在一个程序内部能够拥有多个线程并行执行,一个线程的执行可以被认为是一个cpu在执行该程序,当一个程序运行在多线程下,就好像有多个CPU在同时执行该程序. 多线程在同一个程序内部并发执行,因此会对相同的内存空间进行并发读写操作. 思考: 如果一个线程在读一个内存时,另一个线程正向

Python实现RabbitMQ中6种消息模型

RabbitMQ与Redis对比 ? RabbitMQ是一种比较流行的消息中间件,之前我一直使用redis作为消息中间件,但是生产环境比较推荐RabbitMQ来替代Redis,所以我去查询了一些RabbitMQ的资料.相比于Redis,RabbitMQ优点很多,比如: 具有消息消费确认机制 队列,消息,都可以选择是否持久化,粒度更小.更灵活. 可以实现负载均衡 RabbitMQ应用场景 异步处理:比如用户注册时的确认邮件.短信等交由rabbitMQ进行异步处理 应用解耦:比如收发消息双方可以使用

【专家坐堂】四种并发编程模型简介

本文来自网易云社区 概述 并发往往和并行一起被提及,但是我们应该明确的是"并发"不等同于"并行" ?       并发 :同一时间 对待 多件事情 (逻辑层面) ?       并行 :同一时间 做(执行) 多件事情 (物理层面) 并发可以构造出一种问题解决方法,该方法能够被用于并行化,从而让原本只能串行处理的事务并行化,更好地发挥出当前多核CPU,分布式集群的能力. 但是,并发编程和人们正常的思维方式是不一样的,因此才有了各种编程模型的抽象来帮助我们更方便,更不容

golang中的CSP并发模型

1. 相关概念: 用户态:当一个进程在执行用户自己的代码时处于用户运行态(用户态) 内核态:当一个进程因为系统调用陷入内核代码中执行时处于内核运行态(内核态),引入内核态防止用户态的程序随意的操作内核地址空间,具有一定的安全保护作用.这种保护模式是通过内存页表操作等机制,保证进程间的地址空间不会相互冲突,一个进程的操作不会修改另一个进程地址空间中的数据. 用户态与内核态之间的切换:当在系统中执行一个程序时,大部分时间都是运行在用户态下的,在其需要操作系统帮助完成一些用户态自己没有特权和能力完成的

量化投资策略:常见的几种Python回测框架(库)

量化投资策略:常见的几种Python回测框架(库) 原文地址:http://blog.csdn.net/lawme/article/details/51454237 本文章为转载文章.这段时间在研究量化策略方向,研究了Zipline一段时间,但是后续发现他仅支持美国股票,收集量化策略文章,转载到博客中. 在实盘交易之前,必须对量化交易策略进行回测.在此,我们评价一下常用的Python回测框架(库).评价的尺度包括用途范围(回测.虚盘交易.实盘交易),易用程度(结构良好.文档完整)和扩展性(速度快

IO复用、多进程和多线程三种并发编程模型

I/O复用模型 I/O复用原理:让应用程序可以同时对多个I/O端口进行监控以判断其上的操作是否可以进行,达到时间复用的目的.在书上看到一个例子来解释I/O的原理,我觉得很形象,如果用监控来自10根不同地方的水管(I/O端口)是否有水流到达(即是否可读),那么需要10个人(即10个线程或10处代码)来做这件事.如果利用某种技术(比如摄像头)把这10根水管的状态情况统一传达到某一点,那么就只需要1个人在那个点进行监控就行了,而类似与select或epoll这样的多路I/O复用机制就好比是摄像头的功能

网络I/O模型--5种常见的网络I/O模型

阻塞与非阻塞 阻塞就是卡在那儿什么也不做,双方之间也没有信息沟通. 非阻塞就是即使对方不能马上完成请求,双方之间也有信息的沟通. 同步与异步 同步就是一件事件只由一个过程处理完成,不论阻塞与非阻塞,最后完成这个事情的都是同一个过程 异步就是一件事由两个过程完成,前面一个过程通知,后面一个过程接受返回的结果. 异步和事件驱动(multi IO) 异步是指数据准备好并且已经拷贝到用户空间,在通知用户来取数据 事件驱动理解为准备好数据了但是没有拷贝到用户空间,这个时候去通知用户,用户再去取数据,经过拷

Golang并发模型:select进阶

前一篇文章<Golang并发模型:轻松入门select>介绍了select的作用和它的基本用法,这次介绍它的3个进阶特性. nil的通道永远阻塞 如何跳出for-select select{}阻塞 nil的通道永远阻塞 当case上读一个通道时,如果这个通道是nil,则该case永远阻塞.这个功能有1个妙用,select通常处理的是多个通道,当某个读通道关闭了,但不想select再继续关注此case,继续处理其他case,把该通道设置为nil即可.下面是一个合并程序等待两个输入通道都关闭后才退