Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework client content listeners #8981

Merged
merged 5 commits into from
Jan 4, 2023

Conversation

lorban
Copy link
Contributor

@lorban lorban commented Nov 30, 2022

Proposal: get rid of DemandedContentListener and change AsyncContentListener API to adopt the SimpleContentListener one.

This depends on #8993

@lorban lorban requested a review from sbordet November 30, 2022 16:01
@lorban lorban self-assigned this Nov 30, 2022
@lorban lorban force-pushed the jetty-12.0.x-client-rework-content-listeners branch 5 times, most recently from 3120423 to 646afcf Compare December 7, 2022 08:15
@joakime joakime added this to the 12.0.x milestone Dec 8, 2022
@lorban lorban force-pushed the jetty-12.0.x-client-rework-content-listeners branch from 646afcf to b8a25b3 Compare December 12, 2022 14:46
@lorban lorban marked this pull request as ready for review December 12, 2022 16:46
@lorban lorban force-pushed the jetty-12.0.x-client-rework-content-listeners branch from 8e766e3 to 19bf6ee Compare December 13, 2022 09:48
@lorban lorban requested a review from sbordet December 13, 2022 09:49
Applications implement this method to process the content bytes in the `buffer`.
Succeeding the `callback` signals to the implementation that the application has consumed the `buffer` so that the implementation can dispose/recycle the `buffer`.
Failing the `callback` signals to the implementation to fail the response (no more content will be delivered, and the _response failed_ event will be emitted).
You must provide a `org.eclipse.jetty.io.Content.Source` read/demand implementation that reads a `Content.Chunk` from the provided `Content.Source`; if the read chunk is `null`, `Content.Source.demand(Runnable)` should be called so that the demand callback is called back when more chunks are available; if the chunk is an instance of `Content.Chunk.Error` then the error should be processed; otherwise the chunk should be processed by consuming and then releasing the chunk's `ByteBuffer`; and finally either trying to read another chunk, or demanding for another chunk (unless the current chunk is the last).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
You must provide a `org.eclipse.jetty.io.Content.Source` read/demand implementation that reads a `Content.Chunk` from the provided `Content.Source`; if the read chunk is `null`, `Content.Source.demand(Runnable)` should be called so that the demand callback is called back when more chunks are available; if the chunk is an instance of `Content.Chunk.Error` then the error should be processed; otherwise the chunk should be processed by consuming and then releasing the chunk's `ByteBuffer`; and finally either trying to read another chunk, or demanding for another chunk (unless the current chunk is the last).
You must provide a `ContentSourceListener` whose implementation reads a `Content.Chunk` from the provided `Content.Source`; if the read chunk is `null`, `Content.Source.demand(Runnable)` should be called so that the demand callback is called back when more chunks are available; if the chunk is an instance of `Content.Chunk.Error` then the error should be processed; otherwise the chunk should be processed by consuming and then releasing the chunk; and finally either trying to read another chunk, or demanding for another chunk (unless the current chunk is the last).

IMPORTANT: Succeeding the `callback` must be done only after the `buffer` bytes have been consumed.
When the `callback` is succeeded, the `HttpClient` implementation may reuse the `buffer` and overwrite the bytes with different bytes; if the application looks at the `buffer` _after_ having succeeded the `callback` is may see other, unrelated, bytes.
IMPORTANT: Calling `Content.Chunk.release()` must be done only after the bytes in the `ByteBuffer` returned by `Content.Chunk.getByteBuffer()` have been consumed.
When the `Content.Chunk` is released, the `HttpClient` implementation may reuse the `buffer` and overwrite the bytes with different bytes; if the application looks at the `buffer` _after_ having released the `Content.Chunk` is may see other, unrelated, bytes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
When the `Content.Chunk` is released, the `HttpClient` implementation may reuse the `buffer` and overwrite the bytes with different bytes; if the application looks at the `buffer` _after_ having released the `Content.Chunk` is may see other, unrelated, bytes.
When the `Content.Chunk` is released, the `HttpClient` implementation may reuse the `ByteBuffer` and overwrite the bytes with different bytes; if the application looks at the `ByteBuffer` _after_ having released the `Content.Chunk` is may see other, unrelated, bytes.


Demanding for content and consuming the content are orthogonal activities.

An application can demand "infinitely" and store aside the pairs `(buffer, callback)` to consume them later.
An application can read, store aside the `Content.Chunk` objects without releasing them (to consume them later), and demand for more chunks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
An application can read, store aside the `Content.Chunk` objects without releasing them (to consume them later), and demand for more chunks.
An application can read, store aside the `Content.Chunk` objects without releasing them (to consume them later), and demand for more chunks, but it must call `Chunk.retain()` on the stored chunks, and arrange to release them after they have been consumed later.


An application can also demand one chunk of content, consume it (by succeeding the associated `callback`) and then _not_ demand for more content until a later time.
An application can also read one chunk of content, consume it (by releasing it) and then _not_ demand for more content until a later time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
An application can also read one chunk of content, consume it (by releasing it) and then _not_ demand for more content until a later time.
An application can also read one chunk of content, consume it, release it, and then _not_ demand for more content until a later time.


Subclass `Response.AsyncContentListener` overrides the behavior of `Response.DemandedContentListener`; when an application implementing its `onContent(response, buffer, callback)` succeeds the `callback`, it will have _both_ the effect of disposing/recycling the `buffer` _and_ the effect of demanding one more chunk of content.
Subclass `Response.AsyncContentListener` overrides the behavior of `Response.ContentSourceListener`; when an application implements `onContent(response, chunk, demander)`, it can control the disposing/recycling the `buffer` by releasing the `chunk` _and_ it can control when to demand one more chunk of content by calling `demander.run()`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Subclass `Response.AsyncContentListener` overrides the behavior of `Response.ContentSourceListener`; when an application implements `onContent(response, chunk, demander)`, it can control the disposing/recycling the `buffer` by releasing the `chunk` _and_ it can control when to demand one more chunk of content by calling `demander.run()`.
Subclass `Response.AsyncContentListener` overrides the behavior of `Response.ContentSourceListener`; when an application implements `AsyncContentListener.onContent(response, chunk, demander)`, it can control the disposing/recycling of the `ByteBuffer` by releasing the chunk _and_ it can control when to demand one more chunk by calling `demander.run()`.

// When response content is received from server1, forward it to server2.
content2.write(content, Callback.from(() ->
content2.write(chunk.getByteBuffer(), Callback.from(() ->
{
// When the request content to server2 is sent,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// When the request content to server2 is sent,
// When the request chunk is successfully sent to server2,

@@ -73,13 +72,13 @@
public class InputStreamResponseListener extends Listener.Adapter
{
private static final Logger LOG = LoggerFactory.getLogger(InputStreamResponseListener.class);
private static final Chunk EOF = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
private static final ChunkCallback EOF = new ChunkCallback(Content.Chunk.EOF, () -> {}, (x) -> {});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the parens around x.

{
if (LOG.isDebugEnabled())
LOG.debug("Skipped empty content {}", content);
callback.succeeded();
LOG.debug("Skipped empty content {}", chunk);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.debug("Skipped empty content {}", chunk);
LOG.debug("Skipped empty chunk {}", chunk);

@@ -117,17 +117,18 @@ public void onContent(Response response, ByteBuffer content, Callback callback)
if (!closed)
{
if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", content);
chunks.add(new Chunk(content, callback));
LOG.debug("Queueing content {}", chunk);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.debug("Queueing content {}", chunk);
LOG.debug("Queueing chunk {}", chunk);

l.signalAll();
}
}

if (closed)
{
if (LOG.isDebugEnabled())
LOG.debug("InputStream closed, ignored content {}", content);
callback.failed(new AsynchronousCloseException());
LOG.debug("InputStream closed, ignored content {}", chunk);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.debug("InputStream closed, ignored content {}", chunk);
LOG.debug("InputStream closed, ignored chunk {}", chunk);

@sbordet
Copy link
Contributor

sbordet commented Dec 23, 2022

@lorban approved, but please fix the 12 documentation/nits. Thanks!

@sbordet sbordet force-pushed the jetty-12.0.x-client-rework-content-listeners branch from 902a362 to 2cdf785 Compare January 4, 2023 00:05
@sbordet sbordet merged commit fcbdab8 into jetty-12.0.x Jan 4, 2023
@sbordet sbordet deleted the jetty-12.0.x-client-rework-content-listeners branch January 4, 2023 10:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants