每分钟达百万的数据请求

通过Go来处理每分钟达百万的数据请求

artong0416

我读《通过Go来处理每分钟达百万的数据请求》

我读《通过Go来处理每分钟达百万的数据请求》

原文

原文作者为Malwarebytes公司的首席架构师Marcio Castilho http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

问题描述

当我们的服务端需要处理大量的耗时任务时,我们一般都会考虑将耗时任务异步处理。

简单粗暴法

golang恰恰给我们的异步处理带来了很大的便利--go func()。然而,绝大多数的时候,我们不能简单粗暴的创建协程来处理异步任务,原因是不可控。虽然协程相对于线程占用的系统资源更少,但这并不代表我们可以无休止的创建协程。积水成江,不停创建协程也有压垮系统的风险。这里引用原作者的demo,一个执行耗时任务的handler。

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    if r.Method != "POST" {
             w.WriteHeader(http.StatusMethodNotAllowed)
             return
    }
    // Read the body into a string for json decoding
     var content = &PayloadCollection{}
     err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
             w.Header().Set("Content-Type", "application/json; charset=UTF-8")
             w.WriteHeader(http.StatusBadRequest)
             return
     }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON‘T DO THIS
    }
    w.WriteHeader(http.StatusOK)
}

这就是我们遇到的第一个问题,简单粗暴起协程处理耗时任务导致的系统不可控性。我们自然而然就会想,怎么能让系统更可控呢。

优雅的方法

一个很自然的思路是,建立任务队列。golang提供了线程安全的任务队列实现方式--带缓冲的channal。但是这样只是延后了请求的爆发。作者意识到,要解决这一问题,必须控制协程的数量。如何控制协程的数量?Job/Worker模式!这里我将作者的代码修改了一下,单文件执行,以记录这一模式。

package main

import (
    "fmt"
    "reflect"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待执行的工作
type Job struct {
    Payload Payload
}

//任务channal
var JobQueue chan Job

//执行任务的工作者单元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每个元素是一个工作者的私有任务channal
    JobChannel chan Job      //每个工作者单元包含一个任务管道 用于获取任务
    quit       chan bool     //退出信号
    no         int           //编号
}

//创建一个新工作者单元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("创建一个新工作者单元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//循环  监听任务和结束信号
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任务
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信号
                return
            }
        }
    }()
}

// 停止信号
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//调度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者数量
    MaxWorkers int
}

//创建调度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//调度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空闲worker (任务多的时候会阻塞这里)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 将任务放到上述woker的私有任务channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 20; i++ {
        // 新建一个任务
        payLoad := Payload{Num: 1}
        work := Job{Payload: payLoad}
        // 任务放入任务队列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work")
        time.Sleep(1 * time.Second)
    }
}

/*
一个任务的执行过程如下
JobQueue <- work  新任务入队
job := <-JobQueue: 调度中心收到任务
jobChannel := <-d.WorkerPool 从工作者池取到一个工作者
jobChannel <- job 任务给到工作者
job := <-w.JobChannel 工作者取出任务
{{1}} 执行任务
w.WorkerPool <- w.JobChannel 工作者在放回工作者池
*/

这样,我们已经能够主动的控制worker的数量。这时候,我们不妨设想一下,我们完全解决问题了么?如果有大量的任务同时涌入,会发生什么样的结果。程序会阻塞等待可用的workerjobChannel := <-d.WorkerPool

//调度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空闲worker (任务多的时候会阻塞这里)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 将任务放到上述woker的私有任务channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

不要忘记,这个调度方法也是在不断的创建协程等待空闲的worker。我们改一下代码如下:

package main

import (
    "fmt"
    "reflect"
    "time"
    "runtime"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待执行的工作
type Job struct {
    Payload Payload
}

//任务channal
var JobQueue chan Job

//执行任务的工作者单元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每个元素是一个工作者的私有任务channal
    JobChannel chan Job      //每个工作者单元包含一个任务管道 用于获取任务
    quit       chan bool     //退出信号
    no         int           //编号
}

//创建一个新工作者单元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("创建一个新工作者单元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//循环  监听任务和结束信号
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任务
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信号
                return
            }
        }
    }()
}

// 停止信号
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//调度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者数量
    MaxWorkers int
}

//创建调度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//调度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                fmt.Println("等待空闲worker (任务多的时候会阻塞这里")
                //等待空闲worker (任务多的时候会阻塞这里)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 将任务放到上述woker的私有任务channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 100; i++ {
        // 新建一个任务
        payLoad := Payload{Num: i}
        work := Job{Payload: payLoad}
        // 任务放入任务队列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work", i)
        fmt.Println("当前协程数:", runtime.NumGoroutine())
        time.Sleep(100 * time.Millisecond)
    }
}

执行结果如下:

...
...

JobQueue <- work 97
当前协程数: 100
job := <-JobQueue:
等待空闲worker (任务多的时候会阻塞这里
JobQueue <- work 98
当前协程数: 101
job := <-JobQueue:
等待空闲worker (任务多的时候会阻塞这里
JobQueue <- work 99
当前协程数: 102

我们发现,我们依然没能控制住协程数量,我们只是控制住了worker的数量。这种情况下,如果worker数量设置的合理且异步任务耗时不是特别长的情况下一般没有问题。但是出于安全的考虑,我要把这个阻塞的协程数做一个控制,如果达到限制时候拒绝服务以保护系统该怎么处理?

真正控制协程数量(并发执行的任务数)

我们可以控制并发执行(包括等待执行)的任务数。我们加入任务使用如下判断。用一个带缓冲的Channel控制并发执行的任务数。当任务异步处理完成的时候执行<- DispatchNumControl释放控制即可。用这种方法,我们可以根据压测结果设置合适的并发数从而保证系统能够尽可能的发挥自己的能力,同时保证不会因为任务量太大而崩溃(因为达到极限的时候,系统会告诉调用方--我很忙)。

//用于控制并发处理的协程数
var DispatchNumControl = make(chan bool, 10000)

func Limit(work Job) bool {
   select {
   case <-time.After(time.Millisecond * 100):
      fmt.println("我很忙")
      return false
   case DispatchNumControl <- true:
   // 任务放入任务队列channal
      jobChannel <- work
      return true
   }
}

总结

总结一波,协程是个好的设计,但任何东西都不能过度使用。我们做系统设计的时候,一定也要时刻想着控制--要对自己设计的系统有着足够的控制力。

分类: golang

时间: 2024-10-29 19:11:59

每分钟达百万的数据请求的相关文章

XML 数据请求与JSON 数据请求

(1)XML 数据请求 使用 AFNetworking 中的 AFHTTPRequestOperation 和 AFXMLParserResponseSerializer,另外结合第三方框架 XMLDictionary 进行数据转换 使用 XMLDictionary 的好处:有效避免自行实现 NSXMLParserDelegate 委托代理协议方法来进行繁琐的数据拼凑解析 (2)JSON 数据请求 使用 AFNetworking 中的 AFHTTPRequestOperation 或 AFHTT

微信小程序数据请求方法wx.request小测试

微信小程序数据请求方法 wx.request wxml文件: <view> <textarea value="{{textdata}}"/> </view> <button bindtap="RequestData" value="Button">Button</button> 主要是一个按钮,点击后将请求的数据写入到textarea中 js文件: Page({ data:{ textd

Ajax --- 数据请求

下面主要介绍(JS原生)数据请求的主要步骤: Ajax 数据请求步骤: 1.创建XMLHttpRequest对象 2.准备数据发送 3.执行发送 4.指定回掉函数 第一步:创建XMLHttpRequest对象 1 var xhr = new XMLHttpRequest(); // 标准浏览器 2 3 var xhr = new ActiveXObject('Microsoft.XMLHTTP'); // IE6 第二步:使用 open() 方法将参数传入 1 xhr.open('get','.

iOS开发——网络编程OC篇&amp;数据请求总结

数据请求总结 1 //**************************************GET 2 //同步 3 /* 4 //第一步,创建URL 5 NSURL * url = [[NSURL alloc]initWithString:@"http://e.hiphotos.baidu.com/image/w%3D1366%3Bcrop%3D0%2C0%2C1366%2C768/sign=66f9b9980ef431adbcd2473a7d0097cc/08f790529822720

JDBC实现往MySQL插入百万级数据

from:http://www.cnblogs.com/fnz0/p/5713102.html JDBC实现往MySQL插入百万级数据 想往某个表中插入几百万条数据做下测试, 原先的想法,直接写个循环10W次随便插入点数据试试吧,好吧,我真的很天真.... DROP PROCEDURE IF EXISTS proc_initData;--如果存在此存储过程则删掉 DELIMITER $ CREATE PROCEDURE proc_initData() BEGIN DECLARE i INT DE

ajax 里的数据请求

今天给大家讲讲前端怎样通过ajax里的 get 和post两种方法来发送数据请求 首先,我们先知道 ajax里的get  post里有两个属性, open(),send().还要区分同步请求还是异步请求 在我们以后参加工作的时候用的比较多的都是异步请求 true: 那我们先来说说异步的请求方法 当然这里也不能漏了同步请求 上面的 是通过get的方法来请求的 date.json是我需要请求的数据地址 这里大家知道怎么请求的方法就可以了 一般你在参加工作之后会有后端的数据库人员会给你相应的数据库地址

使用 AFNetworking 进行 XML 和 JSON 数据请求

(1)XML 数据请求 使用 AFNetworking 中的 AFHTTPRequestOperation 和 AFXMLParserResponseSerializer,另外结合第三方框架 XMLDictionary 进行数据转换 使用 XMLDictionary 的好处:有效避免自行实现 NSXMLParserDelegate 委托代理协议方法来进行繁琐的数据拼凑解析 (2)JSON 数据请求 使用 AFNetworking 中的 AFHTTPRequestOperation 或 AFHTT

ios NSURLRequest NSMutableURLRequest 数据请求

get 请求 #pragma mark - GET登录 - (void)getLogon { // 1. URL NSString *urlStr = [NSString stringWithFormat:@"http://localhost/login.php?username=%@&password=%@", self.userName.text, self.userPwd.text]; NSURL *url = [NSURL URLWithString:urlStr];

批量新增百万条数据 十百万条数据

--创建用户表CREATE TABLE table_1(    id int PRIMARY KEY, -- 主键ID    c1 varchar(24) NOT NULL,-- 列1    c2 datetime NOT NULL -- 列2) -- 批量新增一万条数据CREATE PROCEDURE PROC_INSERT @max int = 1000000, @c1 int = 1as WHILE @c1  <= @maxBEGIN  INSERT INTO Table_1 VALUES