Skip to content

Commit

Permalink
Robust handling and management of S3 streams for MSQ shuffle storage
Browse files Browse the repository at this point in the history
  • Loading branch information
rohangarg committed Feb 2, 2023
1 parent 440212c commit ce61691
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,32 @@
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
import com.google.common.base.Predicates;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;

import javax.annotation.Nonnull;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class S3StorageConnector implements StorageConnector
Expand All @@ -48,11 +61,20 @@ public class S3StorageConnector implements StorageConnector

private static final String DELIM = "/";
private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;

/**
 * Creates a connector that reads and writes MSQ shuffle data through the given S3 client,
 * configured by {@code config}.
 *
 * <p>If the config names a temp directory, it is created eagerly at construction time (it is
 * used later as the staging area for downloaded chunk files); an inability to create it is
 * treated as fatal and surfaces as an {@link UncheckedIOException}.
 *
 * @param config                       bucket/prefix/retry/temp-dir settings for this connector
 * @param serverSideEncryptingAmazonS3 S3 client (with server-side encryption) used for all calls
 * @throws UncheckedIOException if the configured temp directory cannot be created
 */
public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
{
  this.config = config;
  this.s3Client = serverSideEncryptingAmazonS3;
  // Nothing to prepare when no temp dir is configured.
  if (config.getTempDir() == null) {
    return;
  }
  try {
    FileUtils.mkdirp(config.getTempDir());
  }
  catch (IOException e) {
    // Fail fast: downloads cannot be staged without the temp directory.
    throw new UncheckedIOException(e);
  }
}

@Override
Expand All @@ -62,13 +84,13 @@ public boolean pathExists(String path)
}

@Override
public InputStream read(String path) throws IOException
public InputStream read(String path)
{
return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)));
return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)), path);
}

@Override
public InputStream readRange(String path, long from, long size) throws IOException
public InputStream readRange(String path, long from, long size)
{
if (from < 0 || size < 0) {
throw new IAE(
Expand All @@ -78,35 +100,117 @@ public InputStream readRange(String path, long from, long size) throws IOExcepti
size
);
}
return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1));
return buildInputStream(
new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
path
);
}

private RetryingInputStream buildInputStream(GetObjectRequest getObjectRequest) throws IOException
private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
{
return new RetryingInputStream<>(
getObjectRequest,
new ObjectOpenFunction<GetObjectRequest>()
{
@Override
public InputStream open(GetObjectRequest object)
{
return s3Client.getObject(object).getObjectContent();
}
// fetch the size of the whole object to make chunks
long readEnd;
AtomicLong currReadStart = new AtomicLong(0);
if (getObjectRequest.getRange() != null) {
currReadStart.set(getObjectRequest.getRange()[0]);
readEnd = getObjectRequest.getRange()[1] + 1;
} else {
readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
}

@Override
public InputStream open(GetObjectRequest object, long offset)
{
final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
object.getBucketName(),
object.getKey()
// build a sequence input stream from chunks
return new SequenceInputStream(new Enumeration<InputStream>()
{
@Override
public boolean hasMoreElements()
{
// don't stop until the whole object is downloaded
return currReadStart.get() < readEnd;
}

@Override
public InputStream nextElement()
{
File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
// in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
try {
if (!outFile.createNewFile()) {
throw new IOE(
StringUtils.format(
"Could not create temporary file [%s] for copying [%s]",
outFile.getAbsolutePath(),
objectPath(path)
)
);
offsetObjectRequest.setRange(offset);
return open(offsetObjectRequest);
}
},
S3Utils.S3RETRY,
config.getMaxRetry()
);
FileUtils.copyLarge(
() -> new RetryingInputStream<>(
new GetObjectRequest(
config.getBucket(),
objectPath(path)
).withRange(currReadStart.get(), endPoint),
new ObjectOpenFunction<GetObjectRequest>()
{
@Override
public InputStream open(GetObjectRequest object)
{
return s3Client.getObject(object).getObjectContent();
}

@Override
public InputStream open(GetObjectRequest object, long offset)
{
if (object.getRange() != null) {
long[] oldRange = object.getRange();
object.setRange(oldRange[0] + offset, oldRange[1]);
} else {
object.setRange(offset);
}
return open(object);
}
},
S3Utils.S3RETRY,
config.getMaxRetry()
),
outFile,
new byte[8 * 1024],
Predicates.alwaysFalse(),
1,
StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
try {
AtomicBoolean isClosed = new AtomicBoolean(false);
return new FileInputStream(outFile)
{
@Override
public void close() throws IOException
{
// close should be idempotent
if (isClosed.get()) {
return;
}
isClosed.set(true);
try {
super.close();
}
finally {
// since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1
currReadStart.set(endPoint + 1);
outFile.delete();
}
}
};
}
catch (FileNotFoundException e) {
throw new UncheckedIOException(e);
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -104,9 +105,15 @@ public void pathExists() throws IOException
public void pathRead() throws IOException
{
EasyMock.reset(S3_CLIENT);
ObjectMetadata objectMetadata = new ObjectMetadata();
long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
objectMetadata.setContentLength(contentLength);
S3Object s3Object = new S3Object();
s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
EasyMock.expect(S3_CLIENT.getObject(new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE))).andReturn(s3Object);
EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
EasyMock.expect(S3_CLIENT.getObject(
new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1))
).andReturn(s3Object);
EasyMock.replay(S3_CLIENT);

String readText = new BufferedReader(
Expand Down Expand Up @@ -141,8 +148,8 @@ public void testReadRange() throws IOException

InputStream is = storageConnector.readRange(TEST_FILE, start, length);
byte[] dataBytes = new byte[length];
Assert.assertEquals(is.read(dataBytes), length);
Assert.assertEquals(is.read(), -1); // reading further produces no data
Assert.assertEquals(length, is.read(dataBytes));
Assert.assertEquals(-1, is.read()); // reading further produces no data
Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
EasyMock.reset(S3_CLIENT);
}
Expand Down

0 comments on commit ce61691

Please sign in to comment.