Implementing an S3 Interface for Ceph Storage (with Resumable Upload Support)

Our company is preparing to adopt Ceph storage. After some research, I decided to talk to it through Amazon's S3 API. The implementation class is as follows:

/**
 * Title:        S3Manager
 * Description:  S3 API implementation for Ceph storage. References:
 * https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/dev/RetrievingObjectUsingJava.html
 * http://docs.ceph.org.cn/radosgw/s3/
 * author:       xu jun
 * date:         2018/10/22
 */
@Slf4j
@Service
public class S3Manager extends StorageManagerBase implements StorageManager {
    private final UKID ukid;
    private final S3ClientConfig s3ClientConfig;
    private final RedisManage redisManage;
    private AmazonS3 amazonClient;

    @Autowired
    public S3Manager(UKID ukid, S3ClientConfig s3ClientConfig, RedisManage redisManage) {
        this.ukid = ukid;
        this.s3ClientConfig = s3ClientConfig;
        this.redisManage = redisManage;
    }

    private AmazonS3 getAmazonClient() {
        if (amazonClient == null) {
            String accessKey = s3ClientConfig.getAccessKey();
            String secretKey = s3ClientConfig.getSecretKey();
            String endpoint = s3ClientConfig.getEndPoint();

            AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
            ClientConfiguration clientConfig = new ClientConfiguration();
            clientConfig.setProtocol(Protocol.HTTP);

            AmazonS3 conn = AmazonS3ClientBuilder.standard()
                    .withClientConfiguration(clientConfig)
                    .withCredentials(new AWSStaticCredentialsProvider(credentials))
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, ""))
                    .withPathStyleAccessEnabled(true)
                    .build();

            //make sure the bucket exists before first use
            checkBucket(conn);
            amazonClient = conn;
        }
        return amazonClient;
    }

    @Override
    public String uploadFile(byte[] fileData, String extension) {
        log.info("Storage s3 api, upload file start");

        //generate a unique id to use as the object key
        long fileId = ukid.getGeneratorID();
        String fileName = Long.toString(fileId);
        //bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();

        //set the content length explicitly so the SDK does not have to buffer the whole stream
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(fileData.length);
        PutObjectResult result = conn.putObject(bucketName, fileName, new ByteArrayInputStream(fileData), metadata);
        log.info("Storage s3 api, put object result :{}", result);

        log.info("Storage s3 api, upload file end, file name: {}", fileName);
        return fileName;
    }

    @Override
    public String uploadAppenderFile(byte[] fileData, String extension) {
        log.info("Storage s3 api, upload appender file start");

        //generate a unique id to use as the object key
        long ukId = ukid.getGeneratorID();
        String fileName = Long.toString(ukId);
        //bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        List<PartETag> partETags = new ArrayList<>();
        //initiate the multipart upload
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, fileName);
        InitiateMultipartUploadResult initResponse = conn.initiateMultipartUpload(initRequest);
        String uploadId = initResponse.getUploadId();

        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(fileData);
        Integer contentLength = fileData.length;
        // upload the first part
        UploadPartRequest uploadPartRequest = new UploadPartRequest()
                .withBucketName(bucketName)
                .withKey(fileName)
                .withUploadId(uploadId)
                .withPartNumber(1)
                .withPartSize(contentLength)
                .withInputStream(byteArrayInputStream);
        UploadPartResult uploadPartResult = conn.uploadPart(uploadPartRequest);

        try {
            byteArrayInputStream.close();
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        }
        partETags.add(uploadPartResult.getPartETag());
        Integer partNumber = uploadPartResult.getPartNumber();

        S3CacheMode cacheMode = new S3CacheMode();
        cacheMode.setPartETags(partETags);
        cacheMode.setPartNumber(partNumber);
        cacheMode.setUploadId(uploadId);
        redisManage.set(fileName, cacheMode);

        log.info("Storage s3 api, upload appender file end, fileName: {}", fileName);
        return fileName;
    }

    @Override
    public void uploadChunkFile(ChunkFileSaveParams chunkFileSaveParams) {
        log.info("Storage s3 api, upload chunk file start");

        String fileName = chunkFileSaveParams.getFileAddress();
        Result result = redisManage.get(fileName);
        JSONObject jsonObject = (JSONObject) result.getData();
        if (jsonObject == null) {
            throw FileCenterExceptionConstants.CACHE_DATA_NOT_EXIST;
        }
        S3CacheMode cacheMode = jsonObject.toJavaObject(S3CacheMode.class);
        Integer partNumber = cacheMode.partNumber;
        String uploadId = cacheMode.getUploadId();
        List<PartETag> partETags = cacheMode.partETags;

        //bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(chunkFileSaveParams.getBytes());
        Integer contentLength = chunkFileSaveParams.getBytes().length;

        UploadPartRequest uploadPartRequest = new UploadPartRequest()
                .withBucketName(bucketName)
                .withKey(fileName)
                .withUploadId(uploadId)
                .withPartNumber(partNumber + 1)
                .withPartSize(contentLength)
                .withInputStream(byteArrayInputStream);

        UploadPartResult uploadPartResult = conn.uploadPart(uploadPartRequest);
        partETags.add(uploadPartResult.getPartETag());
        partNumber = uploadPartResult.getPartNumber();

        try {
            byteArrayInputStream.close();
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        }

        S3CacheMode cacheModeUpdate = new S3CacheMode();
        cacheModeUpdate.setPartETags(partETags);
        cacheModeUpdate.setPartNumber(partNumber);
        cacheModeUpdate.setUploadId(uploadId);
        redisManage.set(fileName, cacheModeUpdate);

        if (chunkFileSaveParams.getChunk().equals(chunkFileSaveParams.getChunks() - 1)) {
            //last chunk: complete the multipart upload so the object becomes visible
            CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, fileName,
                    uploadId, partETags);
            conn.completeMultipartUpload(compRequest);
        }

        log.info("Storage s3 api, upload chunk file end");
    }

    @Override
    public byte[] downloadFile(String fileName) {
        log.info("Storage s3 api, download file start");
        //bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        S3Object object;
        if (conn.doesObjectExist(bucketName, fileName)) {
            object = conn.getObject(bucketName, fileName);
        } else {
            throw FileCenterExceptionConstants.OBJECT_NOT_EXIST;
        }
        log.debug("Storage s3 api, get object result :{}", object);

        byte[] fileByte;
        InputStream inputStream = object.getObjectContent();
        try {
            fileByte = IOUtils.toByteArray(inputStream);
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        } finally {
            //close exactly once, in the finally block
            try {
                inputStream.close();
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
        log.info("Storage s3 api, download file end");
        return fileByte;
    }

    @Override
    public byte[] downloadFile(String fileName, long fileOffset, long fileSize) {
        log.info("Storage s3 api, download file by block start");
        //bucket name
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        S3Object object;
        if (conn.doesObjectExist(bucketName, fileName)) {
            //ranged download; the Range header is inclusive on both ends, so the last byte is fileOffset + fileSize - 1
            GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, fileName)
                    .withRange(fileOffset, fileOffset + fileSize - 1);
            object = conn.getObject(getObjectRequest);
        } else {
            throw FileCenterExceptionConstants.OBJECT_NOT_EXIST;
        }
        log.info("Storage s3 api, get object result :{}", object);

        // read the data
        byte[] buf;
        InputStream in = object.getObjectContent();
        try {
            buf = inputToByte(in, (int) fileSize);
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
        log.info("Storage s3 api, download file by block end");
        return buf;
    }

    @Override
    public String fileSecret(String filePath) {
        return null;
    }

    @Override
    public String fileDecrypt(String filePath) {
        return null;
    }

    @Override
    public String getDomain() {
        return null;
    }

    /**
     * Check whether the bucket exists, and create it if not.
     *
     * @param conn client connection
     */
    private void checkBucket(AmazonS3 conn) {
        //bucket name
        String bucketName = s3ClientConfig.getBucketName();
        if (conn.doesBucketExist(bucketName)) {
            log.debug("Storage s3 api, bucket found: " + bucketName);
        } else {
            log.warn("Storage s3 api, bucket does not exist, creating it: " + bucketName);
            conn.createBucket(bucketName);
        }
    }

    /**
     * Convert an InputStream to byte[].
     *
     * @param inStream input stream
     * @param fileSize expected size in bytes (used as the read buffer size)
     * @return the bytes read
     * @throws IOException on read failure
     */
    private static byte[] inputToByte(InputStream inStream, int fileSize) throws IOException {
        ByteArrayOutputStream swapStream = new ByteArrayOutputStream();
        byte[] buff = new byte[fileSize];
        int rc;
        while ((rc = inStream.read(buff, 0, fileSize)) > 0) {
            swapStream.write(buff, 0, rc);
        }
        return swapStream.toByteArray();
    }

    /**
     * Debug helper: dump the stream's text content to the log (kept commented out).
     *
     * @param input input stream
     * @throws IOException on read failure
    private static void displayTextInputStream(InputStream input) throws IOException {
        // Read the text input stream one line at a time and display each line.
        BufferedReader reader = new BufferedReader(new InputStreamReader(input));
        String line;
        while ((line = reader.readLine()) != null) {
            log.info(line);
        }
    }
     */
}

The business interface built on top of this needs to provide chunked upload (with resume support), chunked download, and similar features; the class above is the low-level layer and contains no business logic.
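The resume logic hinges on simple chunk arithmetic that the caller has to get right before driving uploadAppenderFile / uploadChunkFile. A minimal sketch in plain Java; ChunkPlan and every name in it are hypothetical illustrations, not part of the class above:

```java
// Sketch of the chunk arithmetic behind the multipart upload flow.
// All names here are hypothetical, for illustration only.
public class ChunkPlan {

    /** Number of chunks needed to cover fileSize bytes with chunkSize-byte parts. */
    public static int chunkCount(long fileSize, long chunkSize) {
        return (int) ((fileSize + chunkSize - 1) / chunkSize);
    }

    /** Size in bytes of the given 0-based chunk; only the last chunk may be shorter. */
    public static long sizeOfChunk(long fileSize, long chunkSize, int chunk) {
        long start = chunk * chunkSize;
        return Math.min(chunkSize, fileSize - start);
    }

    /** Mirrors the completion test in uploadChunkFile: chunk == chunks - 1. */
    public static boolean isLastChunk(int chunk, int chunks) {
        return chunk == chunks - 1;
    }

    public static void main(String[] args) {
        long fiveMb = 5L * 1024 * 1024;      // S3's default minimum part size
        long fileSize = 12L * 1024 * 1024;   // a 12 MB file
        int chunks = chunkCount(fileSize, fiveMb);
        System.out.println(chunks);                            // 3
        System.out.println(sizeOfChunk(fileSize, fiveMb, 2));  // 2097152 (the 2 MB tail)
        System.out.println(isLastChunk(2, chunks));            // true
    }
}
```

The `isLastChunk` test is exactly the condition uploadChunkFile uses to decide when to call completeMultipartUpload; everything before that point only accumulates PartETags in Redis, which is what makes the upload resumable.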

Maven dependency:

        <!-- S3 API client for Ceph storage -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.433</version>
        </dependency>

Development notes:

  1. The Java S3 documentation on the Ceph site is sparse and outdated, and by now is basically useless as a reference. Look up the API and code samples on Amazon's own site instead (which even offers a Chinese translation, nice).

  2. The S3 API itself has no append operation, so implementing chunked upload takes more work than on FastDFS or OSS, which make appending easy.

  3. The default minimum part size for a multipart upload is 5 MB; it can be changed in the server-side configuration.

  4. When a domain name is used as the endpoint, the bucket name is treated as a subdomain by default (which requires DNS resolution, so this is not recommended). To address the bucket as a path segment instead, enable path-style access in the client configuration.
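Point 4 is easier to see with the two URL shapes side by side. A hypothetical helper (not part of the AWS SDK) illustrating what withPathStyleAccessEnabled(true) changes:

```java
// Illustration of the two S3 addressing styles; hypothetical helper, not an SDK API.
public class S3Addressing {

    /** Virtual-hosted style: the bucket becomes a subdomain (needs DNS resolution). */
    public static String virtualHostStyle(String endpoint, String bucket, String key) {
        return "http://" + bucket + "." + endpoint + "/" + key;
    }

    /** Path style: the bucket is the first path segment (what withPathStyleAccessEnabled(true) selects). */
    public static String pathStyle(String endpoint, String bucket, String key) {
        return "http://" + endpoint + "/" + bucket + "/" + key;
    }

    public static void main(String[] args) {
        System.out.println(virtualHostStyle("rgw.example.com", "files", "123"));
        // http://files.rgw.example.com/123
        System.out.println(pathStyle("rgw.example.com", "files", "123"));
        // http://rgw.example.com/files/123
    }
}
```

With path-style access, no wildcard DNS record per bucket is needed, which is why the class above enables it.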

Original article: https://www.cnblogs.com/xujanus/p/9851604.html
