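Upload streams to Amazon S3 using the AWS SDK for Java 2.x

When you upload content to S3 from a stream with putObject or uploadPart, you supply the stream through a factory method of the RequestBody class for the synchronous API. For the asynchronous API, AsyncRequestBody is the equivalent factory class.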

Which methods upload streams?

For the synchronous API, you can use the following factory methods of RequestBody to supply the stream:

  • fromInputStream(InputStream inputStream, long contentLength)
  • fromContentProvider(ContentStreamProvider provider, long contentLength, String mimeType)
  • fromContentProvider(ContentStreamProvider provider, String mimeType)

For the asynchronous API, you can use the following factory methods of AsyncRequestBody:

  • fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor)
  • fromInputStream(AsyncRequestBodyFromInputStreamConfiguration configuration)
  • fromInputStream(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration)
  • forBlockingInputStream(Long contentLength)

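Performing the upload

If you know the length of the stream

As the method signatures shown previously indicate, most of the methods accept a content length parameter. If you know the content length in bytes, provide the exact value:

// Always provide the exact content length when it's available.
long contentLength = 1024; // Exact size in bytes.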
s3Client.putObject(req -> req
    .bucket("my-bucket")
    .key("my-key"),
RequestBody.fromInputStream(inputStream, contentLength));
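
The content length is a count of bytes, not characters, which is easy to get wrong when the stream is built from text. The following is a minimal JDK-only sketch of that point; the KnownLengthExample class and its SizedStream holder are hypothetical names used for illustration and are not part of the SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class KnownLengthExample {

    /** Hypothetical holder that pairs a stream with its exact length in bytes. */
    public static final class SizedStream {
        public final InputStream stream;
        public final long contentLength;

        SizedStream(InputStream stream, long contentLength) {
            this.stream = stream;
            this.contentLength = contentLength;
        }
    }

    /** Encodes the text as UTF-8 and records the encoded byte count. */
    public static SizedStream fromString(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        return new SizedStream(new ByteArrayInputStream(bytes), bytes.length);
    }

    public static void main(String[] args) {
        String text = "h\u00e9llo"; // 5 characters; the accented letter takes 2 bytes in UTF-8.
        SizedStream sized = fromString(text);
        System.out.println("chars=" + text.length() + " bytes=" + sized.contentLength);
    }
}
```

Under these assumptions, sized.stream and sized.contentLength would be the two arguments passed to RequestBody.fromInputStream.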

If you don't know the length of the stream

Using the synchronous API

Use the fromContentProvider(ContentStreamProvider provider, String mimeType) method:

public PutObjectResponse syncClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {
    S3Client s3Client = S3Client.create();
    RequestBody body = RequestBody.fromContentProvider(ContentStreamProvider.fromInputStream(inputStream), "text/plain");
    // Use the method parameters rather than undefined constants.
    PutObjectResponse putObjectResponse = s3Client.putObject(b -> b.bucket(bucketName).key(key), body);
    return putObjectResponse;
}

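Because the SDK buffers the entire stream in memory to calculate the content length, you can run into memory problems with large streams. If you need to upload large streams with the synchronous client, consider using the multipart API.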
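
One way to avoid holding a large stream in memory is to learn its length by copying it to disk first, then upload it as a stream of known length. This is a different technique from the multipart API mentioned above, and it costs temporary disk space. The sketch below uses only the JDK; the SpoolToTempFile class and the "upload-" file prefix are hypothetical choices made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SpoolToTempFile {

    /** Hypothetical result type: where the data was written and how many bytes it holds. */
    public static final class Spooled {
        public final Path path;
        public final long length;

        Spooled(Path path, long length) {
            this.path = path;
            this.length = length;
        }
    }

    /**
     * Copies the stream to a temporary file without loading it into memory at once.
     * Files.copy returns the number of bytes written, which is the content length.
     */
    public static Spooled spool(InputStream in) {
        try {
            Path tmp = Files.createTempFile("upload-", ".tmp");
            // createTempFile already created the file, so allow it to be overwritten.
            long length = Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            return new Spooled(tmp, length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the file on disk, a new stream opened with Files.newInputStream(spooled.path) together with spooled.length could be passed to RequestBody.fromInputStream, as in the known-length case. The caller is responsible for deleting the temporary file after the upload finishes.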

Using the asynchronous API

You can pass null for the contentLength argument of fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor).

Example: using the AWS CRT-based asynchronous client

public PutObjectResponse crtClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) {

    S3AsyncClient s3AsyncClient = S3AsyncClient.crtCreate();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor);  // 'null' indicates that the content length is unknown.
    
    CompletableFuture<PutObjectResponse> responseFuture =
            s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
                    .exceptionally(e -> {
                        if (e != null){
                            logger.error(e.getMessage(), e);
                        }
                        return null;
                    });

    PutObjectResponse response = responseFuture.join(); // Wait for the response; null if the upload failed.
    executor.shutdown();
    return response;
}

Example: using the standard asynchronous client with multipart upload enabled

public PutObjectResponse asyncClient_multipart_stream_unknown_size(String bucketName, String key, InputStream inputStream) {

    S3AsyncClient s3AsyncClient = S3AsyncClient.builder().multipartEnabled(true).build();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor); // 'null' indicates that the content length is unknown.

    CompletableFuture<PutObjectResponse> responseFuture =
            s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body)
                    .exceptionally(e -> {
                        if (e != null) {
                            logger.error(e.getMessage(), e);
                        }
                        return null;
                    });

    PutObjectResponse response = responseFuture.join(); // Wait for the response; null if the upload failed.
    executor.shutdown();
    return response;
}