Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hybrid node:http + fetch approach to node-fetch-server #33

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
**/*.tsbuildinfo

.nx/cache
.nx/workspace-data
.nx/workspace-data
.local
14 changes: 14 additions & 0 deletions packages/headers/src/lib/super-headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,20 @@ export class SuperHeaders extends Headers {

// Header-specific getters and setters

/**
* The `Host` header field in a request provides the host and port information
* from the target URI, enabling the origin server to distinguish among
* resources while servicing requests for multiple host names on a single IP
* address.
*
* [MD `Host` Reference](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Host)
*
* [HTTP/1.1 Specification](https://datatracker.ietf.org/doc/html/rfc7230#section-5.4)
*/
get host() {
return this.get('host');
}
Comment on lines +257 to +259
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please put this in a separate PR, and add a setter as well? Thanks 🙏


/**
* The `Accept-Language` header contains information about preferred natural language for the
* response.
Expand Down
6 changes: 5 additions & 1 deletion packages/node-fetch-server/bench/runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ run_benchmark() {
local server_pid=$!

# Wait for the server to start
sleep 2
# sleep 2
sleep 5

wrk -t12 -c400 -d30s http://127.0.0.1:3000/

Expand All @@ -37,6 +38,9 @@ NODE_FETCH_SERVER_VERSION=$(node -e 'console.log(require("../package.json").vers
run_benchmark "node-fetch-server@$NODE_FETCH_SERVER_VERSION" \
"node --import @swc-node/register/esm-register ./servers/node-fetch-server.ts"

run_benchmark "create-fetch-server@$NODE_FETCH_SERVER_VERSION" \
"node --import @swc-node/register/esm-register ./servers/create-fetch-server.ts"

EXPRESS_VERSION=$(node -e 'console.log(require("express/package.json").version)')
run_benchmark "express@$EXPRESS_VERSION" \
"node --import @swc-node/register/esm-register ./servers/express.ts"
Expand Down
18 changes: 18 additions & 0 deletions packages/node-fetch-server/bench/servers/create-fetch-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { createFetchServer } from '@mjackson/node-fetch-server';

const PORT = process.env.PORT || 3000;

let server = createFetchServer(() => {
let stream = new ReadableStream({
start(controller) {
controller.enqueue('<html><body><h1>Hello, world!</h1></body></html>');
controller.close();
},
});

return new Response(stream, {
headers: { 'Content-Type': 'text/html' },
});
});

server.listen(PORT);
9 changes: 7 additions & 2 deletions packages/node-fetch-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,21 @@
},
"./package.json": "./package.json"
},
"dependencies": {
"@mjackson/headers": "workspace:^"
},
"devDependencies": {
"@swc-node/register": "^1.10.9",
"@types/node": "^22.5.0",
"tsup": "^8.3.5",
"typescript": "^5.6.3"
"typescript": "^5.6.3",
"undici-types": "^6.21.0"
},
"scripts": {
"bench": "bash ./bench/runner.sh",
"build": "tsup",
"test": "node --import @swc-node/register/esm-register --test ./src/**/*.spec.ts",
"debug": "node --import @swc-node/register/esm-register ./src/test.ts",
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like ./src/test.ts is missing?

"test": "node --import @swc-node/register/esm-register --test ./src/**/*.spec.ts --test ./src/**/**/*.spec.ts",
"prepare": "pnpm run build",
"release": "node --import @swc-node/register/esm-register ../../scripts/release.ts"
},
Expand Down
74 changes: 74 additions & 0 deletions packages/node-fetch-server/src/lib/create-fetch-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import * as http from 'node:http';

import type { ClientAddress, ErrorHandler, FetchHandler } from './fetch-handler.js';

import { FetchIncomingMessage as IncomingMessage } from './fetch-incoming-message/index.js';
import { internalServerError } from './utils.js';
import { sendResponse } from './request-listener.js';

export interface RequestListenerOptions {
/**
* Overrides the host portion of the incoming request URL. By default the request URL host is
* derived from the HTTP `Host` header.
*
* For example, if you have a `$HOST` environment variable that contains the hostname of your
* server, you can use it to set the host of all incoming request URLs like so:
*
* ```ts
* createRequestListener(handler, { host: process.env.HOST })
* ```
*/
host?: string;
/**
* An error handler that determines the response when the request handler throws an error. By
* default a 500 Internal Server Error response will be sent.
*/
onError?: ErrorHandler;
/**
* Overrides the protocol of the incoming request URL. By default the request URL protocol is
* derived from the connection protocol. So e.g. when serving over HTTPS (using
* `https.createServer()`), the request URL will begin with `https:`.
*/
protocol?: string;
}

export function createFetchServer(handler: FetchHandler, options?: RequestListenerOptions) {
let onError = options?.onError ?? defaultErrorHandler;

// @ts-expect-error
return http.createServer<IncomingMessage>(
{ IncomingMessage },
Comment on lines +39 to +40
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! I had no idea the createServer API could use a custom IncomingMessage class. Very cool find 👍

async (req: IncomingMessage, res) => {
let controller = new AbortController();
res.on('close', () => {
controller.abort();
});

let client = {
address: req.socket.remoteAddress!,
family: req.socket.remoteFamily! as ClientAddress['family'],
port: req.socket.remotePort!,
};

let response: Response;
try {
// @ts-expect-error
response = await handler(req, client);
} catch (error) {
try {
response = (await onError(error)) ?? internalServerError();
} catch (error) {
console.error(`There was an error in the error handler: ${error}`);
response = internalServerError();
}
}

await sendResponse(res, response);
},
);
}

function defaultErrorHandler(error: unknown): Response {
console.error(error);
return internalServerError();
}
227 changes: 227 additions & 0 deletions packages/node-fetch-server/src/lib/fetch-incoming-message/body.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
import { fullyReadBody, isDisturbed } from './util.js';

const textDecoder = new TextDecoder();

/**
* @param {any} state internal state
* @param {(value: unknown) => unknown} convertBytesToJSValue
* @see https://fetch.spec.whatwg.org/#concept-body-consume-body
*/
export async function consumeBody<T>(state: any, convertBytesToJSValue: (value: Buffer) => T) {
// 1. If object is unusable, then return a promise rejected
// with a TypeError.
if (bodyUnusable(state)) {
throw new TypeError('Body is unusable: Body has already been read');
}

throwIfAborted(state);

// 2. Let promise be a new promise.
const promise = createDeferredPromise<T>();

// 3. Let errorSteps given error be to reject promise with error.
const errorSteps = (error: any) => promise.reject(error);

// 4. Let successSteps given a byte sequence data be to resolve
// promise with the result of running convertBytesToJSValue
// with data. If that threw an exception, then run errorSteps
// with that exception.
const successSteps = (data: Buffer) => {
try {
promise.resolve(convertBytesToJSValue(data));
} catch (e) {
errorSteps(e);
}
};

// 5. If object’s body is null, then run successSteps with an
// empty byte sequence.
if (state.body == null) {
successSteps(Buffer.allocUnsafe(0));
return promise.promise;
}

// 6. Otherwise, fully read object’s body given successSteps,
// errorSteps, and object’s relevant global object.
fullyReadBody(state.body, successSteps, errorSteps);

// 7. Return promise.
return promise.promise;
}

/**
* @see https://encoding.spec.whatwg.org/#utf-8-decode
* @param {Buffer} buffer
*/
export function utf8DecodeBytes(buffer: Buffer) {
if (buffer.length === 0) {
return '';
}

// 1. Let buffer be the result of peeking three bytes from
// ioQueue, converted to a byte sequence.

// 2. If buffer is 0xEF 0xBB 0xBF, then read three
// bytes from ioQueue. (Do nothing with those bytes.)
if (buffer[0] === 0xef && buffer[1] === 0xbb && buffer[2] === 0xbf) {
buffer = buffer.subarray(3);
}

// 3. Process a queue with an instance of UTF-8’s
// decoder, ioQueue, output, and "replacement".
const output = textDecoder.decode(buffer);

// 4. Return output.
return output;
}

/**
* @param {Buffer} bytes
* @see https://infra.spec.whatwg.org/#parse-json-bytes-to-a-javascript-value
*/
export function parseJSONFromBytes(bytes: Buffer) {
const json = utf8DecodeBytes(bytes);
return JSON.parse(json);
}

//#region Helpers

/**
* @param {any} state internal state
* @see https://fetch.spec.whatwg.org/#body-unusable
*/
function bodyUnusable(state: any) {
const body = state.body;

// An object including the Body interface mixin is
// said to be unusable if its body is non-null and
// its body’s stream is disturbed or locked.
return body != null && (body.stream.locked || isDisturbed(body.stream));
}

function throwIfAborted(state: any) {
if (state.aborted) {
throw new DOMException('The operation was aborted.', 'AbortError');
}
}

function createDeferredPromise<T = unknown>() {
let _resolve!: (value: T) => void;
let _reject!: (reason?: any) => void;
const promise = new Promise<T>((resolve, reject) => {
_resolve = resolve;
_reject = reject;
});

return {
promise,
resolve: _resolve,
reject: _reject,
};
}

//#endregion Helpers

//#region Undici: Reference

// function bodyMixinMethods(instance, getInternalState) {
// const methods = {
// arrayBuffer() {
// // The arrayBuffer() method steps are to return the result
// // of running consume body with this and the following step
// // given a byte sequence bytes: return a new ArrayBuffer
// // whose contents are bytes.
// return consumeBody(
// this,
// (bytes) => {
// return new Uint8Array(bytes).buffer;
// },
// instance,
// getInternalState,
// );
// },

// text() {
// // The text() method steps are to return the result of running
// // consume body with this and UTF-8 decode.
// return consumeBody(this, utf8DecodeBytes, instance, getInternalState);
// },

// json() {
// // The json() method steps are to return the result of running
// // consume body with this and parse JSON from bytes.
// return consumeBody(this, parseJSONFromBytes, instance, getInternalState);
// },

// formData() {
// // The formData() method steps are to return the result of running
// // consume body with this and the following step given a byte sequence bytes:
// return consumeBody(
// this,
// (value) => {
// // 1. Let mimeType be the result of get the MIME type with this.
// const mimeType = bodyMimeType(getInternalState(this));

// // 2. If mimeType is non-null, then switch on mimeType’s essence and run
// // the corresponding steps:
// if (mimeType !== null) {
// switch (mimeType.essence) {
// case 'multipart/form-data': {
// // 1. ... [long step]
// // 2. If that fails for some reason, then throw a TypeError.
// const parsed = multipartFormDataParser(value, mimeType);

// // 3. Return a new FormData object, appending each entry,
// // resulting from the parsing operation, to its entry list.
// const fd = new FormData();
// setFormDataState(fd, parsed);

// return fd;
// }
// case 'application/x-www-form-urlencoded': {
// // 1. Let entries be the result of parsing bytes.
// const entries = new URLSearchParams(value.toString());

// // 2. If entries is failure, then throw a TypeError.

// // 3. Return a new FormData object whose entry list is entries.
// const fd = new FormData();

// for (const [name, value] of entries) {
// fd.append(name, value);
// }

// return fd;
// }
// }
// }

// // 3. Throw a TypeError.
// throw new TypeError(
// 'Content-Type was not one of "multipart/form-data" or "application/x-www-form-urlencoded".',
// );
// },
// instance,
// getInternalState,
// );
// },

// bytes() {
// // The bytes() method steps are to return the result of running consume body
// // with this and the following step given a byte sequence bytes: return the
// // result of creating a Uint8Array from bytes in this’s relevant realm.
// return consumeBody(
// this,
// (bytes) => {
// return new Uint8Array(bytes);
// },
// instance,
// getInternalState,
// );
// },
// };

// return methods;
// }

// #endregion Undici: Reference
Loading