阿里OSS大文件分片上传

OSS文件分片上传

  • 依赖
<!-- https://mvnrepository.com/artifact/com.aliyun.oss/aliyun-sdk-oss -->
<dependency>
	<groupId>com.aliyun.oss</groupId>
	<artifactId>aliyun-sdk-oss</artifactId>
	<version>3.8.1</version>
</dependency>
  • 基础参数dto

@Data
@Builder
public class OssParamDTO {

    private String endpoint;
    private String accessKeyId;
    private String accessKeySecret;
    private String bucketName;
    private String folder;
    /**
     * objectName = folder + fileName
     */
    private String objectName;

    /**
     * 上传线程
     */
    private Integer task;

    /**
     * 每个线程处理大小 分片大小
     */
    private Integer number;

}

具体上传方法

  • 小文件上传
    public static PutObjectResult uploadFile(OssParamDTO ossParamDTO, InputStream inputStream){
        // 创建OSSClient实例。
        OSS ossClient = new OSSClientBuilder().build(ossParamDTO.getEndpoint(), ossParamDTO.getAccessKeyId(), ossParamDTO.getAccessKeySecret());
        PutObjectResult putObjectResult = null;
        // 上传文件流。
        try {
            putObjectResult = ossClient.putObject(ossParamDTO.getBucketName(), ossParamDTO.getObjectName(), inputStream);
            //权限设置
            ossClient.setBucketAcl(ossParamDTO.getBucketName(), CannedAccessControlList.PublicRead);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 关闭OSSClient。
            ossClient.shutdown();
        }
      return  putObjectResult;
    }

  • 大文件上传,分片oss自己处理
  • 处理逻辑:前段轮训查询数据库某个字段,当该字段被回调接口更新时结束轮训,上传完成
public static void uploadBigFile(OssParamDTO ossParamDTO,String path, File file,Long fileId) throws Throwable {
        System.out.println("上传时间:"+System.currentTimeMillis());
        // Endpoint以杭州为例,其它Region请按实际情况填写。
        String endpoint = ossParamDTO.getEndpoint();
        // 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
        String accessKeyId = ossParamDTO.getAccessKeyId();
        String accessKeySecret = ossParamDTO.getAccessKeySecret();

        // 创建OSSClient实例。
        OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);

        ObjectMetadata meta = new ObjectMetadata();
        // 指定上传的内容类型。
        meta.setContentType("text/plain");

        // 通过UploadFileRequest设置多个参数。
        UploadFileRequest uploadFileRequest = new UploadFileRequest(ossParamDTO.getBucketName(),ossParamDTO.getObjectName());
        // 指定上传的本地文件。
        uploadFileRequest.setUploadFile(path);
        // 指定上传并发线程数,默认为1。
        uploadFileRequest.setTaskNum(ossParamDTO.getTask());
        // 指定上传的分片大小,范围为100KB~5GB,默认为文件大小/10000。
        uploadFileRequest.setPartSize(ossParamDTO.getNumber() * 1024 * 1024);

        uploadFileRequest.setObjectMetadata(meta);
        // 设置上传成功回调,参数为Callback类型。
        Callback callback = new Callback();
        callback.setCalbackBodyType(Callback.CalbackBodyType.URL);
		//回调参数 --- 同同步到数据库
        callback.setCallbackBody("fileId="+fileId+"&fileName=${object}&uploadStatus=1");
		//回调接口(自己服务器接口,可供外网访问)
        callback.setCallbackUrl("http://3m8wv2.natappfree.cc/web/common/callBack");
        uploadFileRequest.setCallback(callback);

        // 断点续传上传。
        ossClient.uploadFile(uploadFileRequest);
        //权限设置
        ossClient.setBucketAcl(ossParamDTO.getBucketName(), CannedAccessControlList.PublicRead);
        // 关闭OSSClient。
        ossClient.shutdown();
    }
  • 大文件本地分片,多线程执行分片上传,再合并碎片
  • 分片上传代码
 PartETag getUploadPartETag(String objectName, String bucketName, String uploadId,
                              InputStream instream, Long curPartSize,Integer partNum,
                              OSS ossClient, CountDownLatch countDownLatch){
       long before = System.currentTimeMillis();
       UploadPartRequest uploadPartRequest = null;
       try {
           log.debug("分片文件上传线程: {}",Thread.currentThread().getName());
           uploadPartRequest = new UploadPartRequest();
           uploadPartRequest.setBucketName(bucketName);
           uploadPartRequest.setKey(objectName);
           uploadPartRequest.setUploadId(uploadId);
           uploadPartRequest.setInputStream(instream);
           // 设置分片大小。除了最后一个分片没有大小限制,其他的分片最小为100KB。
           uploadPartRequest.setPartSize(curPartSize);
           // 设置分片号。每一个上传的分片都有一个分片号,取值范围是1~10000,如果超出这个范围,OSS将返回InvalidArgument的错误码。
           uploadPartRequest.setPartNumber(partNum);
           // 每个分片不需要按顺序上传,甚至可以在不同客户端上传,OSS会按照分片号排序组成完整的文件。
           UploadPartResult uploadPartResult = ossClient.uploadPart(uploadPartRequest);
           // 每次上传分片之后,OSS的返回结果会包含一个PartETag。PartETag将被保存到partETags中。
          log.debug("getPartETag  ::{}" ,uploadPartResult.getPartETag().getETag());
          return uploadPartResult.getPartETag();
       }finally {
           countDownLatch.countDown();
           log.debug("线程: {}  执行完毕, 等待线程数 :{}, 消耗时间: {}",
                   Thread.currentThread().getName(),countDownLatch.getCount(),
                   ((System.currentTimeMillis()-before)/1000)+"s");
       }
   }
  • 外部分片代码
   @Qualifier("taskExecutor")
   @Autowired
   ThreadPoolTaskExecutor taskExecutor;
/**
    * 上传
    * @param ossParamDTO
    * @param multipartFile
    * @return
    */
   public CompleteMultipartUploadResult uploadBigFileForProd(OssParamDTO ossParamDTO, MultipartFile multipartFile){
       Long before = System.currentTimeMillis();
       // Endpoint以杭州为例,其它Region请按实际情况填写。
       String endpoint = ossParamDTO.getEndpoint();
       // 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
       String accessKeyId = ossParamDTO.getAccessKeyId();
       String accessKeySecret = ossParamDTO.getAccessKeySecret();
       String bucketName = ossParamDTO.getBucketName();
       // <yourObjectName>表示上传文件到OSS时需要指定包含文件后缀在内的完整路径,例如abc/efg/123.jpg。
       String objectName = ossParamDTO.getObjectName();

       // 创建OSSClient实例。
       OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);

       // 创建InitiateMultipartUploadRequest对象。
       InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, objectName);

       // 初始化分片。
       InitiateMultipartUploadResult upresult = ossClient.initiateMultipartUpload(request);
       // 返回uploadId,它是分片上传事件的唯一标识,您可以根据这个ID来发起相关的操作,如取消分片上传、查询分片上传等。
       String uploadId = upresult.getUploadId();

       // partETags是PartETag的集合。PartETag由分片的ETag和分片号组成。
       List<PartETag> partETags =  new ArrayList<>();
       // 计算文件有多少个分片 15MB
       final long partSize = 2 * 1024 * 1024L;
       long fileLength = multipartFile.getSize();
       int partCount = (int) (fileLength / partSize);
       if (fileLength % partSize != 0) {
           partCount++;
       }
       // 遍历分片上传。
       log.info("分片数量  {}",partCount);
       List<Future<PartETag>> futureList = Collections.synchronizedList(new ArrayList());
       CountDownLatch countDownLatch = new CountDownLatch(partCount);
       for (int i = 0; i < partCount; i++) {
           long startPos = i * partSize;
           long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
           InputStream instream = null;
           try {
               instream = multipartFile.getInputStream();
           }  catch (IOException e) {
               e.printStackTrace();
           }
           // 跳过已经上传的分片。
           try {
               instream.skip(startPos);
           } catch (IOException e) {
               e.printStackTrace();
           }
           int finalI = i;
           InputStream finalInstream = instream;
           Future<PartETag> partETagFuture = taskExecutor.submit(() ->
                   fileServiceExtAsync.getUploadPartETag(objectName, bucketName, uploadId, finalInstream, curPartSize, finalI + 1, ossClient, countDownLatch));
           futureList.add(partETagFuture);
       }
       try {
           countDownLatch.await();
           for (Future<PartETag> tagFuture : futureList) {
               partETags.add(tagFuture.get());
           }
       } catch (InterruptedException | ExecutionException e) {
           e.printStackTrace();
       }
       // 创建CompleteMultipartUploadRequest对象。
       List<PartETag> collect = partETags.stream().sorted(Comparator.comparing(PartETag::getPartNumber)).collect(Collectors.toList());
       // 在执行完成分片上传操作时,需要提供所有有效的partETags。OSS收到提交的partETags后,会逐一验证每个分片的有效性。当所有的数据分片验证通过后,OSS将把这些分片组合成一个完整的文件。
       log.debug("文件开始合并");
       CompleteMultipartUploadRequest completeMultipartUploadRequest =
               new CompleteMultipartUploadRequest(bucketName, objectName, uploadId, collect);

       // 如果需要在完成文件上传的同时设置文件访问权限,请参考以下示例代码。
       completeMultipartUploadRequest.setObjectACL(CannedAccessControlList.PublicRead);
       // 完成上传。
       CompleteMultipartUploadResult completeMultipartUploadResult = ossClient.completeMultipartUpload(completeMultipartUploadRequest);

       // 关闭OSSClient。
       ossClient.shutdown();
       log.debug("消耗总时间:  {}",((System.currentTimeMillis()-before)/1000)+"s");
       return completeMultipartUploadResult;
   }

参考:springboot使用阿里云OSS的sdk上传文件
来源:

2 个赞

正好有这方面的需求 收藏了先

1 个赞