前段时间遇到了这样一个需求
某一客户端向一个服务器提交任务
服务器再将分发下去由对于的工作人员来完成
前辈告诉我用gearman搭建一个分布式系统。gearman有三个部分,client、service和worker
client:提交任务
service:分配任务
worker:执行任务
它可以实现的效果就是一台机器上搭建好了服务器
另外可以多台机器作为客户端,可以一起提交任务,然后服务器会用个队列来存起来
然后就是起多台机器作为worker去服务器要任务
把每个步骤都分布到了不同的主机上,这就是典型的分布式系统。
(感觉这个这就是一个加强版的多线程机制,一个线程提交任务到队列,起多个线程去队列中去。就是一个网游一个是单机)
详细介绍可以看官网Gearman,官网上还有各种语言的例子
配置过程的话跟着网上来就可以,前辈说linux就是配置特别麻烦,等配置好了之后,用起来就方便了
gearman client
提交任务用法很简单,以下是用例
gm_client = gearman.GearmanClient([‘localhost:4730‘]) gm_client.submit_job(GRARMAN_TASK_NAME, data, priority=gearman.PRIORITY_HIGH, background=True)
记得先引包 import gearman
[‘localhost:4730‘] 就是gearman服务器的位置,端口默认是4730
然后client有一个submit_job的方法,下面有该函数的源码,有一堆参数意思就和名字一样。如background 参数就是提交后台任务False就是等待返回结果,适用于大量提交任务。
其中最重要的参数 就是 task 在测试代码中我填写是 GRARMAN_TASK_NAME 。这个是任务的唯一标识,就是可以有多个client提交任务,但服务器怎么识别任务呢,就是靠这参数,当然worker也是靠这个参数识别。(我第一次写的时候把这个参数写成了“echo”结果提交了一坨任务,但我自己的worker只收到几个,我调试了好久才发现。。。)
gearman.client.submit_job 源码
1 def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None): 2 """Submit a single job to any gearman server""" 3 job_info = dict(task=task, data=data, unique=unique, priority=priority) 4 completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout) 5 return gearman.util.unlist(completed_job_list)
gearman worker
worker里面的话,除了GRARMAN_TASK_NAME 和 [‘localhost:4730‘]值得注意意外,还有一个地方就是每个worker都需要注册一下自己要做的任务,就是下面代码中的
gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)
注册的名字需要和对应client中的一致,后面一个参数是一个方法,写的格式也是代码中的那样,相当于就是拿到了任务怎么干,这个怎么干的过程就写到方法里面
gm_worker = gearman.GearmanWorker([‘localhost:4730‘]) def task_listener_reverse(gearman_worker, gearman_job): #gearman_job 就是client端传过来的数据 print "这里是想要干的事" return gearman_job.data[::-1]#返回数据的逆序 #GRARMAN_TASK_NAME 这个名字需要是任务的唯一标识 gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)
GearmanAdminClient
今天有一个需求
得到gearman服务上有多少个job,又有多少worker正在工作
然后根据job和worker的数量进行一些相应的调整工作
突然发现gearman中GearmanAdminClient有以下两个方法,瞬间完成任务
def get_workers(self): """Retrieves a list of workers and reports what tasks they‘re operating on""" self.establish_admin_connection() self.current_handler.send_text_command (GEARMAN_SERVER_COMMAND_WORKERS) return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_WORKERS) def get_status(self): """Retrieves a list of all registered tasks and reports how many items/workers are in the queue""" self.establish_admin_connection() self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_STATUS) return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_STATUS)
测试代码
ad_client=gearman.GearmanAdminClient([‘localhost:4730‘]) list=ad_client.get_workers() for row in list: print row print "\n" list=ad_client.get_status() for row in list: print row
部分结果展示
{‘file_descriptor‘: ‘34‘, ‘tasks‘: (), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘} {‘file_descriptor‘: ‘50‘, ‘tasks‘: (‘resize‘, ‘like‘, ‘dislike‘), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘} {‘file_descriptor‘: ‘46‘, ‘tasks‘: (‘resize‘, ‘like‘, ‘dislike‘), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘} {‘file_descriptor‘: ‘59‘, ‘tasks‘: (‘add_phone_info‘,), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘} {‘file_descriptor‘: ‘55‘, ‘tasks‘: (‘add_phone_info‘,), ‘client_id‘: ‘-‘, ‘ip‘: ‘127.0.0.1‘} {‘workers‘: 0, ‘running‘: 0, ‘task‘: ‘apkcrawler‘, ‘queued‘: 22028} {‘workers‘: 0, ‘running‘: 0, ‘task‘: ‘reverse‘, ‘queued‘: 0} {‘workers‘: 1, ‘running‘: 0, ‘task‘: ‘echo‘, ‘queued‘: 0} {‘workers‘: 10, ‘running‘: 0, ‘task‘: ‘add_phone_info‘, ‘queued‘: 0} {‘workers‘: 0, ‘running‘: 0, ‘task‘: ‘write_hbase‘, ‘queued‘: 0} {‘workers‘: 0, ‘running‘: 0, ‘task‘: ‘write_amazon‘, ‘queued‘: 0} {‘workers‘: 10, ‘running‘: 0, ‘task‘: ‘dislike‘, ‘queued‘: 0}