From 9ab12ca5784f724e6b7a56148502748b15d9a6c1 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 16 May 2019 13:17:11 -0400 Subject: [PATCH 01/22] Add util/async.ts --- util/async.ts | 67 ++++++++++++++++++++++++++++++++++++++++++++++ util/async_test.ts | 27 +++++++++++++++++++ util/test.ts | 1 + 3 files changed, 95 insertions(+) create mode 100644 util/async.ts create mode 100644 util/async_test.ts diff --git a/util/async.ts b/util/async.ts new file mode 100644 index 000000000000..e2bd8c958d43 --- /dev/null +++ b/util/async.ts @@ -0,0 +1,67 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +// TODO(ry) It'd be better to make Deferred a class that inherits from +// Promise, rather than an interface. This is possible in ES2016, however +// typescript produces broken code when targeting ES5 code. +// See https://github.com/Microsoft/TypeScript/issues/15202 +// At the time of writing, the github issue is closed but the problem remains. +export interface Deferred extends Promise { + resolve: (value?: T | PromiseLike) => void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + reject: (reason?: any) => void; +} + +/** Creates a Promise with the `reject` and `resolve` functions + * placed as methods on the promise object itself. It allows you to do: + * + * const p = deferred(); + * // ... + * p.resolve(42); + */ +export function deferred(): Deferred { + let methods; + const promise = new Promise( + (resolve, reject): void => { + methods = { resolve, reject }; + } + ); + return Object.assign(promise, methods) as Deferred; +} + +export class Latch { + // TODO(ry) Can this be done without using Arrays? + + // Array of `[resolve_function, value]` tuples. + private sendQueue: Array<[() => void, T]> = []; + // Array of `resolve_function` values. + private recvQueue: Array<(value: T) => void> = []; + + send(value: T): Promise { + const recvResolve = this.recvQueue.shift(); + if (recvResolve) { + recvResolve(value); + return Promise.resolve(); + } else { + return new Promise( + (resolve, _): void => { + this.sendQueue.push([resolve, value]); + } + ); + } + } + + recv(): Promise { + const s = this.sendQueue.shift(); + if (s) { + const [sendResolve, value] = s; + sendResolve(); + return Promise.resolve(value); + } else { + return new Promise( + (res, _): void => { + this.recvQueue.push(res); + } + ); + } + } +} diff --git a/util/async_test.ts b/util/async_test.ts new file mode 100644 index 000000000000..2da72075b0c9 --- /dev/null +++ b/util/async_test.ts @@ -0,0 +1,27 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import { test, runIfMain } from "../testing/mod.ts"; +import { assertEquals } from "../testing/asserts.ts"; +import { Latch, deferred } from "./async.ts"; + +test(async function asyncDeferred(): Promise { + const d = deferred(); + d.resolve(12); +}); + +async function send3(latch: Latch): Promise { + await latch.send(1); + await latch.send(2); + await latch.send(3); +} + +test(async function asyncLatch(): Promise { + const latch = new Latch(); + send3(latch); + + assertEquals(1, await latch.recv()); + assertEquals(2, await latch.recv()); + assertEquals(3, await latch.recv()); + let _lastPromise = latch.recv(); +}); + +runIfMain(import.meta); diff --git a/util/test.ts b/util/test.ts index a617c10ab36d..ede984904f0d 100644 --- a/util/test.ts +++ b/util/test.ts @@ -1 +1,2 @@ +import "./async_test.ts"; import "./deep_assign_test.ts"; From 1ab72b32a0745669385398e0dc7eacd0e59f0078 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 16 May 2019 13:24:32 -0400 Subject: [PATCH 02/22] http/server: Clean up async code --- http/http_bench.ts | 5 +- http/server.ts | 155 ++++++++++++++++++-------------------------- http/server_test.ts | 33 ++-------- 3 files changed, 73 insertions(+), 120 deletions(-) diff --git a/http/http_bench.ts b/http/http_bench.ts index 6d72d4be6ab9..06043f9e4e82 100644 --- a/http/http_bench.ts +++ b/http/http_bench.ts @@ -3,13 +3,12 @@ import { serve } from "./server.ts"; const addr = Deno.args[1] || "127.0.0.1:4500"; const server = serve(addr); - const body = new TextEncoder().encode("Hello World"); async function main(): Promise { console.log(`http://${addr}/`); - for await (const request of server) { - request.respond({ status: 200, body }); + for await (const req of server) { + req.respond({ body }); } } diff --git a/http/server.ts b/http/server.ts index 484ecf808d1e..30e3146ce0c8 100644 --- a/http/server.ts +++ b/http/server.ts @@ -1,5 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. const { listen, copy, toAsyncIterator } = Deno; +type Listener = Deno.Listener; type Conn = Deno.Conn; type Reader = Deno.Reader; type Writer = Deno.Writer; @@ -7,37 +8,14 @@ import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; import { assert } from "../testing/asserts.ts"; - -interface Deferred { - promise: Promise<{}>; - resolve: () => void; - reject: () => void; -} - -function deferred(isResolved = false): Deferred { - let resolve, reject; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} +import { Latch, deferred, Deferred } from "../util/async.ts"; interface HttpConn extends Conn { // When read by a newly created request B, lastId is the id pointing to a previous // request A, such that we must wait for responses to A to complete before // writing B's response. lastPipelineId: number; - pendingDeferredMap: Map; + pendingDeferredMap: Map>; } function createHttpConn(c: Conn): HttpConn { @@ -46,8 +24,10 @@ function createHttpConn(c: Conn): HttpConn { pendingDeferredMap: new Map() }); - const resolvedDeferred = deferred(true); - httpConn.pendingDeferredMap.set(0, resolvedDeferred); + const d = deferred(); + d.resolve(); // The first request is ready immediately. + httpConn.pendingDeferredMap.set(0, d); + return httpConn; } @@ -58,6 +38,7 @@ function bufWriter(w: Writer): BufWriter { return new BufWriter(w); } } + export function setContentLength(r: Response): void { if (!r.headers) { r.headers = new Headers(); @@ -74,6 +55,7 @@ export function setContentLength(r: Response): void { } } } + async function writeChunkedBody(w: Writer, r: Reader): Promise { const writer = bufWriter(w); const encoder = new TextEncoder(); @@ -90,6 +72,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise { const endChunk = encoder.encode("0\r\n\r\n"); await writer.write(endChunk); } + export async function writeResponse(w: Writer, r: Response): Promise { const protoMajor = 1; const protoMinor = 1; @@ -131,6 +114,7 @@ export async function writeResponse(w: Writer, r: Response): Promise { } await writer.flush(); } + async function readAllIterator( it: AsyncIterableIterator ): Promise { @@ -250,7 +234,7 @@ export class ServerRequest { lastPipelineId ); assert(!!lastPipelineDeferred); - await lastPipelineDeferred.promise; + await lastPipelineDeferred; // If yes, delete old deferred and proceed with writing. this.conn.pendingDeferredMap.delete(lastPipelineId); // Write our response! @@ -264,11 +248,6 @@ export class ServerRequest { } } -interface ServeEnv { - reqQueue: ServerRequest[]; - serveDeferred: Deferred; -} - /** Continuously read more requests from conn until EOF * Calls maybeHandleReq. * bufr is empty on a fresh TCP connection. @@ -279,11 +258,8 @@ interface ServeEnv { */ async function readRequest( c: HttpConn, - bufr?: BufReader + bufr: BufReader ): Promise<[ServerRequest, BufState]> { - if (!bufr) { - bufr = new BufReader(c); - } const bufw = new BufWriter(c); const req = new ServerRequest(); @@ -313,65 +289,62 @@ async function readRequest( return [req, err]; } -function maybeHandleReq( - env: ServeEnv, - conn: Conn, - maybeReq: [ServerRequest, BufState] -): void { - const [req, _err] = maybeReq; - if (_err) { - conn.close(); // assume EOF for now... - return; +export class Server implements AsyncIterableIterator { + private closing = false; + private looping = false; + private latch = new Latch(); + + constructor(public listener: Listener) {} + + async acceptLoop(): Promise { + assert(!this.looping); + this.looping = true; + try { + while (!this.closing) { + const conn = await this.listener.accept(); + this.serveConn(conn); // async! + } + } finally { + this.looping = false; + } } - env.reqQueue.push(req); // push req to queue - env.serveDeferred.resolve(); // signal while loop to process it -} -function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void { - readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); -} + close(): void { + this.closing = true; + this.listener.close(); + } -export async function* serve( - addr: string -): AsyncIterableIterator { - const listener = listen("tcp", addr); - const env: ServeEnv = { - reqQueue: [], // in case multiple promises are ready - serveDeferred: deferred() - }; - - // Routine that keeps calling accept - let handleConn = (_conn: Conn): void => {}; - let scheduleAccept = (): void => {}; - const acceptRoutine = (): void => { - scheduleAccept = (): void => { - listener.accept().then(handleConn); - }; - handleConn = (conn: Conn): void => { - const httpConn = createHttpConn(conn); - serveConn(env, httpConn); // don't block - scheduleAccept(); // schedule next accept - }; - - scheduleAccept(); - }; - - acceptRoutine(); - - // Loop hack to allow yield (yield won't work in callbacks) - while (true) { - await env.serveDeferred.promise; - env.serveDeferred = deferred(); // use a new deferred - let queueToProcess = env.reqQueue; - env.reqQueue = []; - for (const result of queueToProcess) { - yield result; - // Continue read more from conn when user is done with the current req - // Moving this here makes it easier to manage - serveConn(env, result.conn, result.r); + async serveConn(conn: Conn): Promise { + const httpConn = createHttpConn(conn); + const bufr = new BufReader(httpConn); + while (true) { + const [req, err] = await readRequest(httpConn, bufr); + if (err) { + // TODO(ry) This should be more granular. Perhaps return back a 400 or + // 500 error? + httpConn.close(); + break; + } + + await this.latch.send(req); } } - listener.close(); + + async next(): Promise> { + const req = await this.latch.recv(); + return { done: false, value: req }; + } + + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + } +} + +export function serve(addr: string): Server { + const listener = listen("tcp", addr); + const server = new Server(listener); + server.acceptLoop(); + return server; } export async function listenAndServe( diff --git a/http/server_test.ts b/http/server_test.ts index 82a368395e0d..904a667c7f9d 100644 --- a/http/server_test.ts +++ b/http/server_test.ts @@ -11,6 +11,7 @@ import { assertEquals } from "../testing/asserts.ts"; import { Response, ServerRequest, writeResponse } from "./server.ts"; import { BufReader, BufWriter } from "../io/bufio.ts"; import { StringReader } from "../io/readers.ts"; +import { deferred } from "../util/async.ts"; interface ResponseTest { response: Response; @@ -22,31 +23,6 @@ const dec = new TextDecoder(); type Handler = () => void; -interface Deferred { - promise: Promise<{}>; - resolve: Handler; - reject: Handler; -} - -function deferred(isResolved = false): Deferred { - let resolve: Handler = (): void => void 0; - let reject: Handler = (): void => void 0; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} - const responseTests: ResponseTest[] = [ // Default response { @@ -74,6 +50,11 @@ test(async function responseWrite(): Promise { const request = new ServerRequest(); request.pipelineId = 1; request.w = bufw; + + const d0 = deferred(); + d0.resolve(); + const d1 = deferred(); + request.conn = { localAddr: "", remoteAddr: "", @@ -88,7 +69,7 @@ test(async function responseWrite(): Promise { }, close: (): void => {}, lastPipelineId: 0, - pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]]) + pendingDeferredMap: new Map([[0, d0], [1, d1]]) }; await request.respond(testCase.response); From 79065b4cfa68112cf4e98fa894ec03777de320cc Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 17 May 2019 12:18:35 -0400 Subject: [PATCH 03/22] Rename Latch to Channel --- http/server.ts | 8 ++++---- prettier/testdata/opts/0.ts | 4 ++-- prettier/testdata/opts/1.ts | 2 +- prettier/testdata/opts/2.ts | 2 +- util/async.ts | 8 +++----- util/async_test.ts | 24 ++++++++++++------------ 6 files changed, 23 insertions(+), 25 deletions(-) diff --git a/http/server.ts b/http/server.ts index 30e3146ce0c8..5bbc7fd08174 100644 --- a/http/server.ts +++ b/http/server.ts @@ -8,7 +8,7 @@ import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; import { assert } from "../testing/asserts.ts"; -import { Latch, deferred, Deferred } from "../util/async.ts"; +import { Channel, deferred, Deferred } from "../util/async.ts"; interface HttpConn extends Conn { // When read by a newly created request B, lastId is the id pointing to a previous @@ -292,7 +292,7 @@ async function readRequest( export class Server implements AsyncIterableIterator { private closing = false; private looping = false; - private latch = new Latch(); + private channel = new Channel(); constructor(public listener: Listener) {} @@ -326,12 +326,12 @@ export class Server implements AsyncIterableIterator { break; } - await this.latch.send(req); + await this.channel.send(req); } } async next(): Promise> { - const req = await this.latch.recv(); + const req = await this.channel.recv(); return { done: false, value: req }; } diff --git a/prettier/testdata/opts/0.ts b/prettier/testdata/opts/0.ts index fb85014a5e9e..0277269118cf 100644 --- a/prettier/testdata/opts/0.ts +++ b/prettier/testdata/opts/0.ts @@ -1,2 +1,2 @@ -console.log(0) -console.log([function foo() {}, function baz() {}, a => {}]) +console.log(0); +console.log([function foo() {}, function baz() {}, (a) => {}]); diff --git a/prettier/testdata/opts/1.ts b/prettier/testdata/opts/1.ts index c23a66c28e91..4748527e94fa 100644 --- a/prettier/testdata/opts/1.ts +++ b/prettier/testdata/opts/1.ts @@ -1 +1 @@ -console.log ("1") +console.log('1'); diff --git a/prettier/testdata/opts/2.ts b/prettier/testdata/opts/2.ts index 2cfe9aece463..e0c70341fd62 100644 --- a/prettier/testdata/opts/2.ts +++ b/prettier/testdata/opts/2.ts @@ -1 +1 @@ -console.log({a:1}) +console.log({a: 1}); diff --git a/util/async.ts b/util/async.ts index e2bd8c958d43..c664c9ce28b3 100644 --- a/util/async.ts +++ b/util/async.ts @@ -28,12 +28,10 @@ export function deferred(): Deferred { return Object.assign(promise, methods) as Deferred; } -export class Latch { - // TODO(ry) Can this be done without using Arrays? - - // Array of `[resolve_function, value]` tuples. +/** Sends objects between asynchronous tasks, with backpressure. */ +export class Channel { + // TODO(ry) Can Channel be implemented without using Arrays? private sendQueue: Array<[() => void, T]> = []; - // Array of `resolve_function` values. private recvQueue: Array<(value: T) => void> = []; send(value: T): Promise { diff --git a/util/async_test.ts b/util/async_test.ts index 2da72075b0c9..226cb784dc92 100644 --- a/util/async_test.ts +++ b/util/async_test.ts @@ -1,27 +1,27 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import { test, runIfMain } from "../testing/mod.ts"; import { assertEquals } from "../testing/asserts.ts"; -import { Latch, deferred } from "./async.ts"; +import { Channel, deferred } from "./async.ts"; test(async function asyncDeferred(): Promise { const d = deferred(); d.resolve(12); }); -async function send3(latch: Latch): Promise { - await latch.send(1); - await latch.send(2); - await latch.send(3); +async function send3(channel: Channel): Promise { + await channel.send(1); + await channel.send(2); + await channel.send(3); } -test(async function asyncLatch(): Promise { - const latch = new Latch(); - send3(latch); +test(async function asyncChannel(): Promise { + const channel = new Channel(); + send3(channel); - assertEquals(1, await latch.recv()); - assertEquals(2, await latch.recv()); - assertEquals(3, await latch.recv()); - let _lastPromise = latch.recv(); + assertEquals(1, await channel.recv()); + assertEquals(2, await channel.recv()); + assertEquals(3, await channel.recv()); + let _lastPromise = channel.recv(); }); runIfMain(import.meta); From 29f3e8e7cb7b4cd535554d2a37ceb57ca7bb6320 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Fri, 17 May 2019 18:21:10 -0700 Subject: [PATCH 04/22] This is how it should be done --- http/server.ts | 108 ++++++++++++++++++++----------------------------- 1 file changed, 44 insertions(+), 64 deletions(-) diff --git a/http/server.ts b/http/server.ts index 5bbc7fd08174..0f38d3a9d50d 100644 --- a/http/server.ts +++ b/http/server.ts @@ -249,102 +249,82 @@ export class ServerRequest { } /** Continuously read more requests from conn until EOF - * Calls maybeHandleReq. * bufr is empty on a fresh TCP connection. * Would be passed around and reused for later request on same conn * TODO: make them async function after this change is done * https://github.com/tc39/ecma262/pull/1250 * See https://v8.dev/blog/fast-async */ -async function readRequest( - c: HttpConn, - bufr: BufReader -): Promise<[ServerRequest, BufState]> { +async function* iterateHttpRequests( + c: HttpConn +): AsyncIterableIterator<[ServerRequest | null, BufState]> { + const bufr = new BufReader(c); const bufw = new BufWriter(c); - const req = new ServerRequest(); - - // Set and incr pipeline id; - req.pipelineId = ++c.lastPipelineId; - // Set a new pipeline deferred associated with this request - // for future requests to wait for. - c.pendingDeferredMap.set(req.pipelineId, deferred()); - - req.conn = c; - req.r = bufr!; - req.w = bufw; - const tp = new TextProtoReader(bufr!); - - let s: string; - let err: BufState; - - // First line: GET /index.html HTTP/1.0 - [s, err] = await tp.readLine(); - if (err) { - return [null, err]; - } - [req.method, req.url, req.proto] = s.split(" ", 3); - [req.headers, err] = await tp.readMIMEHeader(); + for (;;) { + const req = new ServerRequest(); + + // Set and incr pipeline id; + req.pipelineId = ++c.lastPipelineId; + // Set a new pipeline deferred associated with this request + // for future requests to wait for. + c.pendingDeferredMap.set(req.pipelineId, deferred()); + + req.conn = c; + req.r = bufr!; + req.w = bufw; + + // First line: GET /index.html HTTP/1.0 + const tp = new TextProtoReader(bufr!); + let [s, err]: [string, BufState] = await tp.readLine(); + if (err) { + yield [null, err]; + return; + } - return [req, err]; + [req.method, req.url, req.proto] = s.split(" ", 3); + [req.headers, err] = await tp.readMIMEHeader(); + yield [req, err]; + } } -export class Server implements AsyncIterableIterator { +export class Server implements AsyncIterable { private closing = false; private looping = false; private channel = new Channel(); constructor(public listener: Listener) {} - async acceptLoop(): Promise { - assert(!this.looping); - this.looping = true; - try { - while (!this.closing) { - const conn = await this.listener.accept(); - this.serveConn(conn); // async! - } - } finally { - this.looping = false; - } - } - close(): void { this.closing = true; this.listener.close(); } - async serveConn(conn: Conn): Promise { - const httpConn = createHttpConn(conn); - const bufr = new BufReader(httpConn); - while (true) { - const [req, err] = await readRequest(httpConn, bufr); - if (err) { - // TODO(ry) This should be more granular. Perhaps return back a 400 or - // 500 error? - httpConn.close(); - break; + private async *iterateRequests(): AsyncIterableIterator { + while (!this.closing) { + const conn = await this.listener.accept(); + const httpConn = createHttpConn(conn); + + for await (const [req, err] of iterateHttpRequests(httpConn)) { + if (err) { + // TODO(ry) This should be more granular. Perhaps return back a 400 or + // 500 error? + httpConn.close(); + break; + } + yield req; } - - await this.channel.send(req); } } - async next(): Promise> { - const req = await this.channel.recv(); - return { done: false, value: req }; - } - [Symbol.asyncIterator](): AsyncIterableIterator { - return this; + return this.iterateRequests(); } } export function serve(addr: string): Server { const listener = listen("tcp", addr); - const server = new Server(listener); - server.acceptLoop(); - return server; + return new Server(listener); } export async function listenAndServe( From f5d9359db0d110de2c4bc2c50bbbaa9e35304b6f Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Fri, 17 May 2019 19:28:31 -0700 Subject: [PATCH 05/22] Clean up a bit more --- http/server.ts | 71 ++++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/http/server.ts b/http/server.ts index 0f38d3a9d50d..733c39ef512b 100644 --- a/http/server.ts +++ b/http/server.ts @@ -7,8 +7,8 @@ type Writer = Deno.Writer; import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; -import { assert } from "../testing/asserts.ts"; -import { Channel, deferred, Deferred } from "../util/async.ts"; +import { assert, fail } from "../testing/asserts.ts"; +import { deferred, Deferred } from "../util/async.ts"; interface HttpConn extends Conn { // When read by a newly created request B, lastId is the id pointing to a previous @@ -256,42 +256,53 @@ export class ServerRequest { * See https://v8.dev/blog/fast-async */ async function* iterateHttpRequests( - c: HttpConn -): AsyncIterableIterator<[ServerRequest | null, BufState]> { - const bufr = new BufReader(c); - const bufw = new BufWriter(c); + conn: Conn +): AsyncIterableIterator { + const http_conn = createHttpConn(conn); + const bufr = new BufReader(http_conn); + const bufw = new BufWriter(http_conn); + const tp = new TextProtoReader(bufr); + + let buf_state_err: BufState; for (;;) { const req = new ServerRequest(); // Set and incr pipeline id; - req.pipelineId = ++c.lastPipelineId; + req.pipelineId = ++http_conn.lastPipelineId; // Set a new pipeline deferred associated with this request // for future requests to wait for. - c.pendingDeferredMap.set(req.pipelineId, deferred()); + http_conn.pendingDeferredMap.set(req.pipelineId, deferred()); - req.conn = c; - req.r = bufr!; + req.conn = http_conn; + req.r = bufr; req.w = bufw; // First line: GET /index.html HTTP/1.0 - const tp = new TextProtoReader(bufr!); - let [s, err]: [string, BufState] = await tp.readLine(); - if (err) { - yield [null, err]; - return; - } + let first_line: string; + [first_line, buf_state_err] = await tp.readLine(); + if (buf_state_err !== null) break; + [req.method, req.url, req.proto] = first_line.split(" ", 3); + + [req.headers, buf_state_err] = await tp.readMIMEHeader(); + if (buf_state_err !== null) break; - [req.method, req.url, req.proto] = s.split(" ", 3); - [req.headers, err] = await tp.readMIMEHeader(); - yield [req, err]; + yield req; + } + + if (buf_state_err === "EOF") { + // The connection was gracefully closed. + } else if (buf_state_err instanceof Error) { + // TODO(ry): send something back like a HTTP 500 status. + } else { + fail(`unexpected BufState: ${buf_state_err}`); } + + http_conn.close(); } export class Server implements AsyncIterable { private closing = false; - private looping = false; - private channel = new Channel(); constructor(public listener: Listener) {} @@ -300,26 +311,12 @@ export class Server implements AsyncIterable { this.listener.close(); } - private async *iterateRequests(): AsyncIterableIterator { + async *[Symbol.asyncIterator](): AsyncIterableIterator { while (!this.closing) { const conn = await this.listener.accept(); - const httpConn = createHttpConn(conn); - - for await (const [req, err] of iterateHttpRequests(httpConn)) { - if (err) { - // TODO(ry) This should be more granular. Perhaps return back a 400 or - // 500 error? - httpConn.close(); - break; - } - yield req; - } + yield* iterateHttpRequests(conn); } } - - [Symbol.asyncIterator](): AsyncIterableIterator { - return this.iterateRequests(); - } } export function serve(addr: string): Server { From 76ce233fb48880bcd0c82f6cd95c57cb822352ff Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Fri, 17 May 2019 20:58:54 -0700 Subject: [PATCH 06/22] Serve multiple requests in parallel (with backpressure) --- http/server.ts | 114 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 92 insertions(+), 22 deletions(-) diff --git a/http/server.ts b/http/server.ts index 733c39ef512b..165643b47aa9 100644 --- a/http/server.ts +++ b/http/server.ts @@ -258,47 +258,106 @@ export class ServerRequest { async function* iterateHttpRequests( conn: Conn ): AsyncIterableIterator { - const http_conn = createHttpConn(conn); + const httpConn = createHttpConn(conn); - const bufr = new BufReader(http_conn); - const bufw = new BufWriter(http_conn); + const bufr = new BufReader(httpConn); + const bufw = new BufWriter(httpConn); const tp = new TextProtoReader(bufr); - let buf_state_err: BufState; + let bufStateErr: BufState; for (;;) { const req = new ServerRequest(); // Set and incr pipeline id; - req.pipelineId = ++http_conn.lastPipelineId; + req.pipelineId = ++httpConn.lastPipelineId; // Set a new pipeline deferred associated with this request // for future requests to wait for. - http_conn.pendingDeferredMap.set(req.pipelineId, deferred()); + httpConn.pendingDeferredMap.set(req.pipelineId, deferred()); - req.conn = http_conn; + req.conn = httpConn; req.r = bufr; req.w = bufw; // First line: GET /index.html HTTP/1.0 - let first_line: string; - [first_line, buf_state_err] = await tp.readLine(); - if (buf_state_err !== null) break; - [req.method, req.url, req.proto] = first_line.split(" ", 3); + let firstLine: string; + [firstLine, bufStateErr] = await tp.readLine(); + if (bufStateErr !== null) break; + [req.method, req.url, req.proto] = firstLine.split(" ", 3); - [req.headers, buf_state_err] = await tp.readMIMEHeader(); - if (buf_state_err !== null) break; + [req.headers, bufStateErr] = await tp.readMIMEHeader(); + if (bufStateErr !== null) break; yield req; } - if (buf_state_err === "EOF") { + if (bufStateErr === "EOF") { // The connection was gracefully closed. - } else if (buf_state_err instanceof Error) { + } else if (bufStateErr instanceof Error) { // TODO(ry): send something back like a HTTP 500 status. } else { - fail(`unexpected BufState: ${buf_state_err}`); + fail(`unexpected BufState: ${bufStateErr}`); } - http_conn.close(); + httpConn.close(); +} + +// The MuxAsyncIterator class multiplexes multiple async iterators into a +// single stream. It currently makes a few assumptions: +// * The iterators do not throw. +// * The final result (the value returned and not yielded from the iterator) +// does not matter; if there is any, it is discarded. +// * Adding an iterator while the multiplexer is blocked does not take +// effect immediately. +interface WrappedIteratorResult { + iterator: AsyncIterableIterator; + result: IteratorResult; +} +class MuxAsyncIterator { + private iteratorNextPromiseMap: Map< + AsyncIterableIterator, + Promise> + > = new Map(); + + private async wrapIteratorNext( + iterator: AsyncIterableIterator + ): Promise> { + return { iterator, result: await iterator.next() }; + } + + add(iterator: AsyncIterableIterator) { + this.iteratorNextPromiseMap.set(iterator, this.wrapIteratorNext(iterator)); + } + + async next(): Promise> { + while (this.iteratorNextPromiseMap.size > 0) { + // Wait for the next iteration result of any of the iterators, whichever + // yields first. + const { iterator, result }: WrappedIteratorResult = await Promise.race( + this.iteratorNextPromiseMap.values() + ); + assert(this.iteratorNextPromiseMap.has(iterator)); + + if (result.done) { + // The iterator that yielded is done, remove it from the map. + this.iteratorNextPromiseMap.delete(iterator); + } else { + // The iterator has yielded a value. Call `next()` on it, wrap the + // returned promise, and store it in the map. + this.iteratorNextPromiseMap.set( + iterator, + this.wrapIteratorNext(iterator) + ); + return result; + } + } + + // There are no iterators left in the multiplexer, so report we're done. + return { value: null, done: true }; + } + + [Symbol.asyncIterator]() { + return this; + } } export class Server implements AsyncIterable { @@ -311,11 +370,22 @@ export class Server implements AsyncIterable { this.listener.close(); } - async *[Symbol.asyncIterator](): AsyncIterableIterator { - while (!this.closing) { - const conn = await this.listener.accept(); - yield* iterateHttpRequests(conn); - } + async *iterateRequestsOnNewConnection( + mux: MuxAsyncIterator + ): AsyncIterableIterator { + if (this.closing) return; + // Wait for a new connection. + const conn = await this.listener.accept(); + // Try to accept another connection and add it to the multiplexer. + mux.add(this.iterateRequestsOnNewConnection(mux)); + // Yield the requests that arrive on the just-accepted connection. + yield* iterateHttpRequests(conn); + } + + [Symbol.asyncIterator](): AsyncIterableIterator { + const mux: MuxAsyncIterator = new MuxAsyncIterator(); + mux.add(this.iterateRequestsOnNewConnection(mux)); + return mux; } } From 1adb3a9cf8ac650a63a9cfac8e2797f279115874 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Fri, 17 May 2019 21:03:44 -0700 Subject: [PATCH 07/22] fix --- http/server.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/http/server.ts b/http/server.ts index 165643b47aa9..8ee56cdd3c5e 100644 --- a/http/server.ts +++ b/http/server.ts @@ -312,7 +312,7 @@ interface WrappedIteratorResult { iterator: AsyncIterableIterator; result: IteratorResult; } -class MuxAsyncIterator { +class MuxAsyncIterator implements AsyncIterableIterator { private iteratorNextPromiseMap: Map< AsyncIterableIterator, Promise> @@ -324,7 +324,7 @@ class MuxAsyncIterator { return { iterator, result: await iterator.next() }; } - add(iterator: AsyncIterableIterator) { + add(iterator: AsyncIterableIterator): void { this.iteratorNextPromiseMap.set(iterator, this.wrapIteratorNext(iterator)); } @@ -355,7 +355,7 @@ class MuxAsyncIterator { return { value: null, done: true }; } - [Symbol.asyncIterator]() { + [Symbol.asyncIterator](): MuxAsyncIterator { return this; } } From 6e3b8846b42a967c31be5d98f93f892d6078386f Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sat, 18 May 2019 09:50:37 -0400 Subject: [PATCH 08/22] Move MuxAsyncIterator to util/async.ts --- http/server.ts | 61 +--------------------------------- util/async.ts | 82 ++++++++++++++++++++++++++++++---------------- util/async_test.ts | 37 +++++++++++++-------- 3 files changed, 79 insertions(+), 101 deletions(-) diff --git a/http/server.ts b/http/server.ts index 8ee56cdd3c5e..568a0c659177 100644 --- a/http/server.ts +++ b/http/server.ts @@ -8,7 +8,7 @@ import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; import { assert, fail } from "../testing/asserts.ts"; -import { deferred, Deferred } from "../util/async.ts"; +import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts"; interface HttpConn extends Conn { // When read by a newly created request B, lastId is the id pointing to a previous @@ -301,65 +301,6 @@ async function* iterateHttpRequests( httpConn.close(); } -// The MuxAsyncIterator class multiplexes multiple async iterators into a -// single stream. It currently makes a few assumptions: -// * The iterators do not throw. -// * The final result (the value returned and not yielded from the iterator) -// does not matter; if there is any, it is discarded. -// * Adding an iterator while the multiplexer is blocked does not take -// effect immediately. -interface WrappedIteratorResult { - iterator: AsyncIterableIterator; - result: IteratorResult; -} -class MuxAsyncIterator implements AsyncIterableIterator { - private iteratorNextPromiseMap: Map< - AsyncIterableIterator, - Promise> - > = new Map(); - - private async wrapIteratorNext( - iterator: AsyncIterableIterator - ): Promise> { - return { iterator, result: await iterator.next() }; - } - - add(iterator: AsyncIterableIterator): void { - this.iteratorNextPromiseMap.set(iterator, this.wrapIteratorNext(iterator)); - } - - async next(): Promise> { - while (this.iteratorNextPromiseMap.size > 0) { - // Wait for the next iteration result of any of the iterators, whichever - // yields first. - const { iterator, result }: WrappedIteratorResult = await Promise.race( - this.iteratorNextPromiseMap.values() - ); - assert(this.iteratorNextPromiseMap.has(iterator)); - - if (result.done) { - // The iterator that yielded is done, remove it from the map. - this.iteratorNextPromiseMap.delete(iterator); - } else { - // The iterator has yielded a value. Call `next()` on it, wrap the - // returned promise, and store it in the map. - this.iteratorNextPromiseMap.set( - iterator, - this.wrapIteratorNext(iterator) - ); - return result; - } - } - - // There are no iterators left in the multiplexer, so report we're done. - return { value: null, done: true }; - } - - [Symbol.asyncIterator](): MuxAsyncIterator { - return this; - } -} - export class Server implements AsyncIterable { private closing = false; diff --git a/util/async.ts b/util/async.ts index c664c9ce28b3..7e60bf416327 100644 --- a/util/async.ts +++ b/util/async.ts @@ -1,4 +1,5 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import { assert } from "../testing/asserts.ts"; // TODO(ry) It'd be better to make Deferred a class that inherits from // Promise, rather than an interface. This is possible in ES2016, however @@ -28,38 +29,63 @@ export function deferred(): Deferred { return Object.assign(promise, methods) as Deferred; } -/** Sends objects between asynchronous tasks, with backpressure. */ -export class Channel { - // TODO(ry) Can Channel be implemented without using Arrays? - private sendQueue: Array<[() => void, T]> = []; - private recvQueue: Array<(value: T) => void> = []; +interface WrappedIteratorResult { + iterator: AsyncIterableIterator; + result: IteratorResult; +} - send(value: T): Promise { - const recvResolve = this.recvQueue.shift(); - if (recvResolve) { - recvResolve(value); - return Promise.resolve(); - } else { - return new Promise( - (resolve, _): void => { - this.sendQueue.push([resolve, value]); - } - ); - } +/** The MuxAsyncIterator class multiplexes multiple async iterators into a + * single stream. It currently makes a few assumptions: + * - The iterators do not throw. + * - The final result (the value returned and not yielded from the iterator) + * does not matter; if there is any, it is discarded. + * - Adding an iterator while the multiplexer is blocked does not take effect + * immediately. + */ +export class MuxAsyncIterator implements AsyncIterableIterator { + private iteratorNextPromiseMap: Map< + AsyncIterableIterator, + Promise> + > = new Map(); + + private async wrapIteratorNext( + iterator: AsyncIterableIterator + ): Promise> { + return { iterator, result: await iterator.next() }; } - recv(): Promise { - const s = this.sendQueue.shift(); - if (s) { - const [sendResolve, value] = s; - sendResolve(); - return Promise.resolve(value); - } else { - return new Promise( - (res, _): void => { - this.recvQueue.push(res); - } + add(iterator: AsyncIterableIterator): void { + this.iteratorNextPromiseMap.set(iterator, this.wrapIteratorNext(iterator)); + } + + async next(): Promise> { + while (this.iteratorNextPromiseMap.size > 0) { + // Wait for the next iteration result of any of the iterators, whichever + // yields first. + const { iterator, result }: WrappedIteratorResult = await Promise.race( + this.iteratorNextPromiseMap.values() ); + assert(this.iteratorNextPromiseMap.has(iterator)); + + if (result.done) { + // The iterator that yielded is done, remove it from the map. + this.iteratorNextPromiseMap.delete(iterator); + } else { + // The iterator has yielded a value. Call `next()` on it, wrap the + // returned promise, and store it in the map. + this.iteratorNextPromiseMap.set( + iterator, + this.wrapIteratorNext(iterator) + ); + return result; + } } + + // There are no iterators left in the multiplexer, so report we're done. + return { value: null, done: true }; + } + + [Symbol.asyncIterator](): MuxAsyncIterator { + return this; } } diff --git a/util/async_test.ts b/util/async_test.ts index 226cb784dc92..0f9bf28a8c15 100644 --- a/util/async_test.ts +++ b/util/async_test.ts @@ -1,27 +1,38 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import { test, runIfMain } from "../testing/mod.ts"; -import { assertEquals } from "../testing/asserts.ts"; -import { Channel, deferred } from "./async.ts"; +import { assert, assertEquals } from "../testing/asserts.ts"; +import { MuxAsyncIterator, deferred } from "./async.ts"; test(async function asyncDeferred(): Promise { const d = deferred(); d.resolve(12); }); -async function send3(channel: Channel): Promise { - await channel.send(1); - await channel.send(2); - await channel.send(3); +async function* gen123(): AsyncIterableIterator { + yield 1; + yield 2; + yield 3; } -test(async function asyncChannel(): Promise { - const channel = new Channel(); - send3(channel); +async function* gen456(): AsyncIterableIterator { + yield 4; + yield 5; + yield 6; +} - assertEquals(1, await channel.recv()); - assertEquals(2, await channel.recv()); - assertEquals(3, await channel.recv()); - let _lastPromise = channel.recv(); +test(async function asyncMuxAsyncIterator(): Promise { + const mux = new MuxAsyncIterator(); + mux.add(gen123()); + mux.add(gen456()); + const results = new Set(); + for (let i = 0; i < 6; i++) { + let r = await mux.next(); + assert(!r.done); + results.add(r.value); + } + let r = await mux.next(); + assert(r.done); + assertEquals(results.size, 6); }); runIfMain(import.meta); From 49d5a6b2bdcd4bc7b1786acfafae454bb60ec524 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sat, 18 May 2019 10:09:52 -0400 Subject: [PATCH 09/22] Bring back readRequest --- http/server.ts | 56 ++++++++++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/http/server.ts b/http/server.ts index 568a0c659177..14b2110f2548 100644 --- a/http/server.ts +++ b/http/server.ts @@ -248,6 +248,34 @@ export class ServerRequest { } } +async function readRequest( + httpConn: HttpConn, + bufr: BufReader +): Promise<[ServerRequest, BufState]> { + const req = new ServerRequest(); + + // Set and incr pipeline id; + req.pipelineId = ++httpConn.lastPipelineId; + // Set a new pipeline deferred associated with this request + // for future requests to wait for. + httpConn.pendingDeferredMap.set(req.pipelineId, deferred()); + + req.conn = httpConn; + req.r = bufr; + req.w = new BufWriter(httpConn); + const tp = new TextProtoReader(bufr); + let err: BufState; + // First line: GET /index.html HTTP/1.0 + let firstLine: string; + [firstLine, err] = await tp.readLine(); + if (err) { + return [null, err]; + } + [req.method, req.url, req.proto] = firstLine.split(" ", 3); + [req.headers, err] = await tp.readMIMEHeader(); + return [req, err]; +} + /** Continuously read more requests from conn until EOF * bufr is empty on a fresh TCP connection. * Would be passed around and reused for later request on same conn @@ -259,34 +287,12 @@ async function* iterateHttpRequests( conn: Conn ): AsyncIterableIterator { const httpConn = createHttpConn(conn); - const bufr = new BufReader(httpConn); - const bufw = new BufWriter(httpConn); - const tp = new TextProtoReader(bufr); - let bufStateErr: BufState; + let req: ServerRequest; for (;;) { - const req = new ServerRequest(); - - // Set and incr pipeline id; - req.pipelineId = ++httpConn.lastPipelineId; - // Set a new pipeline deferred associated with this request - // for future requests to wait for. - httpConn.pendingDeferredMap.set(req.pipelineId, deferred()); - - req.conn = httpConn; - req.r = bufr; - req.w = bufw; - - // First line: GET /index.html HTTP/1.0 - let firstLine: string; - [firstLine, bufStateErr] = await tp.readLine(); - if (bufStateErr !== null) break; - [req.method, req.url, req.proto] = firstLine.split(" ", 3); - - [req.headers, bufStateErr] = await tp.readMIMEHeader(); - if (bufStateErr !== null) break; - + [req, bufStateErr] = await readRequest(httpConn, bufr); + if (bufStateErr) break; yield req; } From 635e6b6a49edb94adb8febbb23d9c0154563f5cd Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sat, 18 May 2019 10:15:51 -0400 Subject: [PATCH 10/22] Add TODO comments --- http/server.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/http/server.ts b/http/server.ts index 14b2110f2548..1fba110e1432 100644 --- a/http/server.ts +++ b/http/server.ts @@ -10,6 +10,7 @@ import { STATUS_TEXT } from "./http_status.ts"; import { assert, fail } from "../testing/asserts.ts"; import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts"; +// TODO(ry) This should be a class, not an interface. interface HttpConn extends Conn { // When read by a newly created request B, lastId is the id pointing to a previous // request A, such that we must wait for responses to A to complete before @@ -260,6 +261,12 @@ async function readRequest( // for future requests to wait for. httpConn.pendingDeferredMap.set(req.pipelineId, deferred()); + // TODO(ry) Let's say the request has a body which is processed by the consumer + // of this iterator. We'd want to wait for this processing to be complete + // before reading a new set of headers. Therefore we might need an await + // statement after this yield, e.g. await req.done where done is a Promise + // that's resolved when the request has been processed. + req.conn = httpConn; req.r = bufr; req.w = new BufWriter(httpConn); From eab25c54f488b743906c62178773a81b67b3ea1b Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sat, 18 May 2019 13:03:03 -0700 Subject: [PATCH 11/22] Wait for req body to be read and simplify further --- http/server.ts | 131 ++++++++++++++++--------------------------------- 1 file changed, 41 insertions(+), 90 deletions(-) diff --git a/http/server.ts b/http/server.ts index 1fba110e1432..6ab5ab399925 100644 --- a/http/server.ts +++ b/http/server.ts @@ -10,28 +10,6 @@ import { STATUS_TEXT } from "./http_status.ts"; import { assert, fail } from "../testing/asserts.ts"; import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts"; -// TODO(ry) This should be a class, not an interface. -interface HttpConn extends Conn { - // When read by a newly created request B, lastId is the id pointing to a previous - // request A, such that we must wait for responses to A to complete before - // writing B's response. - lastPipelineId: number; - pendingDeferredMap: Map>; -} - -function createHttpConn(c: Conn): HttpConn { - const httpConn = Object.assign(c, { - lastPipelineId: 0, - pendingDeferredMap: new Map() - }); - - const d = deferred(); - d.resolve(); // The first request is ready immediately. - httpConn.pendingDeferredMap.set(0, d); - - return httpConn; -} - function bufWriter(w: Writer): BufWriter { if (w instanceof BufWriter) { return w; @@ -139,14 +117,14 @@ async function readAllIterator( } export class ServerRequest { - pipelineId: number; url: string; method: string; proto: string; headers: Headers; - conn: HttpConn; + conn: Conn; r: BufReader; w: BufWriter; + done: Deferred = deferred(); public async *bodyStream(): AsyncIterableIterator { if (this.headers.has("content-length")) { @@ -229,47 +207,22 @@ export class ServerRequest { } async respond(r: Response): Promise { - // Check and wait if the previous request is done responding. - const lastPipelineId = this.pipelineId - 1; - const lastPipelineDeferred = this.conn.pendingDeferredMap.get( - lastPipelineId - ); - assert(!!lastPipelineDeferred); - await lastPipelineDeferred; - // If yes, delete old deferred and proceed with writing. - this.conn.pendingDeferredMap.delete(lastPipelineId); // Write our response! await writeResponse(this.w, r); - // Signal the next pending request that it can start writing. - const currPipelineDeferred = this.conn.pendingDeferredMap.get( - this.pipelineId - ); - assert(!!currPipelineDeferred); - currPipelineDeferred.resolve(); + // Signal that this request has been processed and the next pipelined + // request on the same connection can be accepted. + this.done.resolve(); } } async function readRequest( - httpConn: HttpConn, + conn: Conn, bufr: BufReader ): Promise<[ServerRequest, BufState]> { const req = new ServerRequest(); - - // Set and incr pipeline id; - req.pipelineId = ++httpConn.lastPipelineId; - // Set a new pipeline deferred associated with this request - // for future requests to wait for. - httpConn.pendingDeferredMap.set(req.pipelineId, deferred()); - - // TODO(ry) Let's say the request has a body which is processed by the consumer - // of this iterator. We'd want to wait for this processing to be complete - // before reading a new set of headers. Therefore we might need an await - // statement after this yield, e.g. await req.done where done is a Promise - // that's resolved when the request has been processed. - - req.conn = httpConn; + req.conn = conn; req.r = bufr; - req.w = new BufWriter(httpConn); + req.w = new BufWriter(conn); const tp = new TextProtoReader(bufr); let err: BufState; // First line: GET /index.html HTTP/1.0 @@ -283,37 +236,6 @@ async function readRequest( return [req, err]; } -/** Continuously read more requests from conn until EOF - * bufr is empty on a fresh TCP connection. - * Would be passed around and reused for later request on same conn - * TODO: make them async function after this change is done - * https://github.com/tc39/ecma262/pull/1250 - * See https://v8.dev/blog/fast-async - */ -async function* iterateHttpRequests( - conn: Conn -): AsyncIterableIterator { - const httpConn = createHttpConn(conn); - const bufr = new BufReader(httpConn); - let bufStateErr: BufState; - let req: ServerRequest; - for (;;) { - [req, bufStateErr] = await readRequest(httpConn, bufr); - if (bufStateErr) break; - yield req; - } - - if (bufStateErr === "EOF") { - // The connection was gracefully closed. - } else if (bufStateErr instanceof Error) { - // TODO(ry): send something back like a HTTP 500 status. - } else { - fail(`unexpected BufState: ${bufStateErr}`); - } - - httpConn.close(); -} - export class Server implements AsyncIterable { private closing = false; @@ -324,21 +246,50 @@ export class Server implements AsyncIterable { this.listener.close(); } - async *iterateRequestsOnNewConnection( + /** Yield all HTTP requests on a single TCP connection. */ + async *iterateHttpRequests(conn: Conn): AsyncIterableIterator { + const bufr = new BufReader(conn); + let bufStateErr: BufState; + let req: ServerRequest; + + while (!this.closing) { + [req, bufStateErr] = await readRequest(conn, bufr); + if (bufStateErr) break; + yield req; + // Wait for the request to be processed before we accept a new request on + // this connection. + await req.done; + } + + if (bufStateErr === "EOF") { + // The connection was gracefully closed. + } else if (bufStateErr instanceof Error) { + // TODO(ry): send something back like a HTTP 500 status. + } else if (this.closing) { + // There are more requests incoming but the server is closing. + // TODO(ry): send a back a HTTP 503 Service Unavailable status. + } else { + fail(`unexpected BufState: ${bufStateErr}`); + } + + conn.close(); + } + + async *acceptConnAndIterateHttpRequests( mux: MuxAsyncIterator ): AsyncIterableIterator { if (this.closing) return; // Wait for a new connection. const conn = await this.listener.accept(); // Try to accept another connection and add it to the multiplexer. - mux.add(this.iterateRequestsOnNewConnection(mux)); + mux.add(this.acceptConnAndIterateHttpRequests(mux)); // Yield the requests that arrive on the just-accepted connection. - yield* iterateHttpRequests(conn); + yield* this.iterateHttpRequests(conn); } [Symbol.asyncIterator](): AsyncIterableIterator { const mux: MuxAsyncIterator = new MuxAsyncIterator(); - mux.add(this.iterateRequestsOnNewConnection(mux)); + mux.add(this.acceptConnAndIterateHttpRequests(mux)); return mux; } } From 6bcd5cb5efee943d20355d0faebd387773500cb3 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sat, 18 May 2019 13:14:42 -0700 Subject: [PATCH 12/22] fix test --- http/server_test.ts | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/http/server_test.ts b/http/server_test.ts index 904a667c7f9d..782feda0162e 100644 --- a/http/server_test.ts +++ b/http/server_test.ts @@ -48,13 +48,8 @@ test(async function responseWrite(): Promise { const buf = new Buffer(); const bufw = new BufWriter(buf); const request = new ServerRequest(); - request.pipelineId = 1; request.w = bufw; - const d0 = deferred(); - d0.resolve(); - const d1 = deferred(); - request.conn = { localAddr: "", remoteAddr: "", @@ -67,13 +62,12 @@ test(async function responseWrite(): Promise { write: async (): Promise => { return -1; }, - close: (): void => {}, - lastPipelineId: 0, - pendingDeferredMap: new Map([[0, d0], [1, d1]]) + close: (): void => {} }; await request.respond(testCase.response); assertEquals(buf.toString(), testCase.raw); + await request.done; } }); From e28c59c3b380e97f6cfecf8782f6866655c69d46 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sat, 18 May 2019 13:19:44 -0700 Subject: [PATCH 13/22] fix comments --- http/server.ts | 10 ++++++++-- http/server_test.ts | 1 - 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/http/server.ts b/http/server.ts index 6ab5ab399925..c804cd79814d 100644 --- a/http/server.ts +++ b/http/server.ts @@ -246,8 +246,10 @@ export class Server implements AsyncIterable { this.listener.close(); } - /** Yield all HTTP requests on a single TCP connection. */ - async *iterateHttpRequests(conn: Conn): AsyncIterableIterator { + // Yields all HTTP requests on a single TCP connection. + private async *iterateHttpRequests( + conn: Conn + ): AsyncIterableIterator { const bufr = new BufReader(conn); let bufStateErr: BufState; let req: ServerRequest; @@ -275,6 +277,10 @@ export class Server implements AsyncIterable { conn.close(); } + // Accepts a new TCP connection and yields all HTTP requests that arrive on + // it. When a connection is accepted, it also adds a creates a new iterator + // of the same kind and adds it to the request multiplexer so that more + // TCP connections can be accepted. async *acceptConnAndIterateHttpRequests( mux: MuxAsyncIterator ): AsyncIterableIterator { diff --git a/http/server_test.ts b/http/server_test.ts index 782feda0162e..396a0321a1ae 100644 --- a/http/server_test.ts +++ b/http/server_test.ts @@ -11,7 +11,6 @@ import { assertEquals } from "../testing/asserts.ts"; import { Response, ServerRequest, writeResponse } from "./server.ts"; import { BufReader, BufWriter } from "../io/bufio.ts"; import { StringReader } from "../io/readers.ts"; -import { deferred } from "../util/async.ts"; interface ResponseTest { response: Response; From 1ec3e1b871f92c5bfb35b11cef48c07ff1d5739a Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sat, 18 May 2019 13:23:45 -0700 Subject: [PATCH 14/22] Grammar --- http/server.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/http/server.ts b/http/server.ts index c804cd79814d..b6c7adaacd1e 100644 --- a/http/server.ts +++ b/http/server.ts @@ -278,9 +278,9 @@ export class Server implements AsyncIterable { } // Accepts a new TCP connection and yields all HTTP requests that arrive on - // it. When a connection is accepted, it also adds a creates a new iterator - // of the same kind and adds it to the request multiplexer so that more - // TCP connections can be accepted. + // it. When a connection is accepted, it also creates a new iterator of the + // same kind and adds it to the request multiplexer so that another TCP + // connection can be accepted. async *acceptConnAndIterateHttpRequests( mux: MuxAsyncIterator ): AsyncIterableIterator { From 0a140a32d62952459aa544e28ed52fb1c28c8c57 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sat, 18 May 2019 13:36:26 -0700 Subject: [PATCH 15/22] private --- http/server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http/server.ts b/http/server.ts index b6c7adaacd1e..b80f307e178d 100644 --- a/http/server.ts +++ b/http/server.ts @@ -281,7 +281,7 @@ export class Server implements AsyncIterable { // it. When a connection is accepted, it also creates a new iterator of the // same kind and adds it to the request multiplexer so that another TCP // connection can be accepted. - async *acceptConnAndIterateHttpRequests( + private async *acceptConnAndIterateHttpRequests( mux: MuxAsyncIterator ): AsyncIterableIterator { if (this.closing) return; From deeeb61fd7556d3e29c3ed5c5798e16f848ff395 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sun, 19 May 2019 15:40:53 -0700 Subject: [PATCH 16/22] Improve performance --- http/server.ts | 2 +- util/async.ts | 72 +++++++++++++++++++++++--------------------------- 2 files changed, 34 insertions(+), 40 deletions(-) diff --git a/http/server.ts b/http/server.ts index b80f307e178d..281f8d30254f 100644 --- a/http/server.ts +++ b/http/server.ts @@ -296,7 +296,7 @@ export class Server implements AsyncIterable { [Symbol.asyncIterator](): AsyncIterableIterator { const mux: MuxAsyncIterator = new MuxAsyncIterator(); mux.add(this.acceptConnAndIterateHttpRequests(mux)); - return mux; + return mux.iterate(); } } diff --git a/util/async.ts b/util/async.ts index 7e60bf416327..7cf10b59a330 100644 --- a/util/async.ts +++ b/util/async.ts @@ -29,9 +29,9 @@ export function deferred(): Deferred { return Object.assign(promise, methods) as Deferred; } -interface WrappedIteratorResult { +interface TaggedYieldedValue { iterator: AsyncIterableIterator; - result: IteratorResult; + value: T; } /** The MuxAsyncIterator class multiplexes multiple async iterators into a @@ -39,53 +39,47 @@ interface WrappedIteratorResult { * - The iterators do not throw. * - The final result (the value returned and not yielded from the iterator) * does not matter; if there is any, it is discarded. - * - Adding an iterator while the multiplexer is blocked does not take effect - * immediately. */ -export class MuxAsyncIterator implements AsyncIterableIterator { - private iteratorNextPromiseMap: Map< - AsyncIterableIterator, - Promise> - > = new Map(); +export class MuxAsyncIterator implements AsyncIterable { + private iteratorCount = 0; + private yields: Array> = []; + private signal: Deferred = deferred(); - private async wrapIteratorNext( - iterator: AsyncIterableIterator - ): Promise> { - return { iterator, result: await iterator.next() }; + add(iterator: AsyncIterableIterator): void { + ++this.iteratorCount; + this.callIteratorNext(iterator); + this.signal.resolve(); } - add(iterator: AsyncIterableIterator): void { - this.iteratorNextPromiseMap.set(iterator, this.wrapIteratorNext(iterator)); + private async callIteratorNext(iterator: AsyncIterableIterator) { + const { value, done } = await iterator.next(); + if (done) { + this.iteratorCount--; + } else { + this.yields.push({ iterator, value }); + } + this.signal.resolve(); } - async next(): Promise> { - while (this.iteratorNextPromiseMap.size > 0) { - // Wait for the next iteration result of any of the iterators, whichever - // yields first. - const { iterator, result }: WrappedIteratorResult = await Promise.race( - this.iteratorNextPromiseMap.values() - ); - assert(this.iteratorNextPromiseMap.has(iterator)); + async *iterate(): AsyncIterableIterator { + while (this.iteratorCount > 0) { + // Sleep until any of the wrapped iterators yields. + await this.signal; - if (result.done) { - // The iterator that yielded is done, remove it from the map. - this.iteratorNextPromiseMap.delete(iterator); - } else { - // The iterator has yielded a value. Call `next()` on it, wrap the - // returned promise, and store it in the map. - this.iteratorNextPromiseMap.set( - iterator, - this.wrapIteratorNext(iterator) - ); - return result; + // Note that while we're looping over `yields`, new items may be added. + for (let i = 0; i < this.yields.length; i++) { + const { iterator, value } = this.yields[i]; + yield value; + this.callIteratorNext(iterator); } - } - // There are no iterators left in the multiplexer, so report we're done. - return { value: null, done: true }; + // Clear the `yields` list and reset the `signal` promise. + this.yields.length = 0; + this.signal = deferred(); + } } - [Symbol.asyncIterator](): MuxAsyncIterator { - return this; + [Symbol.asyncIterator](): AsyncIterableIterator { + return this.iterate(); } } From 6dfe3491887586cb2367bc3dbbe6e907a4746cbf Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sun, 19 May 2019 15:47:25 -0700 Subject: [PATCH 17/22] fix --- util/async.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/util/async.ts b/util/async.ts index 7cf10b59a330..089dd49b0af8 100644 --- a/util/async.ts +++ b/util/async.ts @@ -1,5 +1,4 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { assert } from "../testing/asserts.ts"; // TODO(ry) It'd be better to make Deferred a class that inherits from // Promise, rather than an interface. This is possible in ES2016, however @@ -51,7 +50,9 @@ export class MuxAsyncIterator implements AsyncIterable { this.signal.resolve(); } - private async callIteratorNext(iterator: AsyncIterableIterator) { + private async callIteratorNext( + iterator: AsyncIterableIterator + ): Promise { const { value, done } = await iterator.next(); if (done) { this.iteratorCount--; From e95738986a084196e189bee9cc7ddbc7124c6ebf Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sun, 19 May 2019 16:35:59 -0700 Subject: [PATCH 18/22] Remove unnecessary --- util/async.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/util/async.ts b/util/async.ts index 089dd49b0af8..f9f2477d06b6 100644 --- a/util/async.ts +++ b/util/async.ts @@ -47,7 +47,6 @@ export class MuxAsyncIterator implements AsyncIterable { add(iterator: AsyncIterableIterator): void { ++this.iteratorCount; this.callIteratorNext(iterator); - this.signal.resolve(); } private async callIteratorNext( @@ -55,7 +54,7 @@ export class MuxAsyncIterator implements AsyncIterable { ): Promise { const { value, done } = await iterator.next(); if (done) { - this.iteratorCount--; + --this.iteratorCount; } else { this.yields.push({ iterator, value }); } From 025602b8e68494ef99d4c03e0e8c4dfb7d4fb3b6 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sun, 19 May 2019 19:12:13 -0700 Subject: [PATCH 19/22] Fix MuxAsyncIterator test --- util/async_test.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/util/async_test.ts b/util/async_test.ts index 0f9bf28a8c15..03e76f22eb92 100644 --- a/util/async_test.ts +++ b/util/async_test.ts @@ -25,13 +25,9 @@ test(async function asyncMuxAsyncIterator(): Promise { mux.add(gen123()); mux.add(gen456()); const results = new Set(); - for (let i = 0; i < 6; i++) { - let r = await mux.next(); - assert(!r.done); - results.add(r.value); + for await (const value of mux) { + results.add(value); } - let r = await mux.next(); - assert(r.done); assertEquals(results.size, 6); }); From bedb5e6b793e120101a06eeef0bd487f7bc9cf5d Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sun, 19 May 2019 19:12:21 -0700 Subject: [PATCH 20/22] Actually run util tests --- test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/test.ts b/test.ts index cc921cf9f150..221bbc9858b8 100755 --- a/test.ts +++ b/test.ts @@ -16,6 +16,7 @@ import "./strings/test.ts"; import "./testing/test.ts"; import "./textproto/test.ts"; import "./toml/test.ts"; +import "./util/test.ts"; import "./ws/test.ts"; import "./testing/main.ts"; From 45ee8fa495a88f4582e4254d1c033a52a8072202 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sun, 19 May 2019 19:15:06 -0700 Subject: [PATCH 21/22] fix --- util/async_test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/async_test.ts b/util/async_test.ts index 03e76f22eb92..c704002d4108 100644 --- a/util/async_test.ts +++ b/util/async_test.ts @@ -1,6 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import { test, runIfMain } from "../testing/mod.ts"; -import { assert, assertEquals } from "../testing/asserts.ts"; +import { assertEquals } from "../testing/asserts.ts"; import { MuxAsyncIterator, deferred } from "./async.ts"; test(async function asyncDeferred(): Promise { From f92287f26bad0836157bab30b00d972985a163f4 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 20 May 2019 09:12:45 -0400 Subject: [PATCH 22/22] revert changes to prettier --- prettier/testdata/opts/0.ts | 4 ++-- prettier/testdata/opts/1.ts | 2 +- prettier/testdata/opts/2.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/prettier/testdata/opts/0.ts b/prettier/testdata/opts/0.ts index 0277269118cf..fb85014a5e9e 100644 --- a/prettier/testdata/opts/0.ts +++ b/prettier/testdata/opts/0.ts @@ -1,2 +1,2 @@ -console.log(0); -console.log([function foo() {}, function baz() {}, (a) => {}]); +console.log(0) +console.log([function foo() {}, function baz() {}, a => {}]) diff --git a/prettier/testdata/opts/1.ts b/prettier/testdata/opts/1.ts index 4748527e94fa..c23a66c28e91 100644 --- a/prettier/testdata/opts/1.ts +++ b/prettier/testdata/opts/1.ts @@ -1 +1 @@ -console.log('1'); +console.log ("1") diff --git a/prettier/testdata/opts/2.ts b/prettier/testdata/opts/2.ts index e0c70341fd62..2cfe9aece463 100644 --- a/prettier/testdata/opts/2.ts +++ b/prettier/testdata/opts/2.ts @@ -1 +1 @@ -console.log({a: 1}); +console.log({a:1})