使用 AWS SDK for Java 2.x 将流上传到 Amazon S3
当您使用 putObject 或 uploadPart 通过流向 S3 上传内容时,您需要使用同步 API 的 RequestBody 工厂类来提供流。对于异步 API,等效的工厂类是 AsyncRequestBody。
哪些方法可以上传流?
对于同步 API,您可以使用 RequestBody 的以下工厂方法来提供流:
fromInputStream(InputStream inputStream, long contentLength)fromContentProvider(ContentStreamProvider provider, long contentLength, String mimeType)fromContentProvider(ContentStreamProvider provider, String mimeType)
对于异步 API,您可以使用 AsyncRequestBody 的以下工厂方法:
fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor)fromInputStream(AsyncRequestBodyFromInputStreamConfiguration configuration)fromInputStream(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration)forBlockingInputStream(Long contentLength)
执行上传
如果你知道流的长度
从之前显示的方法签名中可以看出,大多数方法都接受一个 content length 参数。如果您知道内容的字节长度,请提供确切的值:
// 始终在可用时提供确切的内容长度。
long contentLength = 1024; // 确切的字节大小。
s3Client.putObject(req -> req
.bucket("my-bucket")
.key("my-key"),
RequestBody.fromInputStream(inputStream, contentLength));警告
当您从输入流上传时,如果指定的内容长度与实际字节数不匹配,您可能会遇到:
- 如果指定的长度太小,对象会被截断
- 如果指定的长度太大,上传会失败或连接挂起
如果你不知道流的长度
使用同步 API
使用 fromContentProvider(ContentStreamProvider provider, String mimeType):
public PutObjectResponse syncClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
S3Client s3Client = S3Client.create();
RequestBody body = RequestBody.fromContentProvider(ContentStreamProvider.fromInputStream(inputStream), "text/plain");
PutObjectResponse putObjectResponse = s3Client.putObject(b -> b.bucket(BUCKET_NAME).key(KEY_NAME), body);
return putObjectResponse;
}由于 SDK 会在内存中缓冲整个流以计算内容长度,因此处理大流时可能会遇到内存问题。如果您需要使用同步客户端上传大流,请考虑使用分段 API。
使用异步 API
您可以为 fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor) 的 contentLength 参数提供 null。
注意
对于大多数用例,我们建议对未知大小的流使用异步客户端 API。这种方法可以实现并行传输,并提供更简单的编程接口,因为如果流很大,SDK 会处理流分段为多个部分。
示例:使用基于 AWS CRT 的异步客户端
public PutObjectResponse crtClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
S3AsyncClient s3AsyncClient = S3AsyncClient.crtCreate();
ExecutorService executor = Executors.newSingleThreadExecutor();
AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor); // 'null' 表示内容长度未知。
CompletableFuture<PutObjectResponse> responseFuture =
s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
.exceptionally(e -> {
if (e != null){
logger.error(e.getMessage(), e);
}
return null;
});
PutObjectResponse response = responseFuture.join(); // 等待响应。
executor.shutdown();
return response;
}示例:使用启用了分段上传的标准异步客户端
public PutObjectResponse asyncClient_multipart_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
S3AsyncClient s3AsyncClient = S3AsyncClient.builder().multipartEnabled(true).build();
ExecutorService executor = Executors.newSingleThreadExecutor();
AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor); // 'null' 表示内容长度未知。
CompletableFuture<PutObjectResponse> responseFuture =
s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
.exceptionally(e -> {
if (e != null) {
logger.error(e.getMessage(), e);
}
return null;
});
PutObjectResponse response = responseFuture.join(); // 等待响应。
executor.shutdown();
return response;
}