RPC Output Stream drops data if AsyncIterator isn't invoked before response is received #650

Open
jcready opened this issue May 31, 2024 · 3 comments · May be fixed by #662

Comments

@jcready
Contributor

jcready commented May 31, 2024

The ServerStreamingCall's responses property is an RpcOutputStream, which exposes an AsyncIterator. If the consumer does not invoke the AsyncIterator synchronously, data can be dropped.

let serverStreaming = service.serverStreamingMethod(foo);
await (new Promise((resolve) => setTimeout(resolve, 1000)));

for await (let message of serverStreaming.responses) {
    console.log("got a message", message)
}

The above is a race condition. If the service starts sending streaming responses before 1 second has passed, nothing is logged. This is despite the claim:

* - If your `for await` consumes slower than the stream produces,
* for example because you are relaying messages in a slow operation,
* messages are queued.

There is also this:

* - If the stream is already complete, the `for await` will be empty.

However, I had always interpreted that to mean that if something had already finished consuming the stream, then invoking the AsyncIterator a second time would yield no messages. But perhaps "stream" there refers to the underlying network call rather than the locally buffered stream.
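To make the race concrete, here is a toy model of a push-based stream that only starts queuing once an iterator has been created. It is a sketch of the behavior described above, not the protobuf-ts implementation; ToyOutputStream, collect and demo are hypothetical names introduced for illustration.

```typescript
// Toy model only: NOT the protobuf-ts implementation. All names are hypothetical.
class ToyOutputStream<T> implements AsyncIterable<T> {
  // Created lazily by [Symbol.asyncIterator](); undefined until then.
  private state?: {
    queue: IteratorResult<T>[];
    waiting?: (result: IteratorResult<T>) => void;
  };

  notifyMessage(value: T): void {
    this.push({ value, done: false });
  }

  notifyComplete(): void {
    this.push({ value: undefined, done: true });
  }

  private push(result: IteratorResult<T>): void {
    const state = this.state;
    if (!state) return; // no iterator yet, so the result is silently dropped
    if (state.waiting) {
      // A consumer is already waiting in next(): hand the result over directly.
      const resolve = state.waiting;
      state.waiting = undefined;
      resolve(result);
    } else {
      state.queue.push(result);
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    if (!this.state) this.state = { queue: [] };
    const state = this.state;
    return {
      next: (): Promise<IteratorResult<T>> => {
        const queued = state.queue.shift();
        if (queued) return Promise.resolve(queued);
        return new Promise<IteratorResult<T>>((resolve) => {
          state.waiting = resolve;
        });
      },
    };
  }
}

// Drains a stream into an array; `for await` creates the iterator synchronously
// when this function is called.
async function collect<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const message of stream) out.push(message);
  return out;
}

async function demo(): Promise<{ late: string[]; eager: string[] }> {
  // Late consumer: "a" is pushed before any iterator exists.
  const lateStream = new ToyOutputStream<string>();
  lateStream.notifyMessage("a");
  const late = collect(lateStream);
  lateStream.notifyMessage("b");
  lateStream.notifyComplete();

  // Eager consumer: the iterator exists before the first push.
  const eagerStream = new ToyOutputStream<string>();
  const eager = collect(eagerStream);
  eagerStream.notifyMessage("a");
  eagerStream.notifyMessage("b");
  eagerStream.notifyComplete();

  return { late: await late, eager: await eager };
}
```

In this model the late consumer only sees "b", while the eager consumer sees both "a" and "b". Note that if completion were also pushed before the iterator existed, the toy's `for await` would never finish, so the sketch deliberately avoids that case.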

In either case, this behavior seems undesirable and non-obvious. It would be akin to saying that if the consumer didn't await the unary response before the underlying network call finished, the data would simply be lost, e.g.:

let unary = service.unaryMethod(bar);
await (new Promise((resolve) => setTimeout(resolve, 1000)));
const result = await unary.response; // undefined - dropped because we didn't await synchronously

This behavior also makes it challenging to effectively invoke multiple requests in parallel:

// issue network calls in parallel
let serverStreamingCall1 = service.serverStreamingMethod1(foo);
let serverStreamingCall2 = service.serverStreamingMethod2(bar);
let unaryCall1 = service.unaryMethod1(baz);
let unaryCall2 = service.unaryMethod2(quz);

// consume them one at a time
const unaryResult1 = await unaryCall1.response;
const unaryResult2 = await unaryCall2.response;
// does not work
for await (const streamingResult1 of serverStreamingCall1.responses) {
  // ...
}
for await (const streamingResult2 of serverStreamingCall2.responses) {
  // ...
}

In order for the consumer to actually be successful they would instead need to do this:

// issue network calls in parallel
let serverStreamingCall1 = service.serverStreamingMethod1(foo);
let serverStreamingCall2 = service.serverStreamingMethod2(bar);
let unaryCall1 = service.unaryMethod1(baz);
let unaryCall2 = service.unaryMethod2(quz);

let streamingBuffer1 = serverStreamingCall1.responses[Symbol.asyncIterator]();
let streamingBuffer2 = serverStreamingCall2.responses[Symbol.asyncIterator]();
let streamingIterator1 = { [Symbol.asyncIterator]() { return streamingBuffer1 } };
let streamingIterator2 = { [Symbol.asyncIterator]() { return streamingBuffer2 } };

// consume them one at a time
const unaryResult1 = await unaryCall1.response;
const unaryResult2 = await unaryCall2.response;
for await (const streamingResult1 of streamingIterator1) {
  // ...
}
for await (const streamingResult2 of streamingIterator2) {
  // ...
}
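The two-step pattern above (create the iterator immediately, then wrap it so that a later for await can consume it) could be folded into a small helper. This is a minimal sketch: startIterating is a hypothetical name, not part of protobuf-ts, and it relies on the behavior described in this issue that messages are queued once an iterator exists.

```typescript
// Hypothetical helper (not part of protobuf-ts): create the iterator right away
// and hand back an iterable that always returns that same iterator.
function startIterating<T>(source: AsyncIterable<T>): AsyncIterable<T> {
  // Called synchronously, so the source has iterator state before any
  // message can arrive.
  const iterator = source[Symbol.asyncIterator]();
  return {
    [Symbol.asyncIterator]: () => iterator,
  };
}
```

With this, the workaround would read `const responses1 = startIterating(serverStreamingCall1.responses);` directly after issuing the call, followed later by `for await (const streamingResult1 of responses1)`. The returned iterable is single-use, because every `for await` over it shares the same underlying iterator.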

I assume the existing behavior is meant to avoid extra memory/GC pressure on the off chance that a consumer invokes the streaming call and then does nothing with it, or only cares about the headers, but that seems like an odd thing to optimize for.

@jsaguet

jsaguet commented Nov 25, 2024

Another use case is synchronously pushing a value into an RpcOutputStreamController from an interceptor.

This breaks:

interceptServerStreaming(next, method, input, options) {
   let original = next(method, input, options);
   let response = new RpcOutputStreamController();

   const initialValue = {};
   // This message will be dropped
   response.notifyMessage(initialValue);

   original.responses.onNext((message, error, done) => {
     if (message) {
       response.notifyMessage(message);
     }
     if (error)
       response.notifyError(error);
     if (done)
       response.notifyComplete();
   });
   return new ServerStreamingCall(
     original.method,
     original.requestHeaders,
     original.request,
     original.headers,
     response,
     original.status,
     original.trailers
   );
 }

The workaround is to use a setTimeout to delay the emission:

interceptServerStreaming(next, method, input, options) {
   let original = next(method, input, options);
   let response = new RpcOutputStreamController();

   const initialValue = {};
   // This is fine because the message will be sent after the async iterable is subscribed.
   setTimeout(() => {
     response.notifyMessage(initialValue);
   }, 0);

   original.responses.onNext((message, error, done) => {
     if (message) {
       response.notifyMessage(message);
     }
     if (error)
       response.notifyError(error);
     if (done)
       response.notifyComplete();
   });
   return new ServerStreamingCall(
     original.method,
     original.requestHeaders,
     original.request,
     original.headers,
     response,
     original.status,
     original.trailers
   );
 }

@jcready
Contributor Author

jcready commented Nov 25, 2024

@jsaguet a better workaround would be to initialize the asyncIterator on your own RpcOutputStreamController as that will keep the order of things deterministic and easier to test. Also, you can simplify how you're forwarding the messages a bit:

interceptServerStreaming(next, method, input, options) {
   let original = next(method, input, options);
   let response = new RpcOutputStreamController();

   // initialize the async iterator so that we have an iterator state available
   response[Symbol.asyncIterator]();

   const initialValue = {};
   response.notifyMessage(initialValue);

   original.responses.onNext(response.notifyNext.bind(response));
   return new ServerStreamingCall(
     original.method,
     original.requestHeaders,
     original.request,
     original.headers,
     response,
     original.status,
     original.trailers
   );
 }

@jsaguet

jsaguet commented Nov 25, 2024

You're absolutely right. Thanks for your help
