
Robust handling and management of S3 streams for MSQ shuffle storage #13741

Merged
4 commits merged into apache:master on Feb 7, 2023

Conversation

rohangarg (Member):

Many MSQ jobs start failing with Premature EOF / SDKException errors when the faultTolerance and durableShuffleStorage context parameters are enabled.
After adding some logging, it was found that these exceptions are preceded by ConnectionReset exceptions, which can happen on the S3 server side when an S3 request is kept open for a very long time. Trying out a change where the file is first downloaded from S3 to local disk and then read fixed the issue for these jobs - but the caveat is that multiple threads downloading files concurrently might put pressure on the disks.
So, the current change downloads the file chunk by chunk locally at runtime and then stitches the chunks together under a SequenceInputStream for the upstream MSQ processing engine. Each chunk is currently 100MB in size and is downloaded eagerly to a local file. When a chunk is consumed, its local file is deleted and the next chunk in the queue is processed.
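
As a rough illustration of the chunked approach described above (a hypothetical, simplified sketch, not the PR's actual S3StorageConnector code; the helper and ChunkFetcher names are assumptions, and the real connector downloads the next chunk eagerly while this version fetches lazily): each chunk is downloaded to a local temp file, wrapped in a stream that deletes the file once consumed, and the per-chunk streams are chained with a SequenceInputStream.

    import java.io.*;
    import java.nio.file.*;
    import java.util.*;

    class ChunkedObjectReader
    {
      // 100MB chunk size, matching the description above.
      static final long CHUNK_SIZE = 100L * 1024 * 1024;

      // Hypothetical stand-in for the ranged S3 GET the connector performs.
      interface ChunkFetcher
      {
        void fetchRange(long start, long endInclusive, Path target) throws IOException;
      }

      static InputStream openChunked(long totalSize, ChunkFetcher fetcher)
      {
        List<Long> offsets = new ArrayList<>();
        for (long off = 0; off < totalSize; off += CHUNK_SIZE) {
          offsets.add(off);
        }
        Iterator<Long> it = offsets.iterator();

        Enumeration<InputStream> chunkStreams = new Enumeration<InputStream>()
        {
          @Override
          public boolean hasMoreElements()
          {
            return it.hasNext();
          }

          @Override
          public InputStream nextElement()
          {
            long start = it.next();
            long end = Math.min(start + CHUNK_SIZE, totalSize) - 1;
            try {
              // Download the chunk to a local temp file.
              Path tmp = Files.createTempFile("chunk-", ".bin");
              fetcher.fetchRange(start, end, tmp);
              return new FilterInputStream(Files.newInputStream(tmp))
              {
                @Override
                public void close() throws IOException
                {
                  // SequenceInputStream closes each chunk stream once it is exhausted,
                  // at which point the local file is deleted to free disk space.
                  super.close();
                  Files.deleteIfExists(tmp);
                }
              };
            }
            catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }
        };

        // Stitch the per-chunk streams into one logical stream for the consumer.
        return new SequenceInputStream(chunkStreams);
      }
    }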

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@imply-cheddar (Contributor) left a comment:

A few suggestions for fixes. It's probably "okay" to merge as-is, but I think we should be a bit more allergic to retries (used willy-nilly, they often cause more pain than they resolve).

I'm going to approve because the current code and usage of retries isn't really a regression from the old code; rather, all of the suggestions are aimed at improving the general state of the code overall.


public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
{
  this.config = config;
  this.s3Client = serverSideEncryptingAmazonS3;
  if (config.getTempDir() != null) {
Contributor:

If getTempDir is null, this code is still gonna fail, but a lot later on. You can test and validate here instead.

rohangarg (Member Author):

Thanks - added more validation for the config vars
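
For illustration, a minimal sketch of the kind of fail-fast validation being suggested (the helper name and exact checks are assumptions, not the PR's actual code; it assumes getTempDir() returns a java.io.File):

    import java.io.File;
    import java.util.Objects;

    // Hypothetical fail-fast check for the configured temp directory.
    final class TempDirValidator
    {
      static File validate(File tempDir)
      {
        Objects.requireNonNull(tempDir, "tempDir must be configured for durable shuffle storage");
        if (!tempDir.exists() && !tempDir.mkdirs()) {
          throw new IllegalStateException("Could not create tempDir [" + tempDir.getAbsolutePath() + "]");
        }
        return tempDir;
      }

      private TempDirValidator() {}
    }

Calling something like this once from the constructor surfaces a missing tempDir at startup instead of much later during a read.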

config.getMaxRetry()
);
FileUtils.copyLarge(
() -> new RetryingInputStream<>(
Contributor:

Do you really need to do this inside of a retrying input stream? We've had a rash of issues where code was trying to retry a thing, and then a layer above it was trying to retry a thing, and then a layer above that, leading to multiplicative growth of retries and really long delays in processing. Given that the whole fault-tolerance stuff exists and will retry failed tasks anyway, perhaps this isn't the right layer to add yet another set of retries? That, or limit it to only 2 retries at a max if we really do want to have a retry at this layer.

rohangarg (Member Author):

I think we can have the retry since all other parts of the code also do the same (probably derived from production systems previously). We could reconsider our retrying in the future, but that can be done separately.
Also, agreed on using a fixed small number of retries since the SDK also does some retries. Changed the maximum number of tries to 3, which means 2 retries.
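
Not Druid's actual RetryingInputStream API, but a minimal generic sketch of the bounded retry agreed on above: a hard cap of 3 tries (2 retries) at this single layer, with no nested retry wrappers.

    import java.io.IOException;
    import java.io.InputStream;

    // Illustrative bounded retry: at most MAX_TRIES attempts in total.
    final class BoundedRetry
    {
      private static final int MAX_TRIES = 3; // 3 tries == 2 retries, mirroring the discussion above

      interface IOSupplier<T>
      {
        T get() throws IOException;
      }

      static InputStream openWithRetry(IOSupplier<InputStream> opener) throws IOException
      {
        IOException last = null;
        for (int attempt = 1; attempt <= MAX_TRIES; attempt++) {
          try {
            return opener.get();
          }
          catch (IOException e) {
            last = e;
            // A real implementation would back off between attempts; omitted to keep the sketch short.
          }
        }
        throw last;
      }

      private BoundedRetry() {}
    }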

);
}
catch (IOException e) {
throw new UncheckedIOException(e);
Contributor:

What's an UncheckedIOException other than just a RuntimeException?

rohangarg (Member Author):

Yes, they are practically the same - I used UncheckedIOException since it seemed natural. Changed it to a custom RE with more info.
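
For illustration only (the PR's actual exception type and message will differ), the practical difference is that a custom RuntimeException can carry context about what failed, while UncheckedIOException only wraps the cause:

    // Hypothetical exception type carrying extra context about the failed S3 read.
    class S3ReadFailedException extends RuntimeException
    {
      S3ReadFailedException(String bucket, String key, Throwable cause)
      {
        super("Failed to read object s3://" + bucket + "/" + key, cause);
      }
    }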

Comment on lines +193 to +196
// close should be idempotent
if (isClosed.get()) {
  return;
}
Contributor:

I'm very much scared of places where the contract is that close can be called multiple times. It's indicative of a lack of understanding of when the lifecycle of an object is truly over, and it often highlights other problems.

rohangarg (Member Author):

I agree that if close is being called multiple times, it could indicate a loose contract in the execution layer. Will check that independently - trying to push this change through since it unblocks the usage of the durable storage feature in MSQ.
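
For reference, a common pattern for making close() idempotent with an atomic guard (a generic sketch, not the PR's exact code):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    // compareAndSet ensures the cleanup runs exactly once, even if close() is
    // called multiple times or from multiple threads.
    abstract class IdempotentCloseable implements Closeable
    {
      private final AtomicBoolean isClosed = new AtomicBoolean(false);

      @Override
      public final void close() throws IOException
      {
        if (isClosed.compareAndSet(false, true)) {
          doClose();
        }
      }

      protected abstract void doClose() throws IOException;
    }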

Comment on lines 105 to 117
public void pathRead() throws IOException
{
  EasyMock.reset(S3_CLIENT);
  ObjectMetadata objectMetadata = new ObjectMetadata();
  long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
  objectMetadata.setContentLength(contentLength);
  S3Object s3Object = new S3Object();
  s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
  EasyMock.expect(S3_CLIENT.getObject(new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE))).andReturn(s3Object);
  EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
  EasyMock.expect(S3_CLIENT.getObject(
      new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1))
  ).andReturn(s3Object);
  EasyMock.replay(S3_CLIENT);
Contributor:

You don't have any testing for the retry behavior. If retries exist with the configs, please test all of the different ways that retries can happen and whether we expect multiplicativity or not.

rohangarg (Member Author):

The configs for retries are removed and they are hard-coded now. I think for S3, in the future, we can create a custom S3RetryInputStream which would be easier to test for retries and could be used in all S3 read paths. I'm not sure we can test the multiplicative retries since we currently mock the AWS client.
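
As a sketch of the kind of retry test being asked for, written against the hypothetical BoundedRetry helper shown earlier rather than the PR's actual classes: make the opener fail a fixed number of times and assert exactly how many attempts were made, which would also catch multiplicative retries.

    import static org.junit.Assert.assertEquals;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.junit.Test;

    public class BoundedRetryTest
    {
      @Test
      public void testRetriesAreBoundedAndCounted() throws IOException
      {
        AtomicInteger attempts = new AtomicInteger();
        // Fails on the first two attempts, succeeds on the third.
        InputStream in = BoundedRetry.openWithRetry(() -> {
          if (attempts.incrementAndGet() < 3) {
            throw new IOException("simulated connection reset");
          }
          return new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8));
        });

        assertEquals(3, attempts.get());
        assertEquals('t', in.read());
        in.close();
      }
    }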

rohangarg added the "Area - MSQ" label (for multi stage queries - https://github.com/apache/druid/issues/12262) on Feb 3, 2023
rohangarg merged commit a0f8889 into apache:master on Feb 7, 2023
clintropolis added this to the 26.0 milestone on Apr 10, 2023