高CPU业务场景下的任务分发方案Gearman搭建一览

  

    Gearman是当年LiveJournal用来做图片resize的,大家也明白图片resize是一个高CPU的操作,如果让web网站去做这个高CPU的功能,有可能会拖垮你的

web应用,那本篇我们来看看gearman是如何解决这个问题的,它的架构图类似下面这样:

从上面这张图,你应该会看到,Gearman是由三个部分组成:

1. Job Server

这个就是Gearman的Job Server,通过它对Client 和 jobwork 进行桥接,是不是想起来了中介者模式。。。

2. Client

Gearman提供了Client API 给客户端调用,Client只需要将一个高CPU的业务函数名丢给Job Server,然后等待JobServer的返回执行结果。

3. jobwork

Gearman提供了work API 给work客户端进行调用。jobserver会根据后端的work集群的负载情况,分发给一个合适的work去执行,并等待结果。

说到这里,你应该就明白了,本质上它属于那种分布式的RPC调用,而且非常牛逼的地方在于Client 和 Work 可以用不同的语言实现。

一:安装部署

1.  下载地址:https://github.com/gearman/gearmand/releases

目前gearman的JobServer 有C,JAVA,Perl三种语言实现,由于C版本的JobServer是最活跃的,所以这里采用目前最新的1.1.17版本的gearmand在CentOS

上进行安装部署。

2.  快速安装

可以通过官网http://gearman.org/getting-started/中的getting-started进行快速安装。

<1> 基础依赖库安装和gearmand下载

 1 yum -y install boost-devel gperf libevent-devel libuuid-devel gcc44 gcc-c++
 2 wget https://github.com/gearman/gearmand/releases/download/1.1.17/gearmand-1.1.17.tar.gz
 3 cd gearmand-1.1.17.tar.gz
 4 tar xzvf gearmand-1.1.17.tar.gz
 5 cd gearmand-1.1.17
 6 [[email protected] gearmand-1.1.17]# ls
 7 aclocal.m4  build-aux     configure.ac  gear_config.in  libgearman-1.0     libhashkit-1.0  Makefile.am  rpm      THANKS
 8 AUTHORS     ChangeLog     COPYING       gearmand        libgearmancore     libhostile      Makefile.in  scripts  util
 9 benchmark   configmake.h  docs          HACKING         libgearman-server  libtest         man          support  version.m4
10 bin         configure     examples      libgearman      libhashkit         m4              NEWS         tests

<2> 然后就是常规的./configure --prefix=/usr/myapp/gearman && make && make install  这个过程超级慢,可以出去抽跟烟,

顺便再去拉泡屎。。。

1 ./configure --prefix=/usr/myapp/gearman && make && make install

<3> 若干年后,当你看到这个就算安装成功了。。。还是得恭喜一下。。。。至少没让你踩到缺少各种依赖包的界面。

 1 See any operating system documentation about shared libraries for
 2 more information, such as the ld(1) and ld.so(8) manual pages.
 3 ----------------------------------------------------------------------
 4  /usr/bin/mkdir -p ‘/usr/myapp/gearman/sbin‘
 5  /usr/bin/install -c -m 644 man/gearman_worker_create.3 man/gearman_worker_define_function.3 man/gearman_worker_echo.3 man/gearman_worker_errno.3 man/gearman_worker_error.3 man/gearman_worker_free.3 man/gearman_worker_function_exist.3 man/gearman_worker_grab_job.3 man/gearman_worker_options.3 man/gearman_worker_register.3 man/gearman_worker_remove_options.3 man/gearman_worker_remove_servers.3 man/gearman_worker_set_context.3 man/gearman_worker_set_log_fn.3 man/gearman_worker_set_namespace.3 man/gearman_worker_set_options.3 man/gearman_worker_set_timeout.3 man/gearman_client_has_option.3 man/gearman_client_options_t.3 man/gearman_task_attr_init.3 man/gearman_task_attr_init_background.3 man/gearman_task_attr_init_epoch.3 man/gearman_task_attr_t.3 man/gearman_worker_set_identifier.3 man/gearman_worker_set_workload_free_fn.3 man/gearman_worker_set_workload_malloc_fn.3 man/gearman_worker_st.3 man/gearman_worker_timeout.3 man/gearman_worker_unregister.3 man/gearman_worker_unregister_all.3 man/gearman_worker_wait.3 man/gearman_worker_work.3 man/libgearman.3 ‘/usr/myapp/gearman/share/man/man3‘
 6   /bin/sh ./libtool   --mode=install /usr/bin/install -c gearmand/gearmand ‘/usr/myapp/gearman/sbin‘
 7 libtool: install: /usr/bin/install -c gearmand/gearmand /usr/myapp/gearman/sbin/gearmand
 8  /usr/bin/mkdir -p ‘/usr/myapp/gearman/bin‘
 9   /bin/sh ./libtool   --mode=install /usr/bin/install -c bin/gearman bin/gearadmin ‘/usr/myapp/gearman/bin‘
10 libtool: install: /usr/bin/install -c bin/.libs/gearman /usr/myapp/gearman/bin/gearman
11 libtool: install: /usr/bin/install -c bin/gearadmin /usr/myapp/gearman/bin/gearadmin
12 make[3]: Leaving directory `/usr/myapp/gearmand-1.1.17‘
13 make[2]: Leaving directory `/usr/myapp/gearmand-1.1.17‘
14 make[1]: Leaving directory `/usr/myapp/gearmand-1.1.17‘

<4> 启动gearmand,你也可以用 -d 开启后台运行的模式,这里加上DEBUG只是看一下实时的DEBUG信息,如下所示:

 1 [[email protected] myapp]# cd /usr/myapp/gearman
 2 [[email protected] gearman]# ls
 3 bin  include  lib  sbin  share
 4 [[email protected] gearman]# cd bin
 5 [[email protected] bin]# ls
 6 gearadmin  gearman
 7 [[email protected] bin]# cd /usr/myapp/gearman
 8 [[email protected] gearman]# cd sbin
 9 [[email protected] sbin]# ls
10 gearmand
11 [[email protected] sbin]# ./gearmand --verbose DEBUG
12 ./gearmand: Could not open log file "/usr/myapp/gearman/var/log/gearmand.log", from "/usr/myapp/gearman/sbin", switching to stderr. (No such file or directory)
13   DEBUG 2017-08-29 02:31:10.796259 [  main ] THREADS: 4 -> libgearman-server/gearmand.cc:263
14    INFO 2017-08-29 02:31:10.796374 [  main ] Initializing Gear on port 4730 with SSL: false
15    INFO 2017-08-29 02:31:10.796487 [  main ] Starting up with pid 40299, verbose is set to DEBUG
16   DEBUG 2017-08-29 02:31:10.796637 [  main ] Method for libevent: epoll -> libgearman-server/gearmand.cc:364
17   DEBUG 2017-08-29 02:31:10.798874 [  main ] Trying to listen on 0.0.0.0:4730 -> libgearman-server/gearmand.cc:646
18    INFO 2017-08-29 02:31:10.800151 [  main ] Listening on 0.0.0.0:4730 (8)
19   DEBUG 2017-08-29 02:31:10.800175 [  main ] Trying to listen on :::4730 -> libgearman-server/gearmand.cc:646
20    INFO 2017-08-29 02:31:10.800307 [  main ] Listening on :::4730 (9)
21   DEBUG 2017-08-29 02:31:10.800333 [  main ] Creating wakeup pipe -> libgearman-server/gearmand.cc:915
22   DEBUG 2017-08-29 02:31:10.800344 [  main ] Creating 4 threads -> libgearman-server/gearmand.cc:378
23   DEBUG 2017-08-29 02:31:10.800357 [  main ] Initializing libevent for IO thread -> libgearman-server/gearmand_thread.cc:224
24   DEBUG 2017-08-29 02:31:10.800406 [  main ] Creating IO thread wakeup pipe -> libgearman-server/gearmand_thread.cc:495
25   DEBUG 2017-08-29 02:31:10.800467 [  main ] Thread 1 created -> libgearman-server/gearmand_thread.cc:273
26   DEBUG 2017-08-29 02:31:10.800507 [  main ] Initializing libevent for IO thread -> libgearman-server/gearmand_thread.cc:224
27   DEBUG 2017-08-29 02:31:10.800550 [  main ] Creating IO thread wakeup pipe -> libgearman-server/gearmand_thread.cc:495
28   DEBUG 2017-08-29 02:31:10.800585 [  main ] Thread 2 created -> libgearman-server/gearmand_thread.cc:273
29   DEBUG 2017-08-29 02:31:10.800594 [  main ] Initializing libevent for IO thread -> libgearman-server/gearmand_thread.cc:224
30   DEBUG 2017-08-29 02:31:10.800632 [  main ] Creating IO thread wakeup pipe -> libgearman-server/gearmand_thread.cc:495
31   DEBUG 2017-08-29 02:31:10.800669 [  main ] Thread 3 created -> libgearman-server/gearmand_thread.cc:273
32   DEBUG 2017-08-29 02:31:10.800677 [  main ] Initializing libevent for IO thread -> libgearman-server/gearmand_thread.cc:224
33   DEBUG 2017-08-29 02:31:10.800714 [  main ] Creating IO thread wakeup pipe -> libgearman-server/gearmand_thread.cc:495
34   DEBUG 2017-08-29 02:31:10.800753 [  main ] Thread 4 created -> libgearman-server/gearmand_thread.cc:273
35   DEBUG 2017-08-29 02:31:10.800761 [  main ] replaying queue: begin -> libgearman-server/gearmand.cc:391
36   DEBUG 2017-08-29 02:31:10.800766 [  main ] __replay -> libgearman-server/plugins/queue/default/queue.cc:101
37   DEBUG 2017-08-29 02:31:10.800774 [  main ] replaying queue: end -> libgearman-server/gearmand.cc:397
38    INFO 2017-08-29 02:31:10.800780 [  main ] Adding event for listening socket (8)
39    INFO 2017-08-29 02:31:10.800787 [  main ] Adding event for listening socket (9)
40   DEBUG 2017-08-29 02:31:10.800794 [  main ] Adding event for wakeup pipe -> libgearman-server/gearmand.cc:966
41   DEBUG 2017-08-29 02:31:10.800801 [  main ] Entering main event loop -> libgearman-server/gearmand.cc:406
42   DEBUG 2017-08-29 02:31:10.801186 [     2 ] Entering thread event loop -> libgearman-server/gearmand_thread.cc:463
43   DEBUG 2017-08-29 02:31:10.801277 [     3 ] Entering thread event loop -> libgearman-server/gearmand_thread.cc:463
44   DEBUG 2017-08-29 02:31:10.801507 [  main ] staring up Epoch thread -> libgearman-server/timer.cc:61
45   DEBUG 2017-08-29 02:31:10.801635 [     1 ] Entering thread event loop -> libgearman-server/gearmand_thread.cc:463
46   DEBUG 2017-08-29 02:31:10.802426 [     4 ] Entering thread event loop -> libgearman-server/gearmand_thread.cc:463

<5> 最后通过netstat,lsof, ps -ef 三板斧可以找出来gearmand大概占用的端口号,就如你看到的默认占用的4370端口,

当然你也可以在启动的时候用help命令也是能够知道的。

 1 [[email protected] ~]# netstat -tln
 2 Active Internet connections (only servers)
 3 Proto Recv-Q Send-Q Local Address           Foreign Address         State
 4 tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN
 5 tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN
 6 tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN
 7 tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN
 8 tcp        0      0 0.0.0.0:4730            0.0.0.0:*               LISTEN
 9 tcp6       0      0 :::8009                 :::*                    LISTEN
10 tcp6       0      0 :::8080                 :::*                    LISTEN
11 tcp6       0      0 :::22                   :::*                    LISTEN
12 tcp6       0      0 ::1:631                 :::*                    LISTEN
13 tcp6       0      0 ::1:25                  :::*                    LISTEN
14 tcp6       0      0 :::4730                 :::*                    LISTEN
15 tcp6       0      0 127.0.0.1:8005          :::*                    LISTEN
16 [[email protected] ~]# ps -ef | grep gearmand
17 root      40299  15869  0 02:31 pts/1    00:00:00 ./gearmand --verbose DEBUG
18 root      40364  40327  0 02:33 pts/2    00:00:00 grep --color=auto gearmand
19 [[email protected] ~]# lsof -i :4730
20 COMMAND    PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
21 gearmand 40299 root    8u  IPv4 322550      0t0  TCP *:gearman (LISTEN)
22 gearmand 40299 root    9u  IPv6 322551      0t0  TCP *:gearman (LISTEN)
23 [[email protected] ~]# 

二:Java Driver 在 Gearman上的使用

为了演示,我可以做一个简单的 “字符串.ToUpper”的业务逻辑来验证一下这个架构是否可以跑的起来。

1. java 充当 Gearman 的 work

首先需要在mvn仓库中拉一下jar包:http://www.mvnrepository.com/artifact/org.gearman/gearman-java/0.6。

<1> UpperFunction类,这个类用于定义work具体的业务逻辑:

 1 package com.datamip.gearmanwork;
 2
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5
 6 import org.gearman.client.GearmanJobResult;
 7 import org.gearman.client.GearmanJobResultImpl;
 8 import org.gearman.util.ByteUtils;
 9 import org.gearman.worker.AbstractGearmanFunction;
10
11 //字符串大写的业务Function
12 public class UpperFunction extends AbstractGearmanFunction {
13
14     @Override
15     public GearmanJobResult executeFunction() {
16
17         String param = ByteUtils.fromUTF8Bytes((byte[]) this.data);
18
19         byte[] mybytes = param.toUpperCase().getBytes();
20
21         GearmanJobResultImpl result = new GearmanJobResultImpl(mybytes, true, mybytes, null, null, -1, -1);
22
23         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
24
25         String dateString = formatter.format(new Date());
26
27         System.out.println(String.format("当前时间:%s, 过来的字符串:%s,返回的字符串:%s", dateString, param,new String(mybytes)));
28
29         return result;
30     }
31 }

<2>  将UpperFunction注册到gearmand中,从红色代码可以看到,其实是一个kv模式,这里的key="myUpperFunc”的对应执行业务就是new UpperFunction。

这样Client只需要传递一个"myUpperFunc",Gearmand就知道这个“字符串”对应是哪一个处理函数。。。

 1 public class App {
 2     public static void main(String[] args) {
 3
 4         GearmanWorker worker = new GearmanWorkerImpl();
 5
 6         GearmanNIOJobServerConnection conn = new GearmanNIOJobServerConnection("192.168.23.170", 4730);
 7         worker.addServer(conn);
 8
 9         // 将‘将转大写的函数注册’ 到gearmand中
10         worker.registerFunctionFactory(new GearmanFunctionFactory() {
11
12             public String getFunctionName() {
13                 return "myUpperFunc";
14             }
15
16             public GearmanFunction getFunction() {
17                 return new UpperFunction();
18             }
19         });
20
21         System.out.println("启动服务。。。。");
22
23         worker.work();
24     }
25 }

2. java 充当 Gearman 的 client

<1> GearSubmit类【简单的一个包装类,随便定义】

 1 package com.datamip.gearmanclient;
 2
 3 import java.util.concurrent.ExecutionException;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.Future;
 7
 8 import org.gearman.client.GearmanClient;
 9 import org.gearman.client.GearmanClientImpl;
10 import org.gearman.client.GearmanJob;
11 import org.gearman.client.GearmanJobImpl;
12 import org.gearman.client.GearmanJobResult;
13 import org.gearman.common.GearmanJobServerConnection;
14 import org.gearman.common.GearmanNIOJobServerConnection;
15 import org.gearman.util.ByteUtils;
16
17 public class Gearsubmit {
18
19     public  void process() throws InterruptedException, ExecutionException {
20
21         GearmanJobServerConnection conn = new GearmanNIOJobServerConnection("192.168.23.170", 4730);
22
23         GearmanClient client = new GearmanClientImpl();
24
25         client.addJobServer(conn); // 添加连接
26
27         String functionName = "myUpperFunc";
28
29         byte[] data = ByteUtils.toUTF8Bytes("hello,world");
30
31         // 创建后台任务
32         GearmanJob job = GearmanJobImpl.createJob(functionName, data, null);
33
34         GearmanJobResult jobResult = null;
35
36         Future<GearmanJobResult> gearmanJobResult = client.submit(job);
37
38         jobResult = gearmanJobResult.get();
39
40         byte[] resultBytes = jobResult.getResults();
41
42         // 获取job的返回值
43         String value = ByteUtils.fromUTF8Bytes(resultBytes);
44
45         System.out.println(value);
46
47         System.out.println("执行结束");
48
49         client.shutdown();
50     }
51 }

<2> 主程序,开多线程并发的去执行。

 1 public class App {
 2     public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
 3
 4         ExecutorService executorService = Executors.newFixedThreadPool(100);
 5
 6         for (int i = 0; i < 10000; i++) {
 7             executorService.execute(new Runnable() {
 8
 9                 @Override
10                 public void run() {
11                     Gearsubmit submit=new Gearsubmit();
12
13                     try {
14                         submit.process();
15                     } catch (InterruptedException | ExecutionException e) {
16                         // TODO Auto-generated catch block
17                         e.printStackTrace();
18                     }
19                 }
20             });
21         }
22
23         System.in.read();
24     }
25 }

好了,一切都准备好了,接下来为了演示,演示就是解释,我用Jar2Exe把work程序导出成jar再转换成exe,如下图:

然后我把3.exe开成5个实例,client用100个线程的线程池并发调用,当然一切都是模拟。。。。可以看到,当我client启动的时候,5个work都在执行,

如果这个时候,你把某一个work停止了,jobserver也不再将任务丢给它,而是转给其他负载相对小的work继续执行。

好了,本篇就说到这里,希望对你有帮助。

时间: 2024-08-25 22:54:28

高CPU业务场景下的任务分发方案Gearman搭建一览的相关文章

某种业务场景下,将一个工作区的多个字段整理到一个内表中

1 DATA:num TYPE i. 2 CONSTANTS: times TYPE i VALUE 29. 3 DATA: BEGIN OF ih_lgty, 4 lgty TYPE lgtyp, 5 END OF ih_lgty, 6 it_lgty LIKE TABLE OF ih_lgty. 7 FIELD-SYMBOLS: <lgty> TYPE any, 8 <t334t> LIKE t334t. 9 10 DATA:lh_t334t LIKE t334t, 11 lt

不同场景下 MySQL 的迁移方案

不同场景下 MySQL 的迁移方案 Posted in MySQL and tagged MySQL, 数据迁移, 方案 on Sep 15, 2015. Viewd 2684 times. 文/温国兵 一 目录 一 目录 二 为什么要迁移 三 MySQL 迁移方案概览 四 MySQL 迁移实战 4.1 场景一 一主一从结构迁移从库 4.2 场景二 一主一从结构迁移指定库 4.3 场景三 一主一从结构双边迁移指定库 4.4 场景四 一主一从结构完整迁移主从 4.5 场景五 双主结构跨机房迁移 4

互联网业务场景下消息队列架构

消息队列作为一种基础的抽象数据结构,被广泛应用在各类编程与系统设计中. 同步VS异步 通信的一个基本问题是:发出去的消息什么时候需要被接收到?这个问题引出了两个基础概念:"同步通信"和"异步通信".根据理论抽象模型,同步通信和异步通信最本质的差别来自于时钟机制的有无.同步通信的双方需要一个校准的时钟,异步通信的双方不需要时钟.现实的情况是,没有完全校准的时钟,所以没有绝对的同步通信.同样,绝对异步通信意味着无法控制一个发出去的消息被接收到的时间点,无期限的等待一个消

[原]不同场景下MySQL的迁移方案

一 为什么要迁移 MySQL 迁移是 DBA 日常维护中的一个工作.迁移,究其本义,无非是把实际存在的物体挪走,保证该物体的完整性以及延续性.就像柔软的沙滩上,两个天真无邪的小孩,把一堆沙子挪向其他地方,铸就内心神往的城堡. 生产环境中,有以下情况需要做迁移工作,如下: 1.磁盘空间不够.比如一些老项目,选用的机型并不一定适用于数据库.随着时间的推移,硬盘很有可能出现短缺: 2.业务出现瓶颈.比如项目中采用单机承担所有的读写业务,业务压力增大,不堪重负.如果 IO 压力在可接受的范围,会采用读写

refs的作用是什么,你在什么业务场景下使用过refs

作用是操作dom 场景:图片加载完以后获取图片的宽高 // window上添加事件监听后,组件销毁前需要移除 class Test extends React.Component { constructor(props){ super(props); this.state = { top: 0 } this.handleWindowScroll = this.handleWindowScroll.bind(this); } handleWindowScroll() { this.setState

MvcPager.js在特定业务场景下的问题解决

用到了MvcPager.js,在一个常见的场景中出现了不能POST表单数据的问题,场景描述如下: 日期:2012-12-12 编号:*****                                                                                                                         [查询] 数据列表 首页 1 2 3 ...19 尾页 页面加载完成后,如果首先点击分页页码,那么查询条件部分的日期

面试官:怎么设计大文件、大数据场景下的传输加密方案?

某年某月某一天,冷冽寒风中,姚小毛走进了某家公司,开始了新一轮的面试. 一阵寒暄后. 面试官:"你好,看你的项目经验中有做过数据加密的工作,你是使用什么加密算法加解密的?" 姚小毛:"嗯,我是采用的 非对称加密 + 对称加密 的混合加密算法." 面试官:"为什么要用混合加密的方式?" 姚小毛:"非对称加密跟对称加密都各有优缺点. 非对称安全性好点,由发送方跟接收方分别持有公钥.私钥. 但是缺点是在做大数据量的加密传输时,传输速度会比较慢

高并发场景下请求合并的实践

前言 项目中一般会请求第三方的接口,也会对外提供接口,可能是RPC,也可能是HTTP等方式.在对外提供接口时,有必要提供相应的批量接口,好的批量实现能够提升性能. 高并发场景中,调用批量接口相比调用非批量接口有更大的性能优势.但有时候,请求更多的是单个接口,不能够直接调用批量接口,如果这个接口是高频接口,对其做请求合并就很有必要了.比如电影网站的获取电影详情接口,APP的一次请求是单个接口调用,用户量少的时候请求也不多,完全没问题:但同一时刻往往有大量用户访问电影详情,是个高并发的高频接口,如果

史上最复杂业务场景_逼出阿里高可用三大法宝

原文链接 SREcon 是由计算机科学领域知名机构USENIX主办,聚焦网站可靠性.系统工程.以及复杂分布式系统相关的运维行业技术盛会,今年SREcon17大会 Asia/Australia站于当地时间5月22日-24日在新加坡举行.阿里中间件(Aliware)团队高级技术专家张军(花名游骥)和林佳梁(花名子矜),受邀在本次大会上给现场听众分享了阿里巴巴容量规划和全链路压测方面的技术进展. 容量规划的由来 阿里巴巴有着非常丰富的业务形态,每种业务都由一系列不同的业务系统来提供服务,每个业务系统都