阿里云api调用

# coding: utf-8
"""Manage ECS instances on the Aliyun cloud.

Some helper functions are taken from ``salt.cloud.clouds.aliyun`` instead of
being re-implemented (DRY principle).

Full documentation of the Aliyun APIs is located at
https://help.aliyun.com/document_detail/25484.html.
"""
import base64
import hmac
import json
import logging
import sys
import time
import uuid
from hashlib import sha1

import requests
import salt.utils

# NOTE: the published source hides some project-internal imports at this
# point. The code below references ``utils``, ``utils_errors`` and
# ``utils_params``, which have to be imported from those internal libraries.

try:
    from salt.ext.six.moves.urllib.parse import quote as _quote
except ImportError:
    # Fall back to the standard library so that ``_quote`` is always defined
    # instead of failing much later with a NameError inside the signer.
    try:
        from urllib.parse import quote as _quote  # Python 3
    except ImportError:
        from urllib import quote as _quote  # Python 2

log = logging.getLogger(__name__)

DEFAULT_ALIYUN_API_VERSION = '2014-05-26'

def get_params(integration_info):
    """Extract the Aliyun credentials from the integration settings.

    :param integration_info: a dict, or a JSON string that decodes to a dict,
        carrying the ``access_key_id`` and ``secret_key`` entries.
    :return: the tuple ``(access_key_id, secret_key)``; an entry that is not
        present is returned as ``None``.
    """
    if not isinstance(integration_info, dict):
        integration_info = json.loads(integration_info)

    # Log the key names only: the values contain the API secret, which must
    # never be written to the log files.
    utils.logger.info('{}'.format(list(integration_info)))

    return integration_info.get('access_key_id'), integration_info.get('secret_key')

class HTTPError(Exception):
    """Error raised for a failed Aliyun API call.

    Attributes set from the HTTP response:

    * ``status_code`` -- the HTTP status code.
    * ``content`` -- the raw response body.
    * ``has_data`` -- True when the response declared a JSON body that was
      decoded into ``data``.
    * ``data`` -- the decoded JSON body, or None when the body is not JSON.
    """

    def __init__(self, response):
        # Passing the response on keeps ``args`` identical to what the
        # implicit Exception constructor stored before.
        super(HTTPError, self).__init__(response)
        self.has_data = False
        # Always defined, so handlers can read it without an AttributeError.
        self.data = None

        self.status_code = response.status_code
        utils.logger.debug(self.status_code)
        self.content = response.content
        utils.logger.debug(self.content)

        # Error responses do not always carry a Content-Type header.
        content_type = response.headers.get('Content-Type') or ''
        if 'application/json' in content_type.lower():
            self.has_data = True
            self.data = json.loads(
                self.content, object_hook=salt.utils.decode_dict)

def _compute_signature(parameters, secret_key):
    """Generate the Aliyun request signature.

    The parameters are sorted by name, percent-encoded, joined into the
    canonical query string and signed with HMAC-SHA1 using
    ``secret_key + '&'`` as the key.

    :param parameters: dict of all request parameters (without ``Signature``).
        Values may be text, UTF-8 encoded bytes or numbers.
    :param secret_key: the access key secret, as text or UTF-8 bytes.
    :return: the Base64 encoded signature as a native ``str``.
    """
    try:
        from urllib.parse import quote  # Python 3
    except ImportError:
        from urllib import quote  # Python 2

    text_type = type(u'')

    def to_utf8(value):
        # Byte strings are taken as UTF-8 already (that is what byteify()
        # produces); everything else is converted to text first, so numbers
        # are encoded too instead of breaking the string concatenation.
        if isinstance(value, bytes):
            return value
        if not isinstance(value, text_type):
            value = text_type(value)
        return value.encode('utf-8')

    def percent_encode(value):
        # Only unreserved characters stay literal: space becomes %20,
        # '*' becomes %2A and '~' is left as it is.
        return quote(to_utf8(value), safe='~')

    sorted_parameters = sorted(parameters.items(), key=lambda item: item[0])
    canonicalized_query_string = '&'.join(
        percent_encode(key) + '=' + percent_encode(value)
        for key, value in sorted_parameters
    )

    # All aliyun API only support GET method
    string_to_sign = 'GET&%2F&' + percent_encode(canonicalized_query_string)

    digest = hmac.new(
        to_utf8(secret_key) + b'&', to_utf8(string_to_sign), sha1).digest()
    signature = base64.b64encode(digest)
    if not isinstance(signature, str):
        # Python 3: hand back text so the value can join the other parameters.
        signature = signature.decode('ascii')
    return signature

def query(access_key_id, secret_key, params, jid=None, outputParam=[], **kwargs):
    """Make a web call to the Aliyun ECS REST API.

    :param access_key_id: Aliyun access key ID.
    :param secret_key: Aliyun access key secret used to sign the request.
    :param params: dict with the action name and its parameters; merged over
        the public interface parameters.
    :param jid: not used by this function.
    :param outputParam: not used by this function.
    :param kwargs: an optional ``timeout`` entry (seconds, default 30) is
        passed to ``requests.get``; other entries are ignored.
    :return: the decoded JSON response.
    :raises HTTPError: when the HTTP status is not 200 or the decoded body
        contains a ``Code`` entry.
    """
    path = 'https://ecs.aliyuncs.com/'

    timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    # public interface parameters
    parameters = {
        'Format': 'JSON',
        'Version': DEFAULT_ALIYUN_API_VERSION,
        'AccessKeyId': access_key_id,
        'SignatureVersion': '1.0',
        'SignatureMethod': 'HMAC-SHA1',
        'SignatureNonce': str(uuid.uuid1()),
        'TimeStamp': timestamp,
    }

    # include action or function parameters
    if params:
        parameters.update(params)

    # Calculate the string for Signature
    parameters['Signature'] = _compute_signature(parameters, secret_key)
    utils.logger.debug('request url:{}'.format(path))
    utils.logger.debug('parameters:{}'.format(parameters))

    # Without a timeout an unresponsive endpoint blocks the caller forever.
    request = requests.get(path, params=parameters, verify=True,
                           timeout=kwargs.get('timeout', 30))

    if request.status_code != 200:
        raise HTTPError(request)

    log.debug(request.url)
    utils.logger.debug(request.url)
    utils.logger.debug(request.status_code)

    content = request.text
    utils.logger.debug(content)
    result = json.loads(content, object_hook=salt.utils.decode_dict)
    # A body carrying a 'Code' entry is treated as an API-level failure even
    # though the HTTP status was 200.
    if 'Code' in result:
        raise HTTPError(request)

    return result

def http_error_code_result(http_error):
    """Turn an ``HTTPError`` into the failure dict returned to callers.

    When the error carries decoded data with a ``Code`` entry, that code is
    put into the message. Otherwise the raw content is logged and a generic
    message is returned.

    :param http_error: object exposing ``has_data``, ``data`` and ``content``.
    :return: dict with ``success`` set to False and a ``message``.
    """
    code_available = http_error.has_data and 'Code' in http_error.data
    if not code_available:
        raw_content = '{}'.format(http_error.content)
        log.error(raw_content)
        utils.logger.error('{}'.format(http_error.content))
        return {"success": False, "message": u'unknown error'}

    message = u'error code:{0}'.format(http_error.data['Code'])
    return {"success": False, "message": message}

def byteify(input_str):
    """Recursively convert text inside ``input_str`` to the native ``str``.

    Dicts (keys and values) and lists are walked recursively. On Python 2
    every ``unicode`` object is encoded to a UTF-8 byte string. On Python 3
    ``str`` is already the native text type, so values are returned
    unchanged. Any other object is returned as it is.

    :param input_str: a dict, list, text value or arbitrary object.
    :return: a structure of the same shape holding native strings.
    """
    if isinstance(input_str, dict):
        # items() exists on Python 2 and 3 alike, iteritems() does not.
        return {byteify(key): byteify(value) for key, value in input_str.items()}
    if isinstance(input_str, list):
        return [byteify(element) for element in input_str]
    try:
        text_type = unicode  # noqa: F821 -- only defined on Python 2
    except NameError:
        # Python 3: nothing to encode.
        return input_str
    if isinstance(input_str, text_type):
        return input_str.encode('utf-8')
    return input_str

def create(integration_info=None, vm_conf=None, jid=None, outputParam=[], **kwargs):
    """Create an ECS instance through the ``CreateInstance`` action.

    :param integration_info: credentials accepted by ``get_params``.
    :param vm_conf: dict of ``CreateInstance`` parameters; an empty dict,
        ``None`` or ``''`` yields a failure result.
    :param jid: not used directly here.
    :param outputParam: forwarded to ``utils_params.get_output``.
    :return: result dict with ``success`` and ``message``; on success also
        ``outputParam``.
    """
    # ``params`` has to exist under this name before the validation call:
    # check_inputs() receives the complete local namespace.
    params = {'Action': 'CreateInstance'}
    try:
        valid, bad_param = utils_errors.check_inputs(locals())
        if not valid:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(bad_param)}

        access_key_id, secret_key = get_params(integration_info)
        if vm_conf in ({}, None, ''):
            return {"success": False, "message": u'create ECS instance fail,please check the config params'}
        params.update(vm_conf)

        result = query(byteify(access_key_id), byteify(secret_key), byteify(params))
        utils.logger.info('result:{}'.format(result))

        instanceId = result.get('InstanceId', None)
        out = {'instance_id': instanceId}
        utils.logger.info('{} {}'.format(out, outputParam))
        outs = utils_params.get_output(out, outputParam)
    except HTTPError as e:
        return http_error_code_result(e)

    message = u'create ECS instance success,instance ID:{0}'.format(instanceId)
    return {"success": True, "message": message, 'outputParam': outs}

def edit(integration_info=None, opts=None, access_key_id=None, secret_key=None, instance_id=None,
         instanceName=None,
         Description=None, Password=None, HostName=None, jid=None, outputParam=[], **kwargs):
    """Modify attributes of an ECS instance (``ModifyInstanceAttribute``).

    Only attributes with a truthy value are sent to the API.

    :param integration_info: credentials accepted by ``get_params``; the
        ``access_key_id`` / ``secret_key`` arguments are overwritten by it.
    :param instance_id: ID of the instance to modify.
    :param instanceName: new instance name, optional.
    :param Description: new description, optional.
    :param Password: new password, optional.
    :param HostName: new host name, optional.
    :param outputParam: forwarded to ``utils_params.get_output``.
    :return: result dict with ``success`` and ``message``; on success also
        ``outputParam``.
    """
    access_key_id, secret_key = get_params(integration_info)
    params = {
        'Action': 'ModifyInstanceAttribute',
        'InstanceId': instance_id,
        'InstanceName': instanceName,
        'Description': Description,
        'Password': Password,
        'HostName': HostName,
    }

    # Iterate over a snapshot: deleting from a dict while iterating its live
    # view raises RuntimeError on Python 3. The loop variable names are kept
    # because check_inputs() below receives the whole local namespace.
    for k, v in list(params.items()):
        if not v:
            del params[k]

    try:
        ret, param = utils_errors.check_inputs(locals())
        if not ret:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(param)}

        params = byteify(params)
        access_key_id = byteify(access_key_id)
        secret_key = byteify(secret_key)
        result = query(access_key_id, secret_key, params)
        utils.logger.info('{}'.format(result))
        # Report the instance that was edited. The previous code read
        # 'RequestId' from the response and presented it as the instance ID.
        out = {'instance_id': instance_id}
        outs = utils_params.get_output(out, outputParam)
        utils.logger.info('{}'.format(outs))
    except HTTPError as e:
        return http_error_code_result(e)

    return {'success': True, 'message': u'edit ECS instance success,instance ID:{0}'.format(
        instance_id), 'outputParam': outs}

def _query_status(integration_info, region_id=None, instance_id=None, **kwargs):
    """Look up the status of one instance via ``DescribeInstanceStatus``.

    :param integration_info: credentials accepted by ``get_params``.
    :param region_id: when given, only this region is searched; otherwise
        every region returned by ``_query_region`` is searched in turn.
    :param instance_id: ID of the instance to look for.
    :return: always a 2-tuple. ``(True, status)`` when the instance was found
        with a non-empty status, ``(False, message)`` when it was not found
        or the API call failed.
    """
    access_key_id, secret_key = get_params(integration_info)
    params = {
        'Action': 'DescribeInstanceStatus',
    }
    try:
        params = byteify(params)
        access_key_id = byteify(access_key_id)
        secret_key = byteify(secret_key)
        if region_id:
            regions = [byteify(region_id)]
        else:
            regions = _query_region(integration_info)

        # NOTE(review): only the first response page of each region is
        # inspected; the Aliyun documentation describes this action as
        # paginated -- confirm whether later pages must be fetched as well.
        for region in regions:
            params['RegionId'] = region
            result = query(access_key_id, secret_key, params)
            for entry in result['InstanceStatuses']['InstanceStatus']:
                if entry['InstanceId'] == instance_id:
                    if entry['Status']:  # Running|Stopped
                        return True, entry['Status']
                    break
    except HTTPError as e:
        # Keep the tuple shape: callers unpack the result into two names.
        return False, http_error_code_result(e)['message']
    return False, 'the instance not exists'

def start(integration_info=None, opts=None, access_key_id=None, secret_key=None, instance_id=None,
          jid=None,
          outputParam=[], **kwargs):
    """Start an ECS instance through the ``StartInstance`` action.

    :param integration_info: credentials accepted by ``get_params``; the
        ``access_key_id`` / ``secret_key`` arguments are overwritten by it.
    :param instance_id: ID of the instance to start.
    :return: result dict with ``success`` and ``message``.
    """
    # The credentials and ``params`` are bound before validation on purpose:
    # check_inputs() inspects the complete local namespace.
    access_key_id, secret_key = get_params(integration_info)
    params = {'Action': 'StartInstance', 'InstanceId': instance_id}

    try:
        valid, bad_param = utils_errors.check_inputs(locals())
        if not valid:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(bad_param)}

        query(byteify(access_key_id), byteify(secret_key), byteify(params))
    except HTTPError as e:
        return http_error_code_result(e)

    return {'success': True, 'message': u'instance start success'}

def _query_zone(integration_info=None, region_id=None):
    """Return the IDs of the zones of a region (``DescribeZones``).

    :param integration_info: credentials accepted by ``get_params``.
    :param region_id: ID of the region whose zones are listed.
    :return: list of zone IDs, mirroring what ``_query_region`` does for
        regions.
    :raises HTTPError: propagated from ``query``.
    """
    params = {
        'Action': 'DescribeZones',
        'RegionId': region_id,
    }
    access_key_id, secret_key = get_params(integration_info)
    params = byteify(params)
    access_key_id = byteify(access_key_id)
    secret_key = byteify(secret_key)
    result = query(access_key_id, secret_key, params)
    # The zone entries are nested under Zones -> Zone; reading 'ZoneId' from
    # the top level of the response always produced None.
    return [zone['ZoneId'] for zone in result['Zones']['Zone']]

def _query_region(integration_info=None):
    """Return the IDs of all regions reported by ``DescribeRegions``.

    :param integration_info: credentials accepted by ``get_params``.
    :return: list with the ``RegionId`` of every region in the response.
    :raises HTTPError: propagated from ``query``.
    """
    access_key_id, secret_key = get_params(integration_info)
    request_params = byteify({'Action': 'DescribeRegions'})
    response = query(byteify(access_key_id), byteify(secret_key), request_params)

    region_ids = []
    for region in response['Regions']['Region']:
        region_ids.append(region['RegionId'])
    return region_ids

def _join_field(value):
    """Render a response field as text: sequences are comma-joined, numbers stringified."""
    if isinstance(value, (list, tuple)):
        return ','.join(_join_field(item) for item in value)
    if isinstance(value, (int, float)):
        return str(value)
    return value


def query_ecs(integration_info=None, opts=None, region_id=None, instance_id='Instances', jid=None,
              outputParam=[], **kwargs):
    """Collect details of one ECS instance through ``DescribeInstances``.

    :param integration_info: credentials accepted by ``get_params``.
    :param region_id: region to query.
    :param instance_id: ID of the instance whose details are extracted.
    :param outputParam: forwarded to ``utils_params.get_output``.
    :return: result dict with ``success`` and ``message``; on success also
        ``outputParam``. The collected details stay empty when no instance
        in the response matches ``instance_id``.
    """
    params = {
        'Action': 'DescribeInstances',
        'RegionId': region_id,
    }

    try:
        ret, param = utils_errors.check_inputs(locals())
        if not ret:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(param)}

        access_key_id, secret_key = get_params(integration_info)
        params = byteify(params)
        access_key_id = byteify(access_key_id)
        secret_key = byteify(secret_key)
        # NOTE(review): only the first response page is inspected; the Aliyun
        # documentation describes this action as paginated -- confirm whether
        # later pages must be fetched as well.
        result = query(access_key_id, secret_key, params)['Instances']['Instance']

        # Output name -> response field for the plain (non-list) attributes.
        # They are copied as they are; joining them with ',' put a comma
        # between every character and failed on the integer fields.
        scalar_fields = (
            ('InstanceName', 'InstanceName'),
            ('Memory', 'Memory'),
            ('CPU', 'Cpu'),
            ('HostName', 'HostName'),
            ('Status', 'Status'),
            ('CreationTime', 'CreationTime'),
            ('ExpiredTime', 'ExpiredTime'),
            ('InstanceNetworkType', 'InstanceNetworkType'),
        )

        out = {}
        for instance in result:
            if instance['InstanceId'] != instance_id:
                continue
            out['PrivateIpAddresses'] = _join_field(
                instance['VpcAttributes']['PrivateIpAddress']['IpAddress'])
            out['InnerIpAddresses'] = _join_field(instance['InnerIpAddress']['IpAddress'])
            out['PublicIpAddresses'] = _join_field(instance['PublicIpAddress']['IpAddress'])
            for out_name, field in scalar_fields:
                out[out_name] = _join_field(instance[field])
        outs = utils_params.get_output(out, outputParam)
    except HTTPError as e:
        return http_error_code_result(e)

    return {'success': True, 'message': 'query success', 'outputParam': outs}

def query_zone(integration_info=None, region_id=None, **kwargs):
    """Query the zones of a region (``DescribeZones``) and report them.

    :param integration_info: credentials accepted by ``get_params``.
    :param region_id: ID of the region to query.
    :return: result dict with ``success`` and ``message``; on success the
        message embeds the response formatted by
        ``utils_params.format_json``.
    """
    # ``params`` is defined ahead of validation because check_inputs()
    # receives the complete local namespace.
    params = {'Action': 'DescribeZones', 'RegionId': region_id}
    try:
        valid, bad_param = utils_errors.check_inputs(locals())
        if not valid:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(bad_param)}

        access_key_id, secret_key = get_params(integration_info)
        response = query(byteify(access_key_id), byteify(secret_key), byteify(params))
    except HTTPError as e:
        return http_error_code_result(e)

    message = 'query result:\n{}'.format(utils_params.format_json(response))
    return {'success': True, 'message': message}

def reboot(integration_info=None, opts=None, access_key_id=None, secret_key=None, instance_id=None,
           jid=None, outputParam=[], **kwargs):
    """Reboot an ECS instance through the ``RebootInstance`` action.

    :param integration_info: credentials accepted by ``get_params``; they
        replace the ``access_key_id`` / ``secret_key`` arguments.
    :param instance_id: ID of the instance to reboot.
    :return: result dict with ``success`` and ``message``.
    """
    # ``params`` is defined ahead of validation because check_inputs()
    # receives the complete local namespace.
    params = {'Action': 'RebootInstance', 'InstanceId': instance_id}

    try:
        valid, bad_param = utils_errors.check_inputs(locals())
        if not valid:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(bad_param)}

        access_key_id, secret_key = get_params(integration_info)
        query(byteify(access_key_id), byteify(secret_key), byteify(params))
    except HTTPError as e:
        return http_error_code_result(e)

    return {'success': True, 'message': u'instance restart success'}

def stop(integration_info=None, instance_id=None, **kwargs):
    """Stop an ECS instance (``StopInstance``) and wait until it is stopped.

    After the stop request the instance status is polled through
    ``_query_status`` until it reads ``Stopped`` or 10000 seconds have
    elapsed.

    :param integration_info: credentials accepted by ``get_params``.
    :param instance_id: ID of the instance to stop.
    :return: result dict with ``success`` and ``message``.
    """
    params = {
        'Action': 'StopInstance',
        'InstanceId': instance_id,
    }

    try:
        ret, param = utils_errors.check_inputs(locals())
        if not ret:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(param)}

        access_key_id, secret_key = get_params(integration_info)
        params = byteify(params)
        access_key_id = byteify(access_key_id)
        secret_key = byteify(secret_key)
        query(access_key_id, secret_key, params)

        poll_interval = 5  # seconds between two status checks
        start_time = time.time()
        while True:
            outcome = _query_status(integration_info, instance_id=instance_id)
            if isinstance(outcome, dict):
                # A failed status query may come back as an error dict
                # rather than a tuple; treat it as "status unknown" and keep
                # polling instead of unpacking the dict keys.
                status, res = False, outcome.get('message')
            else:
                status, res = outcome
            if status and res == 'Stopped':
                return {'success': True, 'message': u'instance stop success'}
            if time.time() - start_time >= 10000:
                return {'success': False, 'message': u'time is out'}
            # Pause between polls so the API is not queried in a tight loop.
            time.sleep(poll_interval)
    except HTTPError as e:
        return http_error_code_result(e)

def delete(integration_info=None, opts=None, instance_id=None, access_key_id=None, secret_key=None,
           jid=None, outputParam=[], **kwargs):
    """Delete an ECS instance through the ``DeleteInstance`` action.

    :param integration_info: credentials accepted by ``get_params``; they
        replace the ``access_key_id`` / ``secret_key`` arguments.
    :param instance_id: ID of the instance to delete.
    :return: result dict with ``success`` and ``message``.
    """
    # ``params`` is defined ahead of validation because check_inputs()
    # receives the complete local namespace.
    params = {'Action': 'DeleteInstance', 'InstanceId': instance_id}

    try:
        valid, bad_param = utils_errors.check_inputs(locals())
        if not valid:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(bad_param)}

        access_key_id, secret_key = get_params(integration_info)
        query(byteify(access_key_id), byteify(secret_key), byteify(params))
    except HTTPError as e:
        return http_error_code_result(e)

    return {'success': True, 'message': u'instance delete success'}

def create_image(integration_info=None, opts=None, image=None, jid=None, access_key_id=None,
                 secret_key=None,
                 outputParam=[], **kwargs):
    """Create a custom image through the ``CreateImage`` action.

    :param integration_info: credentials accepted by ``get_params``; they
        replace the ``access_key_id`` / ``secret_key`` arguments.
    :param image: dict of ``CreateImage`` parameters; an empty dict, ``None``
        or ``''`` yields a failure result.
    :return: result dict with ``success`` and ``message``; on success the
        message carries the new image ID.
    """
    params = {
        'Action': 'CreateImage',
    }

    try:
        ret, param = utils_errors.check_inputs(locals())
        if not ret:
            return {'success': False, 'message': 'input params error,please check input params:{}'.format(param)}

        # Same guard as create(): without it a missing ``image`` makes
        # params.update() raise TypeError instead of reporting a failure.
        if image in ({}, None, ''):
            return {'success': False, 'message': u'create image fail,please check the config params'}

        access_key_id, secret_key = get_params(integration_info)
        params.update(image)
        params = byteify(params)
        access_key_id = byteify(access_key_id)
        secret_key = byteify(secret_key)
        result = query(access_key_id, secret_key, params)
        utils.logger.debug('create_image')
    except HTTPError as e:
        return http_error_code_result(e)

    return {'success': True, 'message': u'create image success,image ID:{0}'.format(result['ImageId'])}

原文地址:https://www.cnblogs.com/slqt/p/10947959.html

时间: 2024-08-03 18:58:40

阿里云api调用的相关文章

阿里云api调用做简单的cmdb

阿里云 API 调用做简单的 CMDB 1 步骤 事实上就是调用阿里云 API,获取可用区(比如 cn-hangzhou 等),然后在每一个区调用 API 取 ECS 的状态信息,最好写到一个 Excel 里面去,方便排序排版。 2 示意图 3 源代码 https://github.com/gqdw/cmdb/tree/master

Python 调用阿里云 API 收集 ECS 数据

#!/usr/bin/env python # coding: utf-8 # author: Wang XiaoQiang ''' 功能介绍: 1.调用阿里云API,收集所有区域 ECS 信息 2.将需要的数据整理.生成 Excel 文档 3.关于阿里 sdk 的安装,api 的调用请参考阿里云官网 4.xlsxwriter 请参考这里:http://xlsxwriter.readthedocs.org/ ''' import json, sys try: from termcolor imp

浅析阿里云API网关的产品架构和常见应用场景

自上世纪60年代计算机网络发展开始,API(Application Programming Interface )随之诞生,API即应用程序接口,是实现系统间衔接的桥梁.时至今日,API市场已经形成了一个庞大的生态体系,在拥抱API经济的过程当中,API网关这一个组件起到了至关重要的作用. 什么是API网关 API 网关提供完整的 API 托管服务,辅助用户将能力.服务.数据以 API 的形式开放给合作伙伴,也可以发布到 API 市场供更多的开发者采购使用. 1.提供防攻击.防重放.请求加密.身

调用阿里云api获取阿里云数据同步服务(DTS)并且作图发送邮件的整个流程

前言 在https://rorschachchan.github.io/2018/02/24/阿里云获取DTS服务延迟的脚本/ 文章里已经写过,领导现在要求"每天查看阿里云dts同步的延迟情况和同步速率情况",并且在https://rorschachchan.github.io/2018/02/27/使用matplotlib画图的一个脚本/ 里面也放了一个使用python matplotlib画图的demo,这篇文章的目的就是把整个过程实现,并且把dts图形以每日邮件的形式发送给领导的

01.阿里云SDK调用,获取ECS主机详细信息

一:通过 Python SDK 获取云主机的详细信息 1.创建 AccessKey(不做展示) 2.通过 pip 安装 SDK 模块,这个阿里云帮助里面有,也不做详细展示。 3.详细使用方法看代码 我下面展示的返回的是 JSON 格式,默认为 XML 格式。 DescribeInstancesRequest 为获取 ECS 详细信息的函数,其他函数可以参考阿里云官方支持,我后续也会更新。 #! -*- coding:utf-8 -*- import json from aliyunsdkcore import client

python封装 阿里云api

最近用到阿里云的API和SDK ,用python封装了一下基础类,我只是用查询一下接口的信息,别的没有测试过,可以看看 内容如下: #-*- coding: utf-8 -*-# author: sunwei import jsonimport uuidfrom urllib import requestimport hmac,base64from hashlib import sha1from datetime import datetime, timedelta, timezone,date

python3.6 通过调用 阿里云 API (非SDK方式) 查询 可用区 例子

#!/usr/bin/env python3.5 # -*- coding:utf8 -*- try: import httplib except ImportError: import http.client as httplib import sys, datetime import urllib import urllib.request import urllib.error import urllib.parse import time import json import base6

阿里云API网关(17)签名算法

文档说明:https://help.aliyun.com/document_detail/54229.html?spm=5176.doc27781.6.753.wWRWnm&parentId=29462 代码下载:https://github.com/aliyun/aliyun-openapi-java-sdk/tree/master/aliyun-java-sdk-core aliyun-openapi-java-sdk/aliyun-java-sdk-core/src/main/java/c

基于阿里云 DNS API 实现的 DDNS 工具

0.简要介绍 0.1 思路说明 AliDDNSNet 是基于 .NET Core 开发的动态 DNS 解析工具,借助于阿里云的 DNS API 来实现域名与动态 IP 的绑定功能.工具核心就是调用了阿里云 DNS 的两个 API ,一个 API 获取指定域名的所有解析记录,然后通过比对与当前公网 IP 是否一致,一致则不进行更改,不一致则通过另外一个修改 API 来修改指定子域名的修改记录. 0.2 使用说明 使用时请更改同目录下的 settings.json.example 为 setting