2026/4/18 8:08:57
网站建设
项目流程
门户网站的基本功能,ui中国设计网站页面,如何安装wordpress ftp,海网站建设minio分片上传前言分片上传的技术选择实现分片上传第一步自定义客户端初始化分片上传获取一个上传分片的url共前端使用确认每一个分片的上传合并分片文件前言
为什么要选择将一个大文件拆分成许多小文件来上传#xff1f;
对于许多服务器和应用框架来说#xff0c;单次HTTP…minio分片上传前言分片上传的技术选择实现分片上传第一步自定义客户端初始化分片上传获取一个上传分片的url共前端使用确认每一个分片的上传合并分片文件前言为什么要选择将一个大文件拆分成许多小文件来上传对于许多服务器和应用框架来说单次HTTP请求的大小有严格的限制。上传一个10GB的大文件可能需要数个小时单一的http请求连接不稳定造成上传失败。降低服务器的压力其次分片上传可以实现断点续传、秒上传等功能分片上传的技术选择分片上传分为两种一种是经过后端服务器来上传另外一种是前端直传。第一种是由后端接受分片文件流在调用minio的api上传第二种就是有后端生成预签名url返回给前端前端上传到minio。一般我们选择后者第二种利用客户端宽带减轻了服务器的压力实现分片上传流程图第一步自定义客户端packagecom.petlife.base.config;importcom.google.common.collect.Multimap;importio.minio.CreateMultipartUploadResponse;importio.minio.ListPartsResponse;importio.minio.MinioClient;importio.minio.ObjectWriteResponse;importio.minio.errors.*;importio.minio.messages.Part;importjava.io.IOException;importjava.security.InvalidKeyException;importjava.security.NoSuchAlgorithmException;publicclassBigFileClientextendsMinioClient{publicBigFileClient(MinioClientclient){super(client);}/** * 创建分片上传任务 - 在MinIO服务器上初始化一个分片上传会话 * 获取uploadId - 返回一个唯一的uploadId用于标识这次分片上传会话 * param bucket 存储桶名称 * param region 区域 (null 表示默认区域) * param object 对象名称 (文件路径) * param headers 额外的请求头 * param extraQueryParams 额外的查询参数 * return 上传ID */publicStringinitMultiPartUpload(Stringbucket,Stringregion,Stringobject,MultimapString,Stringheaders,MultimapString,StringextraQueryParams){try{CreateMultipartUploadResponseresponsethis.createMultipartUpload(bucket,region,object,headers,extraQueryParams);returnresponse.result().uploadId();}catch(IOException|InvalidKeyException|NoSuchAlgorithmException|InsufficientDataException|ServerException|InternalException|XmlParserException|InvalidResponseException|ErrorResponseExceptione){thrownewRuntimeException(初始化分片上传失败,e);}}/** * 合并分片上传 - 将之前初始化的分片上传任务合并为一个完整的对象 * param bucketName 存储桶名称 * param region 区域 (null 表示默认区域) * param objectName 对象名称 (文件路径) * param uploadId 上传ID (由initMultiPartUpload返回) * param parts 分片信息数组 * param extraHeaders 额外的请求头 * param extraQueryParams 额外的查询参数 * return 合并后的对象写入响应 */publicObjectWriteResponsemergeMultipartUpload(StringbucketName,Stringregion,StringobjectName,StringuploadId,Part[]parts,MultimapString,StringextraHeaders,MultimapString,StringextraQueryParams){try{returnthis.completeMultipartUpload(bucketName,region,objectName,uploadId,parts,extraHeaders,extraQueryParams);}catch(IOException|InvalidKeyException|NoSuchAlgorithmException|InsufficientDataException|ServerException|InternalException|XmlParserException|InvalidResponseException|ErrorResponseExceptione){thrownewRuntimeException(合并分片上传失败,e);}}/** * 列出分片上传 - 获取已上传的分片信息 * param bucketName 存储桶名称 * param region 区域 (null 表示默认区域) * param objectName 对象名称 (文件路径) * param maxParts 最大返回分片数量 (null 表示默认值) * param partNumberMarker 分片编号标记 (null 表示从第一个分片开始) * param uploadId 上传ID (由initMultiPartUpload返回) * param extraHeaders 额外的请求头 * param extraQueryParams 额外的查询参数 * return ListPartsResponse 包含的重要信息是ListPart 分片信息列表 可以通过part.partNumber()获取分片编号part.size()获取分片大小part.etag()获取分片的ETag * 可以通过listPartsResponse.parts()获取分片信息列表 */publicListPartsResponselistMultipart(StringbucketName,Stringregion,StringobjectName,IntegermaxParts,IntegerpartNumberMarker,StringuploadId,MultimapString,StringextraHeaders,MultimapString,StringextraQueryParams){try{returnthis.listParts(bucketName,region,objectName,maxParts,partNumberMarker,uploadId,extraHeaders,extraQueryParams);}catch(IOException|InvalidKeyException|NoSuchAlgorithmException|InsufficientDataException|ServerException|InternalException|XmlParserException|InvalidResponseException|ErrorResponseExceptione){thrownewRuntimeException(列出分片上传失败,e);}}/** * 简化版初始化分片上传 * param bucketName 存储桶名称 * param objectName 对象名称 * return 上传ID */publicStringinitMultiPartUpload(StringbucketName,StringobjectName){returninitMultiPartUpload(bucketName,null,objectName,null,null);}/** * 简化版合并分片上传 * param bucketName 存储桶名称 * param objectName 对象名称 * param uploadId 上传ID * param parts 分片数组 * return 合并结果 */publicObjectWriteResponsemergeMultipartUpload(StringbucketName,StringobjectName,StringuploadId,Part[]parts){returnmergeMultipartUpload(bucketName,null,objectName,uploadId,parts,null,null);}publicbooleancancelMultiPartUpload(StringbucketName,StringobjectName,StringuploadId){try{this.abortMultipartUpload(bucketName,null,objectName,uploadId,null,null);returntrue;}catch(Exceptione){returnfalse;}}}初始化分片上传对于分片上传任务的数据库设计当前的版本比较粗糙。主要是对于已上传文件和分片任务的记录。createtableifnotexistspet-life.upload_task(idintauto_incrementcomment主键idprimarykey,chunk_indexintnullcomment分片索引,chunk_etagintnullcomment分片标签,statusintdefault0notnullcomment上传状态1已经上传 0 未上传,srcvarchar(50)nullcomment上传路径,expired_timedatetimenullcomment过期时间)comment文件上传任务;createtableifnotexistspet-life.file_record(idbigintauto_incrementprimarykey,file_namevarchar(100)nullcomment文件名,file_hashvarchar(50)nullcomment文件唯一标识,srcvarchar(50)nullcomment文件在minio中的存储路径,file_sizeintnullcomment文件大小,is_mergedtinyintnullcomment是否合并 0 否 1 是,upload_idvarchar(30)nullcomment上传id,expired_timedatetimenullcomment过期时间)comment文件上传记录;获取一个上传分片的url共前端使用/** * 获取分一个分片的上传的url * param uploadId 上传ID * param chunkIndex 分片索引 * return 上传URL */privateStringgenerateUploadFileURL(StringuploadId,intchunkIndex){try{MapString,StringextraQueryParamsnewHashMap();extraQueryParams.put(uploadId,uploadId);extraQueryParams.put(partNumber,String.valueOf(chunkIndex1));returnminioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder().method(Method.PUT).bucket(bucketName).object(uploadId/chunkIndex).extraQueryParams(extraQueryParams).expiry(1,TimeUnit.DAYS).build());}catch(Exceptione){thrownewRuntimeException(生成上传URL失败e.getMessage());}}初始化分上传任务逻辑要考虑该文件是否已经上传过了和断点续传的情况。根据文件唯一标识通过数据库查询如果已经上传过了则直接返回访问路径如果是断电续传的情况则返回完成上传的分片如果从未上传过则直接创建一个新的分片上传任务。publicResponseDTOUploadDTOupload(FileFormfileForm){// 1. 校验文件是否已上传StringfileHashfileForm.getFileHash();UploadDTOresultnewUploadDTO();FileRecordfileRecordfileRecordService.getBaseMapper().selectOne(newLambdaQueryWrapperFileRecord().eq(FileRecord::getFileHash,fileHash).ge(FileRecord::getExpiredTime,LocalDateTime.now()));//1.1 文件记录不存在开启新的分片上传任务if(fileRecordnull){// 1.1.1 文件记录不存在开启新的分片上传任务MapString,Objectresult1initChunkUpload02(fileForm.getFileName(),fileForm.getChunkCount());StringuploadId(String)result1.get(uploadId);ListUploadDTO.chunkInfochunkList(ListUploadDTO.chunkInfo)result1.get(chunkList);// 1.1.2 创建上传任务记录synchronized(fileHash){CompletableFuture.runAsync(()-{for(UploadDTO.chunkInfo chunkInfo:chunkList){UploadTaskuploadTasknewUploadTask().setChunkIndex(chunkInfo.getChunkIndex()).setSrc(chunkInfo.getSrc()).setUploadId(uploadId).setStatus(0).setExpiredTime(LocalDateTime.now().plusDays(1));uploadTaskService.getBaseMapper().insert(uploadTask);}},threadPoolTaskExecutor);// 1.1.3 保存文件记录fileRecordnewFileRecord().setFileHash(fileHash).setFileName(fileForm.getFileName()).setFileSize(fileForm.getFileSize()).setSrc(null).setIsMerged(0).setUploadId(uploadId).setExpiredTime(LocalDateTime.now().plusDays(1));fileRecordService.getBaseMapper().insert(fileRecord);}//1.1.3 保存分片上传人物result.setUploadId(uploadId).setExist(false).setChunkList(chunkList).setObjectName(fileRecord.getFileName());returnResponseDTO.success(result);}if(fileRecord.getIsMerged()1){// 1.1文件已上传返回已上传的URLresult.setExist(true);result.setSrc(fileRecord.getSrc());returnResponseDTO.success(result);}//1.2 文件未上传 判断是否是断点上传ListUploadTaskuploadTasksuploadTaskService.getBaseMapper().selectList(newLambdaQueryWrapperUploadTask().eq(UploadTask::getUploadId,fileRecord.getUploadId()).eq(UploadTask::getStatus,0));if(!uploadTasks.isEmpty()){ListUploadDTO.chunkInfochunkListnewArrayList();for(UploadTaskuploadTask:uploadTasks){UploadDTO.chunkInfo chunkInfonewUploadDTO.chunkInfo();chunkInfo.setChunkIndex(uploadTask.getChunkIndex());chunkInfo.setSrc(uploadTask.getSrc());chunkList.add(chunkInfo);}result.setExist(true);result.setChunkList(chunkList);returnResponseDTO.success(result);}// 1.2.2 不是断点上传返回空列表result.setExist(false);returnResponseDTO.fail(服务器数据库异常);}确认每一个分片的上传每一个分片完成上传前端向后端发送一个请求确认分片上传的状态。publicResponseDTOBooleanconfirmChunkUpload(ConfirmChunkFormconfirmChunkForm){UploadTaskuploadTasknewUploadTask();uploadTask.setUploadId(confirmChunkForm.getUploadId());uploadTask.setChunkEtag(confirmChunkForm.getChunkEtag());uploadTask.setChunkIndex(confirmChunkForm.getChunkIndex());if(confirmChunkForm.getSuccess()){uploadTask.setStatus(1);}else{uploadTask.setStatus(2);}intupdateuploadTaskService.getBaseMapper().update(uploadTask,newLambdaUpdateWrapperUploadTask().eq(UploadTask::getUploadId,confirmChunkForm.getUploadId()).eq(UploadTask::getChunkIndex,confirmChunkForm.getChunkIndex()));returnResponseDTO.success(update0);}合并分片文件可以加上对分片任务的校验和数据库的更改这里就不加了。publicResponseDTOStringmergeChunk(StringuploadID,StringobjectName){try{Part[]partsbigFileClient.listMultipart(bucketName,null,objectName,null,null,uploadID,null,null).result().partList().toArray(newPart[0]);ObjectWriteResponseobjectWriteResponsebigFileClient.mergeMultipartUpload(bucketName,objectName,uploadID,parts);// 构建访问URLStringfinalObjectNameobjectWriteResponse.object();Stringsrcendpoint/bucketName/finalObjectName;returnResponseDTO.success(src);}catch(Exceptione){thrownewRuntimeException(合并分片失败e.getMessage());}}