Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-33f2ede.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon S3",
"contributor": "",
"description": "Fixed the issue where S3 multipart client failed to download zero-byte file, causing `Content range header is missing` exception to throw."
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,21 @@ public void onResponse(T response) {
if (!contentRangeOpt.isPresent()) {
contentRangeOpt = response.sdkHttpResponse().firstMatchingHeader("content-range");
if (!contentRangeOpt.isPresent()) {
// Bad state! This is intended to cancel everything
if (subscriber != null) {
Optional<String> contentLength = response.sdkHttpResponse().firstMatchingHeader("content-length");
long transformerCount = FileAsyncResponseTransformerPublisher.this.transformerCount.get();
// Error out if content range header is missing and this is not the initial request
if (subscriber != null && transformerCount > 0) {
subscriber.onError(new IllegalStateException("Content range header is missing"));
return;
}
return;

if (!contentLength.isPresent()) {
subscriber.onError(new IllegalStateException("Content length header is missing"));
return;
}
String totalLength = contentLength.get();
long endByte = Long.parseLong(totalLength) - 1;
contentRangeOpt = Optional.of("bytes 0-" + endByte + "/" + totalLength);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.ContentRangeParser;

class FileAsyncResponseTransformerPublisherTest {

Expand Down Expand Up @@ -239,11 +240,17 @@ public void onComplete() {
}

private SdkResponse createMockResponseWithRange(String contentRange) {
return createMockResponseWithRange(contentRange,
ContentRangeParser.totalBytes(contentRange).getAsLong());
}

private SdkResponse createMockResponseWithRange(String contentRange, Long contentLength) {
SdkResponse mockResponse = mock(SdkResponse.class);
SdkHttpResponse mockHttpResponse = mock(SdkHttpResponse.class);

when(mockResponse.sdkHttpResponse()).thenReturn(mockHttpResponse);
when(mockHttpResponse.firstMatchingHeader("x-amz-content-range")).thenReturn(Optional.of(contentRange));
when(mockHttpResponse.firstMatchingHeader("content-length")).thenReturn(Optional.ofNullable(String.valueOf(contentLength)));
when(mockHttpResponse.firstMatchingHeader("x-amz-content-range")).thenReturn(Optional.ofNullable(contentRange));

return mockResponse;
}
Expand Down Expand Up @@ -298,7 +305,101 @@ void createOrAppendToExisting_shouldThrowException() {
assertThatThrownBy(() -> new FileAsyncResponseTransformerPublisher<>((FileAsyncResponseTransformer<?>) initialTransformer))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("CREATE_OR_APPEND_TO_EXISTING");
}

@Test
void singleDemand_contentRangeMissing_shouldSucceed() throws Exception {
AsyncResponseTransformer<SdkResponse, SdkResponse> initialTransformer = AsyncResponseTransformer.toFile(testFile);
FileAsyncResponseTransformerPublisher<SdkResponse> publisher =
new FileAsyncResponseTransformerPublisher<>((FileAsyncResponseTransformer<SdkResponse>) initialTransformer);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<AsyncResponseTransformer<SdkResponse, SdkResponse>> receivedTransformer = new AtomicReference<>();
CompletableFuture<SdkResponse> future = new CompletableFuture<>();

publisher.subscribe(new Subscriber<AsyncResponseTransformer<SdkResponse, SdkResponse>>() {
private Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
}

@Override
public void onNext(AsyncResponseTransformer<SdkResponse, SdkResponse> transformer) {
receivedTransformer.set(transformer);

// Simulate response with content-range header
SdkResponse mockResponse = createMockResponseWithRange(null, 0L);
CompletableFuture<SdkResponse> prepareFuture = transformer.prepare();
CompletableFutureUtils.forwardResultTo(prepareFuture, future);
transformer.onResponse(mockResponse);

// Simulate stream data
SdkPublisher<ByteBuffer> mockPublisher = createMockPublisher();
transformer.onStream(mockPublisher);

latch.countDown();
}

@Override
public void onError(Throwable t) {
fail("Unexpected error with exception: " + t.getMessage());
}

@Override
public void onComplete() {
latch.countDown();
}
});

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(receivedTransformer.get()).isNotNull();
assertThat(Files.exists(testFile)).isTrue();
assertThat(future).succeedsWithin(10, TimeUnit.SECONDS);
}

@Test
void multipleTransformers_contentRangeMissingOnSecondRequest_shouldFail() throws Exception {
AsyncResponseTransformer<SdkResponse, SdkResponse> initialTransformer = AsyncResponseTransformer.toFile(testFile);
FileAsyncResponseTransformerPublisher<SdkResponse> publisher =
new FileAsyncResponseTransformerPublisher<>((FileAsyncResponseTransformer<SdkResponse>) initialTransformer);

CountDownLatch latch = new CountDownLatch(1);
CompletableFuture<SdkResponse> future = new CompletableFuture<>();
AtomicReference<Throwable> exception = new AtomicReference<>();

publisher.subscribe(new Subscriber<AsyncResponseTransformer<SdkResponse, SdkResponse>>() {

@Override
public void onSubscribe(Subscription s) {
s.request(2);
}

@Override
public void onNext(AsyncResponseTransformer<SdkResponse, SdkResponse> transformer) {
SdkResponse mockResponse = createMockResponseWithRange(null, 0L);

CompletableFuture<SdkResponse> prepareFuture = transformer.prepare();
CompletableFutureUtils.forwardResultTo(prepareFuture, future);
transformer.onResponse(mockResponse);
}

@Override
public void onError(Throwable t) {
exception.set(t);
latch.countDown();
}

@Override
public void onComplete() {
fail("Unexpected onComplete");
}
});

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(exception.get()).hasMessageContaining("Content range header is missing");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class S3MultipartClientFileDownloadIntegrationTest extends S3IntegrationT
private static final int MIB = 1024 * 1024;
private static final String TEST_BUCKET = temporaryBucketName(S3MultipartClientFileDownloadIntegrationTest.class);
private static final String TEST_KEY = "testfile.dat";
private static final String ZERO_BYTE_KEY = "zero.dat";
private static final int OBJ_SIZE = 100 * MIB;
private static final long PART_SIZE = 5 * MIB;

Expand Down Expand Up @@ -108,6 +109,22 @@ void download_defaultCreateNewFile_shouldSucceed() throws Exception {
path.toFile().delete();
}

@Test
void download_emptyFile_shouldSucceed() throws Exception {
Path path = tmpPath().resolve(UUID.randomUUID().toString());
s3Client.putObject(b -> b.bucket(TEST_BUCKET).key(ZERO_BYTE_KEY), AsyncRequestBody.empty()).join();
CompletableFuture<GetObjectResponse> future = s3Client.getObject(
req -> req.bucket(TEST_BUCKET).key(ZERO_BYTE_KEY),
AsyncResponseTransformer.toFile(path, FileTransformerConfiguration.defaultCreateNew()));
future.join();
MessageDigest md = MessageDigest.getInstance("SHA-256");
byte[] downloadedHash = md.digest(Files.readAllBytes(path));
md.reset();
byte[] originalHash = md.digest(new byte[0]);
assertThat(downloadedHash).isEqualTo(originalHash);
}


private Path tmpPath() {
return Paths.get(JavaSystemSetting.TEMP_DIRECTORY.getStringValueOrThrow());
}
Expand Down
Loading