python saltstack api

# -*- coding:utf-8 -*-
import sys
import json
import pycurl
from io import BytesIO

class PyCurl(object):
def __init__(self, url, **kwargs):
# 传入url地址
self.url = url
# 取出header相关信息
self.header = kwargs.get("header", None)
# 创建一个curl对象
self.curl = pycurl.Curl()
# setopt 来设置一些请求选项
# 指定请求的URL
self.curl.setopt(self.curl.URL, self.url)
# 设置代理浏览器
self.curl.setopt(self.curl.HEADER, False)
# 设置请求方式
self.curl.setopt(self.curl.POST, True)
# 设置https方式
self.curl.setopt(pycurl.SSL_VERIFYPEER, 0)
self.curl.setopt(pycurl.SSL_VERIFYHOST, 0)
# 判断header是否存在
if self.header:
# 设置模拟浏览器
self.curl.setopt(self.curl.HTTPHEADER, self.header)

def request(self, data=None, timeout=None):
# 判断对象类型 是否为 str
if isinstance(data, str):
# 将数据提交
self.curl.setopt(pycurl.POSTFIELDS, data)
header_buf = BytesIO()
body_buf = BytesIO()
# 强制获取新的连接,即替代缓存中的连接
self.curl.setopt(self.curl.FRESH_CONNECT, True)
# 完成交互后强制断开连接,不重用
self.curl.setopt(self.curl.FORBID_REUSE, True)
if str(timeout).isdigit() and timeout > 0:
# 设置timeout超时时间
self.curl.setopt(self.curl.TIMEOUT, timeout)
# 将返回的HTTP HEADER定向到回调函数header_buf
self.curl.setopt(self.curl.HEADERFUNCTION, header_buf.write)
# 将返回的内容定向到回调函数body_buf
self.curl.setopt(self.curl.WRITEFUNCTION, body_buf.write)
try:
# 服务器返回信息
self.curl.perform()
except pycurl.error:
return False
# 状态码
http_code = self.curl.getinfo(self.curl.HTTP_CODE)
# 关闭连接
self.curl.close()
# 返回状态码 header body
return {"http_code": http_code, "header": header_buf.getvalue(), "body": body_buf.getvalue(), "url": self.url}

class SaltApi(object):

def __init__(self, **kwargs):

# 设置超时时间
self.timeout = kwargs.get("timeout", 300)
# 设置头信息
self.header = kwargs.get("header", ["Content-Type:application/json"])
# 获取url
self.__url = "https://192.168.44.20:8888"

# 获取
self.__username = "saltapi"
self.__password = "password"

# token id 获取
def token_id(self):
obj = {‘eauth‘: ‘pam‘, ‘username‘: self.__username, ‘password‘: self.__password}
result = self.post(prefix="/login/", **obj)
if result:
try:
self.__token_id = result[‘return‘][0][‘token‘]
except KeyError:
raise KeyError
print(‘##‘,self.__token_id,‘##‘)
return self.__token_id

def post(self, prefix="/", token=None, **data):

# url拼接
url = self.__url + prefix
print (data)
# 实例化
self.header.append(str(token))
curl = PyCurl(url, header=self.header)
# 发起请求
result = curl.request(data=json.dumps(data), timeout=self.timeout)

# 判断值
if not result:
return result
# 判断状态码是否等于200
if result["http_code"] != 200:
self.response = "response code %s".format(result["info"]["http_code"])
return self.response
result = json.loads(result["body"].decode())

# 判断是否有error
if "error" in result and result["error"]:
self.response = "%s(%s)" % (result["error"]["data"], result["error"]["code"])
return self.response
# 返回正确的数据
return result

def all_key(self):
‘‘‘
获取所有的minion_key
‘‘‘
token = ‘X-Auth-Token:%s‘ % self.token_id()
obj = {‘client‘: ‘wheel‘, ‘fun‘: ‘key.list_all‘}
content = self.post(token=token, **obj)
# 取出认证已经通过的
minions = content[‘return‘][0][‘data‘][‘return‘][‘minions‘]
# print(‘已认证‘,minions)
# 取出未通过认证的
minions_pre = content[‘return‘][0][‘data‘][‘return‘][‘minions_pre‘]
# print(‘未认证‘,minions_pre)
return minions, minions_pre

def accept_key(self, node_name):
‘‘‘
如果你想认证某个主机 那么调用此方法
‘‘‘
token = ‘X-Auth-Token:%s‘ % self.token_id()
obj = {‘client‘: ‘wheel‘, ‘fun‘: ‘key.accept‘, ‘match‘: node_name}
content = self.post(token=token, **obj)
print (content)
ret = content[‘return‘][0][‘data‘][‘success‘]
return ret

# 删除认证方法
def delete_key(self, node_name):
obj = {‘client‘: ‘wheel‘, ‘fun‘: ‘key.delete‘, ‘match‘: node_name}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)

ret = content[‘return‘][0][‘data‘][‘success‘]
return ret

‘‘‘
tgt_type --
The type of tgt. Allowed values:
glob - Bash glob completion - Default
pcre - Perl style regular expression
list - Python list of hosts
grain - Match based on a grain comparison
grain_pcre - Grain comparison with a regex
pillar - Pillar data comparison
pillar_pcre - Pillar data comparison with a regex
nodegroup - Match on nodegroup
range - Use a Range server for matching
compound - Pass a compound match string
ipcidr - Match based on Subnet (CIDR notation) or IPv4 address.

Changed in version 2017.7.0: Renamed from expr_form to tgt_type

可以针对各种类型进行执行响应的命令

sapi = SaltApi()
ret = sapi.host_remote_func(tgt="ba*",fun=‘test.ping‘,tgt_type=‘pcre‘)
print(‘%s\n%s‘%(‘pcre‘,ret))
ret = sapi.host_remote_func(tgt="bamboo,Min-Centos1",fun=‘test.ping‘,tgt_type=‘list‘)
print(‘%s\n%s‘ % (‘list‘, ret))
ret = sapi.host_remote_func(tgt="*",fun=‘test.ping‘)
print(‘%s\n%s‘ % (‘glob‘, ret))
ret = sapi.host_remote_func(tgt="os:CentOS",fun=‘test.ping‘,tgt_type=‘grain‘)
print(‘%s\n%s‘ % (‘grain‘, ret))
ret = sapi.host_remote_func(tgt="[email protected] or [email protected]:Ubuntu",fun=‘test.ping‘,tgt_type=‘compound‘)
print(‘%s\n%s‘ % (‘compound‘, ret))
ret = sapi.host_remote_func(tgt="192.168.44.0/24",fun=‘test.ping‘,tgt_type=‘ipcidr‘)

‘‘‘

# 针对主机远程执行模块
# def host_remote_func(self, tgt, fun,expr_from=‘glob‘):
def host_remote_func(self, tgt, fun, tgt_type=‘glob‘):
‘‘‘ tgt是主机 fun是模块
写上模块名 返回 可以用来调用基本的资产
例如 curl -k https://ip地址:8080/ > -H "Accept: application/x-yaml" > -H "X-Auth-Token:b50e90485615309de0d83132cece2906f6193e43" > -d client=‘local‘ > -d tgt=‘*‘ > -d fun=‘test.ping‘ 要执行的模块
return:
- iZ28r91y66hZ: true
node2.minion: true
‘‘‘
obj = {‘client‘: ‘local‘, ‘tgt‘: tgt, ‘fun‘: fun, ‘tgt_type‘: tgt_type}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
ret = content[‘return‘][0]
return ret

def group_remote_func(self, tgt, fun):
obj = {‘client‘: ‘local‘, ‘tgt‘: tgt, ‘fun‘: fun, ‘expr_form‘: ‘nodegroup‘}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
print (content)
ret = content[‘return‘][0]
return ret

def host_remote_execution_module(self, tgt, fun, arg, tgt_type=‘glob‘):
‘执行fun 传入传入参数arg ‘
obj = {‘client‘: ‘local‘, ‘tgt‘: tgt, ‘fun‘: fun, ‘arg‘: arg, ‘tgt_type‘: tgt_type}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
ret = content[‘return‘][0]
return ret
# print(salt_aa.host_remote_execution_module(‘*‘, ‘cmd.run‘, ‘ifconfig‘))

# 基于分组来执行
def group_remote_execution_module(self, tgt, fun, arg):
‘‘‘
根据分组来执行
tgt =
‘‘‘

obj = {‘client‘: ‘local‘, ‘tgt‘: tgt, ‘fun‘: fun, ‘arg‘: arg, ‘expr_form‘: ‘nodegroup‘}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
jid = content[‘return‘][0]
return jid

def host_sls(self, tgt, arg, tgt_type=‘glob‘):
‘‘‘主机进行sls‘‘‘
obj = {‘client‘: ‘local‘, ‘tgt‘: tgt, ‘fun‘: ‘state.sls‘, ‘arg‘: arg, ‘tgt_type‘: tgt_type}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
return content

def group_sls(self, tgt, arg):
‘‘‘ 分组进行sls ‘‘‘
obj = {‘client‘: ‘local‘, ‘tgt‘: tgt, ‘fun‘: ‘state.sls‘, ‘arg‘: arg, ‘expr_form‘: ‘nodegroup‘}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
jid = content[‘return‘][0][‘jid‘]
return jid

def host_sls_async(self, tgt, arg, tgt_type=‘glob‘):
‘‘‘主机异步sls ‘‘‘
obj = {‘client‘: ‘local_async‘, ‘tgt‘: tgt, ‘fun‘: ‘state.sls‘, ‘arg‘: arg, ‘tgt_type‘: tgt_type}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
jid = content[‘return‘][0][‘jid‘]
return jid

def group_sls_async(self, tgt, arg):
‘‘‘分组异步sls ‘‘‘
obj = {‘client‘: ‘local_async‘, ‘tgt‘: tgt, ‘fun‘: ‘state.sls‘, ‘arg‘: arg, ‘expr_form‘: ‘nodegroup‘}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
jid = content[‘return‘][0][‘jid‘]
return jid

def server_group_pillar(self, tgt, arg, **kwargs):
‘‘‘分组进行sls and pillar‘‘‘
obj = {‘client‘: ‘local‘, ‘tgt‘: tgt, ‘fun‘: ‘state.sls‘, ‘arg‘: arg, ‘expr_form‘: ‘nodegroup‘,
‘kwarg‘: kwargs}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
jid = content[‘return‘][0]
print (jid)

def server_hosts_pillar(self, tgt, arg, **kwargs):
‘‘‘针对主机执行sls and pillar ‘‘‘
obj = {"client": "local", "tgt": tgt, "fun": "state.sls", "arg": arg, "kwarg": kwargs}
token = ‘X-Auth-Token:%s‘ % self.token_id()
content = self.post(token=token, **obj)
jid = content[‘return‘][0]
print content
return jid

def jobs_all_list(self):
‘‘‘打印所有jid缓存‘‘‘
token = ‘X-Auth-Token:%s‘ % self.token_id()
obj = {"client": "runner", "fun": "jobs.list_jobs"}
content = self.post(token=token, **obj)
print (content)

def jobs_jid_status(self, jid):
‘‘‘查看jid运行状态‘‘‘
token = ‘X-Auth-Token:%s‘ % self.token_id()
obj = {"client": "runner", "fun": "jobs.lookup_jid", "jid": jid}
content = self.post(token=token, **obj)
print (content)
return content

# if __name__ == ‘__main__‘:
# sa = SaltApi()
# print (sa.host_remote_func("bamboo", ‘test.ping‘, ‘list‘))

原文地址:http://blog.51cto.com/wuyebamboo/2324460

时间: 2024-07-29 20:35:56

python saltstack api的相关文章

saltstack api部署,使用.

#这几天一直在研究saltstack api,自己也走了很多坑,把部署,使用的过程记录下来分享给大家,欢迎大家一起交流,探讨. 联系QQ:408822635 #系统环境:Centos6.5 X86_64 #python版本:2.6.6 #安装salt服务和依赖: cat /etc/redhat-release        cd /etc/yum.repos.d/ && wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-rele

saltstack api wheel模块报错HTTP/1.1 401 Unauthorized

当使用saltstack api调用wheel模块的时候会出现没有权限的报错 [[email protected] ~]# curl -k -v https://localhost:8000     -H "Accept: application/x-yaml"      -H "X-Auth-Token: 65198e689eb5e720ce75970a4b10da91dc003211"      -d client='wheel'     -d fun='key

Python DB API的异常

我们在昨天预告了一下Python DB API的异常,今天我们来细讲一下: 1.所有异常的超类:StandardError; 2.waring:属于StandardError超类,发生非致命问题所以发的异常: 3.Error:属于StandardError超类,所有错误条件的超类: 4.InterfaceError:属于Error超类,发生与接口(非数据库)相关的错误所引发的异常: 5.DatabaseError:属于Error超类,发生与数据库相关的错误的超类: 6.DataError:属于D

Python Elasticsearch API操作ES集群

环境 Centos 7.4 Python 2.7 Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1 Elasitcsearch6.3.2 知识点 调用Python Elasticsearh API Python Mysqldb使用 DSL查询与聚合 Pyehon 列表操作 代码 #!/usr/bin/env python # -*- coding: utf-8 -*- #minyt 2018.9.1 #获取24小时内出现的模块次数 # 该程序通过elas

Python C API 引用计数器(三)

简介 Python的内存管理是通过对象的引用计数器来实现的,对象的创建会将引用计数器加1,被引用一次则引用计数器就会加1,反之解除引用时,则引用计数器就会减1,当Python对象的引用计数器为0的时候,则这个对象就会被回收和释放. 这种内存管理的方式是有一定的弊端的,一是它需要额外的空间维护引用计数,二是它不能解决对象的"循环引用"的问题,因此,也有很多语言比如Java并没有采用该算法做来垃圾的回收机制. Python代码实例 import sys def test_refcount(

Python DB API 连接数据库

Python DB API Mysql,Oracle,SqlServer 不关闭,会浪费资源. 原文地址:https://www.cnblogs.com/jiqing9006/p/9713716.html

如何用 Python 和 API 收集与分析网络数据?

摘自 https://www.jianshu.com/p/d52020f0c247 本文以一款阿里云市场历史天气查询产品为例,为你逐步介绍如何用 Python 调用 API 收集.分析与可视化数据.希望你举一反三,轻松应对今后的 API 数据收集与分析任务. 市场 我们尝试的,是他们找到的阿里云市场的一款 API 产品,提供天气数据. 它来自于易源数据,链接在 https://market.aliyun.com/products/57096001/cmapi010812.html?spm=517

python编写api接口

 目标: 使用Python实现一个简单的接口服务,可以通过get.post方法请求该接口,拿到响应数据.创建一个api_server.py文件, 想要实现的效果是这样的: 添加代码如下:  1 import flask,json 2 from flask import request 3 4 ''' 5 flask: seb框架,通过flask提供的装饰器@server.route()将普通函数转换为服务 6 登录接口,需要传入url,username,passwd 7 ''' 8 9 #创建一

Saltstack API以及对应的Python模板

来源:<Python自动化运维开发> 测试: import salt.client client = salt.client.LocalClient() ret = client.cmd('*', 'test.ping') print ret (1)Archive模块 1)功能:实现系统层面的压缩包调用,支持gunzip.gzip.rar. tar.unrar.unzip等. 2)示例: #采用gzunzip解压/tmp/sourcefile.txt.gz包 salt '*' archive.