Skip to content

Commit

Permalink
feat: add new hooks api
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Sep 21, 2024
1 parent 290e0e1 commit 0e869e9
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 13 deletions.
5 changes: 4 additions & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const {
} = require('./util')
const { channels } = require('./diagnostics.js')
const { headerNameLowerCasedRecord } = require('./constants')
const DecoratorHandler = require('../handler/decorator-handler')

// Verifies that a given path is valid does not contain control chars \x00 to \x20
const invalidPathRegex = /[^\u0021-\u00ff]/
Expand Down Expand Up @@ -187,7 +188,9 @@ class Request {

this.servername = servername || getServerName(this.host) || null

this[kHandler] = handler
this[kHandler] = handler instanceof DecoratorHandler
? handler
: new DecoratorHandler(handler)

if (channels.create.hasSubscribers) {
channels.create.publish({ request: this })
Expand Down
134 changes: 128 additions & 6 deletions lib/handler/decorator-handler.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
'use strict'

const assert = require('node:assert')
const util = require('../core/util')

function toRawHeaders (headers) {
const rawHeaders = []
if (headers != null) {
for (const [key, value] of Object.entries(headers)) {
rawHeaders.push(Buffer.from(key), Buffer.from(value))
}
}
return rawHeaders
}

module.exports = class DecoratorHandler {
#handler
#onCompleteCalled = false
#onErrorCalled = false
#resume = null

constructor (handler) {
if (typeof handler !== 'object' || handler === null) {
Expand All @@ -14,13 +26,96 @@ module.exports = class DecoratorHandler {
this.#handler = handler
}

// New API

onRequestStart (reserved, abort) {
if (this.#handler.onRequestStart) {
this.#handler.onRequestStart(null, abort)
}
}

onResponseStart (resume) {
this.#resume = resume

if (this.#handler.onResponseStart) {
return this.#handler.onResponseStart(resume)
}

return true
}

onResponseHeaders (statusCode, headers) {
if (this.#handler.onResponseHeaders) {
this.#handler.onResponseHeaders(headers, statusCode)
}

if (this.#handler.onConnect) {
this.#handler.onConnect(statusCode, toRawHeaders(headers), this.#resume)
}

return true
}

onResponseData (data) {
if (this.#handler.onResponseData) {
this.#handler.onResponseData(data)
}

if (this.#handler.onData) {
this.#handler.onData(data)
}

return true
}

onResponseEnd (trailers) {
if (this.#handler.onResponseEnd) {
this.#handler.onResponseEnd(trailers)
}

if (this.#handler.onComplete) {
this.#handler.onComplete(toRawHeaders(trailers))
}
}

onResponseError (err) {
if (this.#handler.onResponseError) {
this.#handler.onResponseError(err)
}

if (this.#handler.onError) {
this.#handler.onError(err)
}
}

// Old API

onRequestSent (...args) {
if (this.#handler.onRequestSent) {
this.#handler.onRequestSent(...args)
}
}

onConnect (...args) {
return this.#handler.onConnect?.(...args)
if (this.#handler.onRequestStart) {
this.#handler.onRequestStart(null, args[0])
}

if (this.#handler.onConnect) {
return this.#handler.onConnect(...args)
}
}

onError (...args) {
this.#onErrorCalled = true
return this.#handler.onError?.(...args)

if (this.#handler.onResponseError) {
this.#handler.onResponseError(args[0])
}

if (this.#handler.onError) {
return this.#handler.onError(...args)
}
}

onUpgrade (...args) {
Expand All @@ -32,7 +127,7 @@ module.exports = class DecoratorHandler {

onResponseStarted (...args) {
assert(!this.#onCompleteCalled)
assert(!this.#onErrorCalled)
// assert(!this.#onErrorCalled)

return this.#handler.onResponseStarted?.(...args)
}
Expand All @@ -41,22 +136,49 @@ module.exports = class DecoratorHandler {
assert(!this.#onCompleteCalled)
assert(!this.#onErrorCalled)

return this.#handler.onHeaders?.(...args)
let paused = false

if (this.#handler.onResponseStart) {
paused ||= this.#handler.onResponseStart(args[2]) === false
}

if (this.#handler.onResponseHeaders) {
paused ||= this.#handler.onResponseHeaders(util.parseHeaders(args[1]), args[0]) === false
}

if (this.#handler.onHeaders) {
paused ||= this.#handler.onHeaders(...args) === false
}

return paused
}

onData (...args) {
assert(!this.#onCompleteCalled)
assert(!this.#onErrorCalled)

return this.#handler.onData?.(...args)
if (this.#handler.onResponseData) {
this.#handler.onResponseData(args[0])
}

if (this.#handler.onData) {
return this.#handler.onData(...args)
}
}

onComplete (...args) {
assert(!this.#onCompleteCalled)
assert(!this.#onErrorCalled)

this.#onCompleteCalled = true
return this.#handler.onComplete?.(...args)

if (this.#handler.onResponseEnd) {
this.#handler.onResponseEnd(util.parseHeaders(args[0]))
}

if (this.#handler.onComplete) {
return this.#handler.onComplete(...args)
}
}

onBodySent (...args) {
Expand Down
18 changes: 12 additions & 6 deletions types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,25 @@ declare namespace Dispatcher {
}
export type StreamFactory<TOpaque = null> = (data: StreamFactoryData<TOpaque>) => Writable
export interface DispatchHandlers {
/** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */
onRequestStart(reserved: null, abort: (err?: Error) => void): void;
onResponseStart(resume: () => void): boolean;
onResponseHeaders(headers: Record<string, string>, statusCode: number): boolean;
onResponseData(chunk: Buffer): boolean;
onResponseEnd(trailers: Record<string, string>): void;
onResponseError(err: Error) : void;
/** @deprecated Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */
onConnect?(abort: (err?: Error) => void): void;
/** Invoked when an error has occurred. */
/** @deprecated Invoked when an error has occurred. */
onError?(err: Error): void;
/** Invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. */
onUpgrade?(statusCode: number, headers: Buffer[] | string[] | null, socket: Duplex): void;
/** Invoked when response is received, before headers have been read. **/
/** @deprecated Invoked when response is received, before headers have been read. **/
onResponseStarted?(): void;
/** Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. */
/** @deprecated Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. */
onHeaders?(statusCode: number, headers: Buffer[], resume: () => void, statusText: string): boolean;
/** Invoked when response payload data is received. */
/** @deprecated Invoked when response payload data is received. */
onData?(chunk: Buffer): boolean;
/** Invoked when response payload and trailers have been received and the request has completed. */
/** @deprecated Invoked when response payload and trailers have been received and the request has completed. */
onComplete?(trailers: string[] | null): void;
/** Invoked when a body chunk is sent to the server. May be invoked multiple times for chunked requests */
onBodySent?(chunkSize: number, totalBytesSent: number): void;
Expand Down

0 comments on commit 0e869e9

Please sign in to comment.