python编写api调用ceph对象网关

#_*_coding:utf-8_*_
import boto
import boto.s3.connection
#pip install filechunkio
from filechunkio import  FileChunkIO
import math
import  threading
import os
import Queue

class Chunk(object):
    num = 0
    offset = 0
    len = 0
    def __init__(self,n,o,l):
        self.num=n
        self.offset=o
        self.length=l

class CONNECTION(object):
    def __init__(self,access_key,secret_key,ip,port,is_secure=False,chrunksize=4<<20):
        self.conn=boto.connect_s3(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        host=ip,port=port,
        is_secure=is_secure,
        calling_format=boto.s3.connection.OrdinaryCallingFormat()
        )
        self.chrunksize=chrunksize

    #查询
    def list_all(self):
        all_buckets=self.conn.get_all_buckets()
        for bucket in all_buckets:
            print u‘容器名: %s‘ %(bucket.name)
            for key in bucket.list():
                print ‘ ‘*5,"%-20s%-20s%-20s%-40s%-20s" %(key.mode,key.owner.id,key.size,key.last_modified.split(‘.‘)[0],key.name)

    def list_single(self,bucket_name):
        try:
            single_bucket = self.conn.get_bucket(bucket_name)
        except Exception as e:
            print ‘bucket %s is not exist‘ %bucket_name
            return
        print u‘容器名: %s‘ % (single_bucket.name)
        for key in single_bucket.list():
            print ‘ ‘ * 5, "%-20s%-20s%-20s%-40s%-20s" % (key.mode, key.owner.id, key.size, key.last_modified.split(‘.‘)[0], key.name)

    #普通小文件下载:文件大小<=4M
    def dowload_file(self,filepath,key_name,bucket_name):
        all_bucket_name_list = [i.name for i in self.conn.get_all_buckets()]
        if bucket_name not in all_bucket_name_list:
            print ‘Bucket %s is not exist,please try again‘ % (bucket_name)
            return
        else:
            bucket = self.conn.get_bucket(bucket_name)

        all_key_name_list = [i.name for i in bucket.get_all_keys()]
        if key_name not in all_key_name_list:
            print ‘File %s is not exist,please try again‘ % (key_name)
            return
        else:
            key = bucket.get_key(key_name)

        if not os.path.exists(os.path.dirname(filepath)):
            print ‘Filepath %s is not exists, sure to create and try again‘ % (filepath)
            return

        if os.path.exists(filepath):
            while True:
                d_tag = raw_input(‘File %s already exists, sure you want to cover (Y/N)?‘ % (key_name)).strip()
                if d_tag not in [‘Y‘, ‘N‘] or len(d_tag) == 0:
                    continue
                elif d_tag == ‘Y‘:
                    os.remove(filepath)
                    break
                elif d_tag == ‘N‘:
                    return
        os.mknod(filepath)
        try:
            key.get_contents_to_filename(filepath)
        except Exception:
            pass

    # 普通小文件上传:文件大小<=4M
    def upload_file(self,filepath,key_name,bucket_name):
        try:
            bucket = self.conn.get_bucket(bucket_name)
        except Exception as e:
            print ‘bucket %s is not exist‘ % bucket_name
            tag = raw_input(‘Do you want to create the bucket %s: (Y/N)?‘ % bucket_name).strip()
            while tag not in [‘Y‘, ‘N‘]:
                tag = raw_input(‘Please input (Y/N)‘).strip()
            if tag == ‘N‘:
                return
            elif tag == ‘Y‘:
                self.conn.create_bucket(bucket_name)
                bucket = self.conn.get_bucket(bucket_name)
        all_key_name_list = [i.name for i in bucket.get_all_keys()]
        if key_name in all_key_name_list:
            while True:
                f_tag = raw_input(u‘File already exists, sure you want to cover (Y/N)?: ‘).strip()
                if f_tag not in [‘Y‘, ‘N‘] or len(f_tag) == 0:
                    continue
                elif f_tag == ‘Y‘:
                    break
                elif f_tag == ‘N‘:
                    return
        key=bucket.new_key(key_name)
        if not os.path.exists(filepath):
            print ‘File %s does not exist, please make sure you want to upload file path and try again‘ %(key_name)
            return
        try:
            f=file(filepath,‘rb‘)
            data=f.read()
            key.set_contents_from_string(data)
        except Exception:
            pass

    def delete_file(self,key_name,bucket_name):
        all_bucket_name_list = [i.name for i in self.conn.get_all_buckets()]
        if bucket_name not in all_bucket_name_list:
            print ‘Bucket %s is not exist,please try again‘ % (bucket_name)
            return
        else:
            bucket = self.conn.get_bucket(bucket_name)

        all_key_name_list = [i.name for i in bucket.get_all_keys()]
        if key_name not in all_key_name_list:
            print ‘File %s is not exist,please try again‘ % (key_name)
            return
        else:
            key = bucket.get_key(key_name)

        try:
            bucket.delete_key(key.name)
        except Exception:
            pass

    def delete_bucket(self,bucket_name):
        all_bucket_name_list = [i.name for i in self.conn.get_all_buckets()]
        if bucket_name not in all_bucket_name_list:
            print ‘Bucket %s is not exist,please try again‘ % (bucket_name)
            return
        else:
            bucket = self.conn.get_bucket(bucket_name)

        try:
            self.conn.delete_bucket(bucket.name)
        except Exception:
            pass

    #队列生成
    def init_queue(self,filesize,chunksize):   #8<<20 :8*2**20
        chunkcnt=int(math.ceil(filesize*1.0/chunksize))
        q=Queue.Queue(maxsize=chunkcnt)
        for i in range(0,chunkcnt):
            offset=chunksize*i
            length=min(chunksize,filesize-offset)
            c=Chunk(i+1,offset,length)
            q.put(c)
        return q

    #分片上传object
    def upload_trunk(self,filepath,mp,q,id):
        while not q.empty():
            chunk=q.get()
            fp=FileChunkIO(filepath,‘r‘,offset=chunk.offset,bytes=chunk.length)
            mp.upload_part_from_file(fp,part_num=chunk.num)
            fp.close()
            q.task_done()

    #文件大小获取---->S3分片上传对象生成----->初始队列生成(--------------->文件切,生成切分对象)
    def upload_file_multipart(self,filepath,key_name,bucket_name,threadcnt=8):
        filesize=os.stat(filepath).st_size
        try:
            bucket=self.conn.get_bucket(bucket_name)
        except Exception as e:
            print ‘bucket %s is not exist‘ % bucket_name
            tag=raw_input(‘Do you want to create the bucket %s: (Y/N)?‘ %bucket_name).strip()
            while tag not in [‘Y‘,‘N‘]:
                tag=raw_input(‘Please input (Y/N)‘).strip()
            if tag == ‘N‘:
                return
            elif tag == ‘Y‘:
                self.conn.create_bucket(bucket_name)
                bucket = self.conn.get_bucket(bucket_name)
        all_key_name_list=[i.name for i in bucket.get_all_keys()]
        if key_name  in all_key_name_list:
            while True:
                f_tag=raw_input(u‘File already exists, sure you want to cover (Y/N)?: ‘).strip()
                if f_tag not in [‘Y‘,‘N‘] or len(f_tag) == 0:
                    continue
                elif f_tag == ‘Y‘:
                    break
                elif f_tag == ‘N‘:
                    return

        mp=bucket.initiate_multipart_upload(key_name)
        q=self.init_queue(filesize,self.chrunksize)
        for i in range(0,threadcnt):
            t=threading.Thread(target=self.upload_trunk,args=(filepath,mp,q,i))
            t.setDaemon(True)
            t.start()
        q.join()
        mp.complete_upload()

    #文件分片下载
    def download_chrunk(self,filepath,key_name,bucket_name,q,id):
        while not q.empty():
            chrunk=q.get()
            offset=chrunk.offset
            length=chrunk.length
            bucket=self.conn.get_bucket(bucket_name)
            resp=bucket.connection.make_request(‘GET‘,bucket_name,key_name,headers={‘Range‘:"bytes=%d-%d" %(offset,offset+length)})
            data=resp.read(length)
            fp=FileChunkIO(filepath,‘r+‘,offset=chrunk.offset,bytes=chrunk.length)
            fp.write(data)
            fp.close()
            q.task_done()

    def download_file_multipart(self,filepath,key_name,bucket_name,threadcnt=8):
        all_bucket_name_list=[i.name for i in self.conn.get_all_buckets()]
        if bucket_name not in all_bucket_name_list:
            print ‘Bucket %s is not exist,please try again‘ %(bucket_name)
            return
        else:
            bucket=self.conn.get_bucket(bucket_name)

        all_key_name_list = [i.name for i in bucket.get_all_keys()]
        if key_name not in all_key_name_list:
            print ‘File %s is not exist,please try again‘ %(key_name)
            return
        else:
            key=bucket.get_key(key_name)

        if not os.path.exists(os.path.dirname(filepath)):
            print ‘Filepath %s is not exists, sure to create and try again‘ % (filepath)
            return

        if os.path.exists(filepath):
            while True:
                d_tag = raw_input(‘File %s already exists, sure you want to cover (Y/N)?‘ % (key_name)).strip()
                if d_tag not in [‘Y‘, ‘N‘] or len(d_tag) == 0:
                    continue
                elif d_tag == ‘Y‘:
                    os.remove(filepath)
                    break
                elif d_tag == ‘N‘:
                    return
        os.mknod(filepath)
        filesize=key.size
        q=self.init_queue(filesize,self.chrunksize)
        for i in range(0,threadcnt):
            t=threading.Thread(target=self.download_chrunk,args=(filepath,key_name,bucket_name,q,i))
            t.setDaemon(True)
            t.start()
        q.join()

if __name__ == ‘__main__‘:
    #约定:
    #1:filepath指本地文件的路径(上传路径or下载路径),指的是绝对路径
    #2:bucket_name相当于文件在对象存储中的目录名或者索引名
    #3:key_name相当于文件在对象存储中对应的文件名或文件索引

    access_key = "65IY4EC1BSFYNH6SHWGW"
    secret_key = "viNfIftLHhrPt2MYK44DkWGvxZb82aYqLrCzGYLx"
    ip=‘172.16.201.36‘
    port=8080
    conn=CONNECTION(access_key,secret_key,ip,port)
    #查看所有bucket以及其包含的文件
    #conn.list_all()

    #简单上传,用于文件大小<=4M
    # conn.upload_file(‘/etc/passwd‘,‘passwd‘,‘test_bucket01‘)
    #查看单一bucket下所包含的文件信息
    # conn.list_single(‘test_bucket01‘)

    #简单下载,用于文件大小<=4M
    # conn.dowload_file(‘/lhf_test/test01‘,‘passwd‘,‘test_bucket01‘)
    # conn.list_single(‘test_bucket01‘)

    #删除文件
    # conn.delete_file(‘passwd‘,‘test_bucket01‘)
    # conn.list_single(‘test_bucket01‘)
    #
    #删除bucket
    # conn.delete_bucket(‘test_bucket01‘)
    # conn.list_all()

    #切片上传(多线程),用于文件大小>4M,4M可修改
    # conn.upload_file_multipart(‘/etc/passwd‘,‘passwd_multi_upload‘,‘test_bucket01‘)
    # conn.list_single(‘test_bucket01‘)

    # 切片下载(多线程),用于文件大小>4M,4M可修改
    # conn.download_file_multipart(‘/lhf_test/passwd_multi_dowload‘,‘passwd_multi_upload‘,‘test_bucket01‘)
时间: 2024-10-08 15:37:32

python编写api调用ceph对象网关的相关文章

python编写api接口

 目标: 使用Python实现一个简单的接口服务,可以通过get.post方法请求该接口,拿到响应数据.创建一个api_server.py文件, 想要实现的效果是这样的: 添加代码如下:  1 import flask,json 2 from flask import request 3 4 ''' 5 flask: seb框架,通过flask提供的装饰器@server.route()将普通函数转换为服务 6 登录接口,需要传入url,username,passwd 7 ''' 8 9 #创建一

Ceph 对象存储及客户端配置(三)

一.对象存储介绍 作为文件系统的磁盘,操作系统不能直接访问对象存储.相反,它只能通过应用程序级别的API访问.Ceph是一种分布式对象存储系统,通过Ceph对象网关提供对象存储接口,也称为RADOS网关(RGW)接口,它构建在Ceph RADOS层之上. RGW使用librgw (RADOS Gateway Library)和librados,允许应用程序与Ceph对象存储建立连接. RGW为应用程序提供了一个RESTful S3 / swift兼容的API接口,用于在Ceph集群中以对象的形式

c++与python的互相调用

[编者按]最近一直发Python的资料,是因为Python的脚本扩展性.现在.net的动态语言特性已经很强大了,似乎脚本的作用并不明显.但是对于老式的C++,如果能够结合脚本语言的动态性,引用最近流行的一句话:必可以形成犄角之势!C++调用Python可以通过API,也可以通过Boost库实现,Boost.Python就是对API的包装,方便调用而已. 这是一篇比较老的介绍Boost.Python的文章,权当入门吧.因为文中一些东西现在不对…… Boost.Python是 Boost 中的一个组

Ceph对象存储介绍与安装

一:概念介绍 Ceph对象网关是建立在librados之上的对象存储接口,可为应用程序提供通往Ceph存储集群的RESTful网关, Ceph对象存储支持两个接口 1.S3兼容:为对象存储功能提供与Amazon S3 RESTful API的大部分子集兼容的接口2.Swift兼容:为对象存储功能提供与OpenStack Swift Ceph对象存储使用Ceph对象网关守护进程(radosgw),该守护进程是用于与Ceph存储群集进行交互的HTTP服务器,由于它提供与OpenStack Swift

基于redhat7.3 ceph对象存储集群搭建+owncloud S3接口整合生产实践

一.环境准备 安装redhat7.3虚拟机四台 在四台装好的虚拟机上分别加一块100G的硬盘.如图所示: 3.在每个节点上配置主机名 4.集群配置信息如下 admin-node node1 node2 node3 192.168.42.110 192.168.42.111 192.168.42.112 192.168.42.113 deploy.osd*1 mon*1.osd*1. rgw*1.mds*1 mon*1.osd*1 mon*1.osd*1 5.各节点配置yum源 #需要在每个主机上

使用C语言为python编写动态模块(1)--从底层深度解析python中的对象以及变量

楔子 我们知道可以通过使用C语言编写动态链接库的方式来给python加速,但是方式是通过ctypes来加载,通过类CDLL将动态链接库加载进来得到一个对象之后,通过这个对象来调用动态链接库里面的函数.那么问题来了,我们可不可以使用C语言为python编写模块呢?然后在使用的时候不使用ctypes加载动态库的方式,而是通过python的关键字import进行加载. 答案是可以的,我们知道可以通过编写py文件的方式来得到一个模块,那么也可以使用C语言来编写C源文件,然后再通过python解释器进行编

使用C语言为python编写动态模块(2)--解析python中的对象如何在C语言中传递并返回

楔子 编写扩展模块,需要有python源码层面的知识,我们之前介绍了python中的对象.但是对于编写扩展模块来讲还远远不够,因为里面还需要有python中模块的知识,比如:如何创建一个模块.如何初始化python环境等等.因此我们还需要了解一些前奏的知识,如果你的python基础比较好的话,那么我相信你一定能看懂,当然我们一开始只是介绍一个大概,至于细节方面我们会在真正编写扩展模块的时候会说. 关于使用C为python编写扩展模块,我前面还有一篇博客,强烈建议先去看那篇博客,对你了解Pytho

使用Tcl脚本调用STC平台的Native API实现测试对象、Device、StreamBlock的创建和配置

环境配置如下: 在Windows的Conmand下操作 # # Configuration file(CommonConFig.tcl) # # Initializing the Script. # This script is used to set common configuration paramters used with the STC-300 class. #Step 1: Load Spirent TestCenter Automation API. # Type in the

ceph对象存储(rgw)服务、高可用安装配置

ceph对象存储服务.高可用安装配置 简介:    Ceph本质上就是一个rados,利用命令rados就可以访问和使用ceph的对象存储,但作为一个真正产品机的对象存储服务,通常使用的是Restfulapi的方式进行访问和使用.而radosgw其实就是这个作用,安装完radosgw以后,就可以使用api来访问和使用ceph的对象存储服务了.    首先明白一下架构,radosgw其实名副其实,就是rados的一个网关,作用是对外提供对象存储服务.本质上radosgw(其实也是一个命令)和rbd