1. The Ansible API

Before digging into the Python ansible API, a quick look at ansible itself.

Ansible is a relatively recent automation tool. It is written in Python and combines the strengths of many earlier tools (Puppet, CFEngine, Chef, Func, Fabric) to provide batch system configuration, batch deployment, batch command execution, and more.

Ansible works through modules and has no batch-deployment capability of its own: the real work is done by the modules it runs, while ansible itself only provides the framework. The main parts are:

- connection plugins: handle communication with the managed hosts;
- host inventory: a configuration file defining which hosts are managed;
- modules: the core modules, the command module, and custom modules;
- plugins that add logging, e-mail notification and similar features;
- playbook: an optional "script" that lets the nodes run several tasks in one pass.
Installing ansible

```
[root@localhost python]# yum install -y ansible    # a plain yum install is enough
```

Configuring ansible

```
[root@localhost day_13]# ssh-keygen -t rsa    # generate a key pair
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
8c:91:b4:b5:f5:79:ae:19:5a:a4:a3:ef:35:72:91:91 root@localhost
The key's randomart image is:
+--[ RSA 2048]----+
|      . .   .    |
|     . + o . o   |
|      + . E .    |
|     + o =       |
|    . S o = .    |
|       . + =     |
|      . o *      |
|       . + .     |
|        .o       |
+-----------------+
[root@localhost day_13]# ssh-copy-id -i /root/.ssh/id_rsa.pub 192.168.38.250   # push the key over so logins are password-free
[root@localhost day_13]# vim /etc/ansible/hosts
[web]
192.168.38.250
```

Test

```
[root@localhost day_13]# ansible web -a 'uname -r'
192.168.38.250 | success | rc=0 >>
3.10.0-327.el7.x86_64
```
Common options

- -m: the module to use
- -a: the arguments passed to the module
Common modules

- copy module

```
# copy /root/anaconda-ks.cfg to /tmp
[root@localhost day_13]# ansible web -m copy -a 'src=/root/anaconda-ks.cfg dest=/tmp'
```

- file module

```
# change the owner, group and mode of /tmp
[root@localhost day_13]# ansible web -m file -a 'dest=/tmp mode=755 owner=root group=root'
```

- cron module

```
# create a periodic task
[root@localhost day_13]# ansible web -m cron -a 'name="ntp job" minute=*/3 hour=* day=* month=* weekday=* job="/usr/sbin/ntpdate ntp.sjtu.edu.cn"'
```

- group module

```
# create a group
[root@localhost day_13]# ansible web -m group -a 'gid=2000 name=linux'
```

- user module

```
# create a user
[root@localhost day_13]# ansible web -m user -a 'name=linux groups=linux state=present'
```

- yum module

```
# install the httpd service
[root@localhost day_13]# ansible web -m yum -a "state=present name=httpd"
```
- service module

```
# restart the httpd service
[root@localhost day_13]# ansible web -m service -a 'name=httpd state=restarted'
```
- script module

```
# run a script
[root@localhost day_13]# ansible web -m script -a '/root/test.sh'
```

- ping module

```
# check whether the hosts are reachable
[root@localhost day_13]# ansible web -m ping
```

- command module

```
# run a command; similar to the shell module
[root@localhost day_13]# ansible web -m command -a 'hostname'
```
Installing the Python ansible module

```
# ansible changed a great deal after 2.0, so install a pre-2.0 release
[root@localhost day_13]# pip install 'ansible<2.0'
```

What the ansible package contains

```
PACKAGE CONTENTS
    cache (package)
    callback_plugins (package)
    callbacks
    color
    constants
    errors
    inventory (package)
    module_common
    module_utils (package)
    modules (package)
    playbook (package)
    runner (package)
    utils (package)
```
Example

```
In [1]: import ansible.runner

In [2]: runner = ansible.runner.Runner(
   ...:     module_name='shell',    # module name
   ...:     module_args='uname -r', # module arguments
   ...:     pattern='web',          # host group
   ...:     forks=10                # parallelism (number of forks)
   ...: )

In [3]: res = runner.run()

In [4]: res
Out[4]:
{'contacted': {'192.168.38.250': {u'changed': True,
   u'cmd': u'uname -r',
   u'delta': u'0:00:00.006807',
   u'end': u'2016-11-02 02:43:47.664868',
   'invocation': {'module_args': u'uname -r',
    'module_complex_args': {},
    'module_name': 'shell'},
   u'rc': 0,
   u'start': u'2016-11-02 02:43:47.658061',
   u'stderr': u'',
   u'stdout': u'3.10.0-327.el7.x86_64',
   u'warnings': []}},
 'dark': {}}
```

The idea behind batch execution: take the module_name, module_args, pattern and forks parameters from the frontend, run the task through ansible.runner.Runner(), post-process the result, and render it back to the frontend.
Frontend

```
// on submit, collect the form parameters and hand them to the callback
$('#cmdform').on('submit', function(){
    var str = $('#cmdform').serialize()
    var url = '/cmd?' + str
    $.get(url, function(data){
        //data = "<strong><pre>" + data + "</pre></strong>"
        // take the result and show it on the page
        $('#display').html(data)
    })
    return false
})
```

Backend

```
# read and normalize the request parameters
cmd_time = time.strftime('%Y-%m-%d %H:%M:%S')
pattern = request.args.get('pattern', 'all')
module = request.args.get('module', 'shell')
args = urllib.unquote(request.args.get('cmd', 'whoami'))
forks = request.args.get('forks', 5)
results = ansible_cmd(pattern, module, args, forks)
# audit-log line; `name` is defined elsewhere in the app
record = "[%s] - %s - %s - %s\n" % (cmd_time, name, pattern, args)

# build up the response string and return it
output = ""
for (hostname, result) in results['contacted'].items():
    if not "failed" in result and result['stdout'] != "":
        output += "%s | %s | success >> \n %s \n" % (hostname, result['cmd'], result['stdout'])
    else:
        output += "%s | %s | FAILED >> \n %s \n" % (hostname, result['cmd'], result['stderr'])
for (hostname, result) in results['dark'].items():
    output += "%s | SSH Error >> \n %s \n" % (hostname, result['msg'])
return output
```
Screenshot

Drawback

Batch execution is convenient, but this design has one weakness: the frontend sends its data to the backend with a GET request, which is not safe. A user who knows the URL format can pass a destructive operation straight through the URL, so the backend has to validate the input and refuse dangerous commands such as rm.
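One way to implement that backend check is a simple blacklist on the first token of the command line. The sketch below is only an illustration written for this note; the names `DANGEROUS_COMMANDS` and `is_command_allowed` are made up, not part of the original app:

```python
import shlex

# hypothetical blacklist of destructive commands
DANGEROUS_COMMANDS = {'rm', 'reboot', 'shutdown', 'halt', 'mkfs', 'dd'}

def is_command_allowed(cmd):
    """Reject any command line whose first token is blacklisted."""
    try:
        tokens = shlex.split(cmd)
    except ValueError:
        return False              # unparsable input is rejected outright
    if not tokens:
        return False
    # strip a leading path so '/bin/rm' is treated like 'rm'
    return tokens[0].rsplit('/', 1)[-1] not in DANGEROUS_COMMANDS

print(is_command_allowed('uname -r'))       # True
print(is_command_allowed('rm -rf /tmp/x'))  # False
```

The backend would run such a check on `cmd` before handing it to ansible and return an error message instead of executing. Note that a blacklist is easy to bypass (shell metacharacters, aliases), so a whitelist of permitted commands is the safer design in practice.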
Classes

Reference: http://blog.csdn.net/on_1y/article/details/8640012
Multithreading in Python

Python threads are real OS threads, but the interpreter holds a Global Interpreter Lock (GIL): before any Python thread can run, it must first acquire the GIL, and (in CPython 2) the interpreter releases it automatically every 100 bytecode instructions to give other threads a chance. The GIL effectively serializes the execution of all thread code, so Python threads can only take turns: even 100 threads on a 100-core CPU will use a single core.
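A quick way to see the effect is to split a CPU-bound loop across several threads and compare wall-clock time with the single-threaded run; on CPython the threaded version is typically no faster. This snippet is an illustration added here (not from the original post) and runs under both Python 2 and 3:

```python
import threading
import time

N = 2000000  # total number of increments

def count(n, out, idx):
    # pure-Python CPU-bound work: no I/O, so the GIL dominates
    total = 0
    for _ in range(n):
        total += 1
    out[idx] = total

# single-threaded baseline
start = time.time()
results = [0]
count(N, results, 0)
single = time.time() - start

# the same work split across four threads
start = time.time()
results4 = [0] * 4
threads = [threading.Thread(target=count, args=(N // 4, results4, i)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
multi = time.time() - start

print('single: %.2fs, threaded: %.2fs' % (single, multi))
assert sum(results4) == results[0]  # both versions did the same work
```

The exact timings vary by machine, but on CPython the threaded run is usually about the same as, or slower than, the single-threaded one.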
The standard library provides two modules: thread, the low-level one, and threading, a high-level wrapper around it. In almost every case, threading is the module to use.
Methods provided by the threading module:

- threading.currentThread(): returns the current thread object.
- threading.enumerate(): returns a list of the running threads. "Running" means started and not yet finished; threads that have not started or have already terminated are not included.
- threading.activeCount(): returns the number of running threads; equivalent to len(threading.enumerate()).
- run(): the method representing the thread's activity.
- start(): starts the thread.
- join(): waits until the thread terminates; the parent thread blocks until the child thread has finished running.
- isAlive(): returns whether the thread is alive.
- getName(): returns the thread's name.
- setName(): sets the thread's name.
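A small demo of the helper methods listed above (written for this note, compatible with Python 2 and 3; note that Python 3 spells isAlive() as is_alive()):

```python
import threading
import time

def worker():
    # a thread can look itself up with current_thread()
    print('running in %s' % threading.current_thread().getName())
    time.sleep(0.5)

t = threading.Thread(target=worker)
t.setName('demo-thread')           # setName()/getName() manage the thread name
t.start()

print('active threads: %d' % threading.active_count())   # main thread + demo-thread
print([th.getName() for th in threading.enumerate()])    # list includes 'demo-thread'
print('alive: %s' % t.is_alive())                        # True while it is sleeping
t.join()                                                 # block until the thread finishes
print('alive: %s' % t.is_alive())                        # False once it has ended
```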
First, a look at how a single-threaded version behaves

```python
# coding:utf-8
from time import sleep, ctime

def music():
    for i in range(2):
        print 'I was listening to music. {}'.format(ctime())
        sleep(1)

def movie():
    for i in range(2):
        print 'I was see movie. {}'.format(ctime())
        sleep(2)

if __name__ == '__main__':
    music()
    movie()
    print 'all done {}'.format(ctime())
```

Output

```
[root@localhost day_14]# python simple_thread.py
I was listening to music. Wed Nov  2 06:21:42 2016
I was listening to music. Wed Nov  2 06:21:43 2016
I was see movie. Wed Nov  2 06:21:44 2016
I was see movie. Wed Nov  2 06:21:46 2016
all done Wed Nov  2 06:21:48 2016
```

Summary: while music() runs, movie() is blocked the whole time; the run takes 6 seconds in total.
Multithreading

```python
# coding:utf-8
import threading
from time import sleep, ctime

def music(music):
    for i in range(2):
        print 'I was listening to {}. {}'.format(music, ctime())
        sleep(1)

def movie(movie):
    for i in range(2):
        print 'I was see {}. {}'.format(movie, ctime())
        sleep(2)

threads = []
thread1 = threading.Thread(target=music, args=('爱情买卖',))
threads.append(thread1)
thread2 = threading.Thread(target=movie, args=('阿凡达',))
threads.append(thread2)

if __name__ == '__main__':
    for i in threads:
        i.start()
    i.join()  # join() keeps the main thread blocked until this child thread finishes
    print 'all done {}'.format(ctime())
```

Output

```
[root@localhost day_14]# python multi_thread.py
I was listening to 爱情买卖. Wed Nov  2 06:21:58 2016
I was see 阿凡达. Wed Nov  2 06:21:58 2016
I was listening to 爱情买卖. Wed Nov  2 06:21:59 2016
I was see 阿凡达. Wed Nov  2 06:22:00 2016
all done Wed Nov  2 06:22:02 2016
```

Summary: the two child threads run at the same time, and the total drops to 4 seconds.
Multiprocessing in Python

Python ships with an excellent multiprocessing package: define a function and Python takes care of the rest, which makes the jump from a single process to concurrent execution easy. multiprocessing supports child processes, communication and shared data, and several forms of synchronization, providing components such as Process, Queue, Pipe and Lock.

1. Cross-platform multiprocessing with the multiprocessing module

The multiprocessing module provides a Process class that represents one process. To create a child process, build a Process instance with a target function and its arguments and call start() — simpler than calling fork() yourself.

join() waits for the child process to end before the parent continues. Even without join(), the parent process will not exit before its children do, so they never become orphans.
A single child process

```python
# coding:utf-8
from multiprocessing import Process
import os, time

def run(name):
    time.sleep(5)
    print 'Run child process {} {}'.format(name, os.getpid())

if __name__ == '__main__':
    print 'Parent process {}.'.format(os.getpid())
    p = Process(target=run, args=('child_process',))
    print 'I am parent process {}, child process will start.'.format(os.getpid())
    p.start()
    p.join()  # continue only after the child has ended; even a finished parent waits and exits together with the child
    print 'I am parent process {}, child process end.'.format(os.getpid())
```

Output

```
# with join()
[root@localhost day_14]# python simple_process.py
Parent process 47143.
I am parent process 47143, child process will start.
Run child process child_process 47145
I am parent process 47143, child process end.

# with join() commented out
[root@localhost day_14]# python simple_process.py
Parent process 32359.
I am parent process 32359, child process will start.
I am parent process 32359, child process end.
Run child process child_process 32361
```
Several child processes

```python
# coding:utf-8
from multiprocessing import Process
import os

def run(name, num):
    print '{} Run child process {}{},my parent is {}..'.format(num, name, os.getpid(), os.getppid())

if __name__ == '__main__':
    print 'Parent process is {}.'.format(os.getpid())
    for i in range(3):
        p = Process(target=run, args=('test', i,))
        print 'Process will start {}.'.format(os.getpid())
        p.start()
        p.join()
    print 'Process end {}.'.format(os.getpid())
```

Output

```
# with join(), blocking: ordered but serial
[root@localhost day_14]# python multi_process.py
Parent process is 91474.
Process will start 91474.
0 Run child process test91477,my parent is 91474..
Process will start 91474.
1 Run child process test91478,my parent is 91474..
Process will start 91474.
2 Run child process test91479,my parent is 91474..
Process end 91474.

# without join(), non-blocking: faster, but the ordering is scrambled
[root@localhost day_14]# python multi_process.py
Parent process is 113725.
Process will start 113725.
Process will start 113725.
Process will start 113725.
Process end 113725.
0 Run child process test113726,my parent is 113725..
1 Run child process test113727,my parent is 113725..
2 Run child process test113728,my parent is 113725..
```
2. Process pools: multiprocessing.Pool

When using Python for systems administration, especially to operate on many directories at once or to control many remote hosts in parallel, concurrency saves a great deal of time. With a small number of targets you can spawn Process objects from multiprocessing directly; a dozen or so is manageable, but with hundreds or thousands of targets, throttling the number of processes by hand becomes far too tedious. That is where the process pool, Pool, comes in.

A Pool keeps a fixed number of worker processes at the caller's disposal. When a new task is submitted and the pool is not yet full, a new process is created to run it; once the pool has reached its configured maximum, the task waits until a worker finishes and frees up. A simple example:

```python
#!/usr/bin/python
# coding:utf-8
from multiprocessing import Pool
import os, time, random

def run(name):
    print "Run child process %s (%s),my parent is (%s).." % (name, os.getpid(), os.getppid())
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print 'Task %s runs %0.2f seconds..(%s)' % (name, (end - start), os.getpid())

if __name__ == '__main__':
    print 'Parent process %s' % os.getpid()
    pool = Pool(processes=3)  # create the pool; a bare Pool() defaults to the number of CPU cores
    for n in xrange(4):
        result = pool.apply_async(run, args=(n,))  # hand each task to a pooled worker
    print 'waiting for all subprocess done (%s)' % os.getpid()
    pool.close()  # close() stops further tasks from being submitted to the pool
    pool.join()   # wait until every worker has finished; if the parent exited first, all children would die with it
    if result.successful():  # status of the (last) call; raises AssertionError if the worker has not finished yet
        print 'successful'
    print 'all subprocess end (%s)' % os.getpid()
```

Output

```
[root@localhost day_14]# python process_pool.py
Parent process 114005
waiting for all subprocess done (114005)
Run child process 0 (114007),my parent is (114005)..   # three workers, so the first three tasks run concurrently
Run child process 1 (114008),my parent is (114005)..
Run child process 2 (114006),my parent is (114005)..
Task 0 runs 0.66 seconds..(114007)
Run child process 3 (114007),my parent is (114005)..   # the fourth task is picked up by the first worker to free up
Task 3 runs 0.01 seconds..(114007)
Task 2 runs 1.79 seconds..(114006)
Task 1 runs 2.73 seconds..(114008)
successful
all subprocess end (114005)
```
3. Inter-process communication

Processes of course need to talk to each other, and the operating system provides many IPC mechanisms. Python's multiprocessing module wraps the low-level machinery and offers several ways to exchange data, including Queue and Pipe. Taking Queue as the example, the parent creates two child processes, one writing to the Queue and one reading from it:

```python
# coding:utf-8
from multiprocessing import Queue, Process
import os, time, random

def write(q):
    for value in ['A', 'B', 'C']:
        print 'Put {} to queue'.format(value)
        q.put(value)
        time.sleep(random.random())

def read(q):
    while True:  # must not be `while not q.empty()`: reader and writer speeds are not guaranteed, and the reader would quit on a momentarily empty queue
        value = q.get(True)  # get() blocks while the queue is empty, so this process has to be killed from outside
        print 'Get {} from queue'.format(value)

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()  # writer and reader run at the same time
    pr.start()
    pw.join()
    pr.terminate()  # force-kill the reader, since it loops forever
```

Output

```
[root@localhost day_14]# python process_queue.py
Put A to queue
Get A from queue
Put B to queue
Get B from queue
Put C to queue
Get C from queue
```
Flask-SQLAlchemy

Reference: http://forlinux.blog.51cto.com/8001278/1420961

SQLAlchemy

Reference: https://segmentfault.com/a/1190000006949536#articleHeader5