feat(http): basic support for the client

This commit is contained in:
2026-04-29 21:32:21 +00:00
committed by codinget
parent bb0ce0c58e
commit 56461cdd32
29 changed files with 1548 additions and 413 deletions
+1
View File
@@ -14,6 +14,7 @@
"scripts": {
"build": "tsc --project tsconfig.json",
"test": "tsx --test 'src/**/*.test.ts'",
"test:watch": "tsx --test --watch 'src/**/*.test.ts'",
"typecheck": "tsc --project tsconfig.json --noEmit"
},
"devDependencies": {
+124
View File
@@ -0,0 +1,124 @@
import { ReadBuffer, WriteBuffer } from "../common/buffer.js"
import { shouldClose } from "../common/connection.js"
import { bodyReader, readHeaders, type BodyReader } from "../common/reader.js"
import { methods, statusCodeProperties, statusCodes, type Method } from "../common/spec.js"
import type { RawTransport } from "../common/types.js"
import { hasOwnProperty } from "../common/utils.js"
import { formatBody, sendBody, writeHeaders } from "../common/writer.js"
import { ClientResponseImpl } from "./objects.js"
import type { ClientRequest, ClientResponse } from "./types.js"
export class ClientConnection {
#transport: RawTransport
#readBuffer: ReadBuffer
#writeBuffer: WriteBuffer
#encoder: TextEncoder
#prevBody: BodyReader | null
constructor(transport: RawTransport) {
this.#transport = transport
this.#readBuffer = new ReadBuffer(transport)
this.#writeBuffer = new WriteBuffer(transport)
this.#encoder = new TextEncoder()
this.#prevBody = null
}
async send(request: ClientRequest): Promise<void> {
const method = request.method.toUpperCase()
const mp = hasOwnProperty(methods, method) ? methods[method as Method] : methods.PUT
if (request.body !== null && mp.requestBody === false) {
throw new TypeError(`Cannot send a body with ${method}`)
}
// convert body and set appropriate headers
const body = formatBody(request, this.#encoder, mp.requestBody !== false)
// send request line and headers
let headers = `${method} ${request.target} HTTP/${request.version}\r\n`
headers += writeHeaders(request.headers)
headers += "\r\n"
this.#writeBuffer.write(headers)
// if we have a body, send it
if (body) await sendBody(this.#writeBuffer, this.#encoder, body)
// and flush to make sure we sent everything
await this.#writeBuffer.flushAll()
}
async #parse(request: ClientRequest): Promise<ClientResponseImpl> {
const statusLine = await this.#readBuffer.readLine()
const parts = statusLine.split(" ")
if (parts.length < 2) throw new Error("Invalid status line")
const http = parts[0]
const status = +parts[1]
if (!isFinite(status) || status < 100 || status > 999)
throw new Error("Invalid code in status line")
const statusText = parts.slice(2).join(" ") || statusCodes[status] || "Unknown"
if (http !== "HTTP/1.1" && http !== "HTTP/1.0")
throw new Error("Invalid version in status line")
const version = http === "HTTP/1.1" ? "1.1" : "1.0"
const scp = statusCodeProperties[status] ?? {}
const headers = await readHeaders(this.#readBuffer)
let bodyStream: BodyReader | null = null
if (request.method.toUpperCase() !== "HEAD" && scp.responseBody !== false) {
bodyStream = bodyReader(this.#readBuffer, headers)
}
this.#prevBody = bodyStream
return new ClientResponseImpl({
version,
status,
statusText,
headers,
bodyStream,
})
}
parse(request: ClientRequest): Promise<ClientResponse> {
return this.#parse(request)
}
async #request(req: ClientRequest): Promise<ClientResponseImpl> {
if (this.#prevBody) {
while (!this.#prevBody.closed) await this.#prevBody.read()
this.#prevBody = null
}
await this.send(req)
const res = await this.#parse(req)
// close connections that should be done
if (shouldClose(req, res))
if (this.#prevBody) {
;(this.#prevBody as BodyReader).onFinish(undefined, undefined, () => this.close())
} else {
await this.close()
}
return res
}
request(req: ClientRequest): Promise<ClientResponse> {
return this.#request(req)
}
async close(): Promise<void> {
if (this.#transport.closed) return
await this.#transport.close()
}
get ended(): boolean {
return this.#readBuffer.ended
}
get prevBodyFinished(): boolean {
return !this.#prevBody || this.#prevBody.closed
}
onPrevBodyFinished(): Promise<void> {
if (this.prevBodyFinished) return Promise.resolve()
return new Promise((ok, ko) => {
this.#prevBody!.onFinish(ok, ko)
})
}
}
+180
View File
@@ -0,0 +1,180 @@
import { ClientRequestImpl } from "./objects.js"
import { PooledDialer, UnpooledDialer, type ConnectionPool } from "./pool.js"
import type { ClientRequestInit, ClientResponse, RawDialer } from "./types.js"
export interface Fetch {
(url: string | URL, options?: Omit<ClientRequestInit, "target">): Promise<ClientResponse>
(
options: Omit<ClientRequestInit, "target"> & {
url: string | URL
},
): Promise<ClientResponse>
}
async function rawFetch(
pool: ConnectionPool,
keepAlive: boolean,
urlOrOptions:
| string
| URL
| (Omit<ClientRequestInit, "target"> & {
url: string | URL
}),
maybeOptions?: Omit<ClientRequestInit, "target">,
): Promise<ClientResponse> {
let url =
typeof urlOrOptions === "object" && "url" in urlOrOptions ? urlOrOptions.url : urlOrOptions
if (typeof url === "string") url = new URL(url)
const options =
maybeOptions ??
(typeof urlOrOptions === "object" && !(urlOrOptions instanceof URL) ? urlOrOptions : {})
if (url.protocol !== "http:" && url.protocol !== "https:") {
throw new TypeError("Invalid protocol for fetch")
}
if (url.protocol === "https:" && !pool.tlsSupported) {
throw new TypeError("TLS is not supported on this dialer")
}
let rawPort = url.port
if (!rawPort) {
if (url.protocol === "http:") rawPort = "80"
else rawPort = "443"
}
const port = +rawPort
if (!isFinite(port) || port < 1 || port > 65535) {
throw new TypeError("Invalid port for fetch")
}
const req = new ClientRequestImpl({ ...options, target: url.pathname + url.search })
if (url.username || url.password) {
req.setHeader("Authorization", `Basic ${btoa([url.username, url.password].join(":"))}`)
}
if (!req.hasHeader("Host")) req.setHeader("Host", url.host)
if (keepAlive && !req.hasHeader("Connection")) req.setHeader("Connection", "keep-alive")
if (!keepAlive && req.getHeader("Connection") !== "upgrade") req.setHeader("Connection", "close")
const conn = await pool.getConnection(url.hostname, port, url.protocol === "https:")
try {
const result = await conn.request(req)
conn.onPrevBodyFinished().then(
() => {
pool.releaseConnection(url.hostname, port, url.protocol === "https:", conn)
},
() => {
pool.rejectConnection(url.hostname, port, url.protocol === "https:", conn)
},
)
return result
} catch (e) {
pool.rejectConnection(url.hostname, port, url.protocol === "https:", conn)
throw e
}
}
export function fetch(
dialer: RawDialer,
url: string | URL,
options?: Omit<ClientRequestInit, "target">,
): Promise<ClientResponse>
export function fetch(
dialer: RawDialer,
options: Omit<ClientRequestInit, "target"> & {
url: string | URL
},
): Promise<ClientResponse>
export function fetch(
dialer: RawDialer,
urlOrOptions:
| string
| URL
| (Omit<ClientRequestInit, "target"> & {
url: string | URL
}),
maybeOptions?: Omit<ClientRequestInit, "target">,
): Promise<ClientResponse> {
return rawFetch(new UnpooledDialer(dialer), false, urlOrOptions, maybeOptions)
}
export function makeFetch(
dialer: RawDialer,
params?: {
// how long to keep connections alive after they are used
// false: close connection every time
// true: use default keepalive duration
// 0: immediately close connection if not reused by a queued request
// positive number: keep inactive connections for that many ms
// @defaultValue 5000
keepAlive?: boolean | number
// max amount of concurrent connections for a given origin (host + port + tls)
// must be strictly positive
// @defaultValue 5
maxPerOrigin?: number
// max amount of concurrent connections in total
// must be strictly positive
// @defaultValue Infinity
max?: number
},
): Fetch & { pool: ConnectionPool }
export function makeFetch(
pool: ConnectionPool,
params?: {
// should fetch keep the connections alive
// @defaultValue true
keepAlive?: boolean
},
): Fetch
export function makeFetch(
dialerOrPool: RawDialer | ConnectionPool,
{
keepAlive = true,
maxPerOrigin = 5,
max = Infinity,
}: {
keepAlive?: boolean | number
maxPerOrigin?: number
max?: number
} = {},
): Fetch {
if ("dial" in dialerOrPool) {
if (keepAlive === true) keepAlive = 5000
if (typeof keepAlive === "number" && keepAlive < 0) {
throw new RangeError("Invalid keepAlive for makeFetch")
}
if (maxPerOrigin < 1) throw new RangeError("Invalid maxPerOrigin for makeFetch")
if (max < 1) throw new RangeError("Invalid max for makeFetch")
const pool = new PooledDialer(dialerOrPool, {
keepAlive,
maxPerOrigin,
max,
})
return Object.assign(
function fetch(
urlOrOptions:
| string
| URL
| (Omit<ClientRequestInit, "target"> & {
url: string | URL
}),
maybeOptions?: Omit<ClientRequestInit, "target">,
): Promise<ClientResponse> {
return rawFetch(pool, keepAlive !== false, urlOrOptions, maybeOptions)
},
{ pool },
)
} else {
if (typeof keepAlive !== "boolean") throw new TypeError("Invalid keepAlive for makeFetch")
return function fetch(
urlOrOptions:
| string
| URL
| (Omit<ClientRequestInit, "target"> & {
url: string | URL
}),
maybeOptions?: Omit<ClientRequestInit, "target">,
): Promise<ClientResponse> {
return rawFetch(dialerOrPool, keepAlive as boolean, urlOrOptions, maybeOptions)
}
}
}
+52
View File
@@ -0,0 +1,52 @@
import { ReadableHttpImpl, WritableHttpImpl } from "../common/objects.js"
import type { Reader } from "../common/types.js"
import type { ClientRequest, ClientRequestInit, ClientResponse } from "./types.js"
import type { Headers } from "../common/headers.js"
export class ClientRequestImpl extends WritableHttpImpl implements ClientRequest {
version: "1.0" | "1.1"
target: string
method: string
constructor({ method, target, body, headers, version }: ClientRequestInit) {
super({ headers, body })
this.version = version ?? "1.1"
this.target = target
this.method = method ?? (body ? "POST" : "GET")
}
}
export class ClientResponseImpl extends ReadableHttpImpl implements ClientResponse {
#version: "1.0" | "1.1"
#status: number
#statusText: string
constructor({
version,
headers,
status,
statusText,
bodyStream,
}: {
version: "1.0" | "1.1"
headers: Headers
status: number
statusText: string
bodyStream: Reader | null
}) {
super({ headers, bodyStream })
this.#version = version
this.#status = status
this.#statusText = statusText
}
get version(): "1.0" | "1.1" {
return this.#version
}
get status(): number {
return this.#status
}
get statusText(): string {
return this.#statusText
}
}
+236
View File
@@ -0,0 +1,236 @@
import { ClientConnection } from "./connection.js"
import type { RawDialer } from "./types.js"
export interface ConnectionPool {
tlsSupported: boolean
getConnection(host: string, port: number, tls: boolean): Promise<ClientConnection>
releaseConnection(host: string, port: number, tls: boolean, connection: ClientConnection): void
rejectConnection(host: string, port: number, tls: boolean, connection: ClientConnection): void
shutdown(): Promise<void>
}
export class UnpooledDialer implements ConnectionPool {
#dialer: RawDialer
constructor(dialer: RawDialer) {
this.#dialer = dialer
}
get tlsSupported(): boolean {
return !!this.#dialer.dialTls
}
async getConnection(host: string, port: number, tls: boolean): Promise<ClientConnection> {
if (tls) {
if (!this.#dialer.dialTls) throw new Error("TLS is not supported on this dialer")
return new ClientConnection(await this.#dialer.dialTls(host, port))
}
return new ClientConnection(await this.#dialer.dial(host, port))
}
releaseConnection(host: string, port: number, tls: boolean, connection: ClientConnection): void {
void host
void port
void tls
void connection
}
rejectConnection(host: string, port: number, tls: boolean, connection: ClientConnection): void {
void host
void port
void tls
void connection
}
async shutdown(): Promise<void> {
// do nothing, we're not really pooled in the first place
}
}
export class PooledDialer implements ConnectionPool {
#dialer: RawDialer
#alive: number
#pool: Map<
`${string}:${number}:${boolean}`,
{
alive: number
idle: ClientConnection[]
waiting: (() => void)[]
}
>
#timeout: Map<ClientConnection, ReturnType<typeof setTimeout>>
#keepAlive: number | false
#maxPerOrigin: number
#max: number
#shutdown: boolean
constructor(
dialer: RawDialer,
{
keepAlive,
maxPerOrigin,
max,
}: { keepAlive: number | false; maxPerOrigin: number; max: number },
) {
this.#dialer = dialer
this.#alive = 0
this.#pool = new Map()
this.#timeout = new Map()
this.#keepAlive = keepAlive
this.#maxPerOrigin = maxPerOrigin
this.#max = max
this.#shutdown = false
}
get tlsSupported(): boolean {
return !!this.#dialer.dialTls
}
#slot(host: string, port: number, tls: boolean) {
const key = `${host}:${port}:${tls}` as const
let slot = this.#pool.get(key)
if (!slot) {
slot = { alive: 0, idle: [], waiting: [] }
this.#pool.set(key, slot)
}
return slot
}
#notify(host: string, port: number, tls: boolean): void {
const slot = this.#pool.get(`${host}:${port}:${tls}`)
if (!slot) return
if (slot.alive < 0) throw new Error("Broken invariant: alive < 0")
if (slot.idle.length > slot.alive) throw new Error("Broken invariant: idle.length > alive")
if (!slot.alive && !slot.waiting.length) {
this.#pool.delete(`${host}:${port}:${tls}`)
return
}
const callback = slot.waiting.shift()
if (callback) callback()
}
async #connect(host: string, port: number, tls: boolean): Promise<ClientConnection> {
if (tls) {
if (!this.#dialer.dialTls) throw new Error("TLS is not supported on this dialer")
return new ClientConnection(await this.#dialer.dialTls(host, port))
}
return new ClientConnection(await this.#dialer.dial(host, port))
}
async #maybeConnect(host: string, port: number, tls: boolean): Promise<ClientConnection> {
if (this.#alive >= this.#max) {
//TODO: wait instead
throw new Error("Pool full")
}
const slot = this.#slot(host, port, tls)
if (slot.alive >= this.#maxPerOrigin) {
//TODO: wait instead
throw new Error("Per-origin full")
}
const connection = await this.#connect(host, port, tls)
slot.alive++
this.#alive++
return connection
}
async getConnection(host: string, port: number, tls: boolean): Promise<ClientConnection> {
if (this.#shutdown) throw new Error("Pool has been shot down")
const slot = this.#slot(host, port, tls)
if (slot.idle.length && !slot.waiting.length) {
// happy path
const connection = slot.idle.shift()!
if (this.#timeout.has(connection)) {
clearTimeout(this.#timeout.get(connection)!)
this.#timeout.delete(connection)
}
return connection
}
return this.#maybeConnect(host, port, tls)
}
releaseConnection(host: string, port: number, tls: boolean, connection: ClientConnection): void {
if (!connection) throw new TypeError("Missing connection")
if (this.#shutdown) return
if (connection.ended) return this.rejectConnection(host, port, tls, connection)
const slot = this.#slot(host, port, tls)
if (this.#keepAlive === false) {
this.#alive--
slot.alive--
connection.close()
} else {
slot.idle.push(connection)
}
const waiting = slot.waiting.shift()
if (waiting) waiting()
if (this.#keepAlive === false) return
const idx = slot.idle.indexOf(connection)
if (idx !== -1) {
if (this.#keepAlive === 0) {
slot.idle.splice(idx, 1)
this.#alive--
slot.alive--
connection.close()
const waiting = slot.waiting.shift()
if (waiting) waiting()
} else {
this.#timeout.set(
connection,
setTimeout(() => {
const idx = slot.idle.indexOf(connection)
if (idx === -1)
throw new Error("Invariant failed: connection not in idle after setTimeout")
slot.idle.splice(idx, 1)
this.#alive--
slot.alive--
connection.close()
const waiting = slot.waiting.shift()
if (waiting) waiting()
}, this.#keepAlive),
)
}
}
}
rejectConnection(host: string, port: number, tls: boolean, connection: ClientConnection): void {
if (!connection) throw new TypeError("Missing connection")
if (this.#shutdown) return
if (this.#timeout.has(connection)) {
clearTimeout(this.#timeout.get(connection)!)
this.#timeout.delete(connection)
}
const slot = this.#slot(host, port, tls)
this.#alive--
slot.alive--
this.#notify(host, port, tls)
}
async shutdown(): Promise<void> {
const connections: ClientConnection[] = []
const waiting: (() => void)[] = []
for (const slot of this.#pool.values()) {
for (const connection of slot.idle) connections.push(connection)
slot.idle = []
for (const handler of slot.waiting) waiting.push(handler)
slot.alive = 0
}
for (const timeout of this.#timeout.values()) clearTimeout(timeout)
this.#alive = 0
this.#timeout = new Map()
this.#pool = new Map()
const errors: unknown[] = []
for (const handler of waiting) {
try {
handler()
} catch (e) {
errors.push(e)
}
}
for (const result of await Promise.allSettled(connections.map((x) => x.close()))) {
if (result.status === "rejected") errors.push(result.reason)
}
if (errors.length) throw new AggregateError(errors)
}
}
+21
View File
@@ -0,0 +1,21 @@
import type { RawTransport, ReadableHttp, WritableHttp } from "../common/types.js"
export interface RawDialer {
dial(host: string, port: number): Promise<RawTransport>
dialTls?(host: string, port: number): Promise<RawTransport>
}
export interface ClientRequest extends WritableHttp {
method: string
target: string
version: "1.0" | "1.1"
}
export interface ClientResponse extends ReadableHttp {
readonly status: number
readonly statusText: string
readonly version: "1.0" | "1.1"
}
export type ClientRequestInit = Pick<ClientRequest, "target"> &
Partial<Pick<ClientRequest, "method" | "headers" | "version" | "body">>
@@ -84,7 +84,11 @@ export class ReadBuffer {
buffers.push(this.#buffers[buf])
buf++
}
if (len) buffers.push(this.#buffers[buf].slice(0, len))
if (len) {
const chunk = this.#buffers[buf].slice(0, len)
buffers.push(chunk)
if (chunk.length !== len) throw new Error("Couldn't read the specified size")
}
return buffers
}
#sliceStr(start: number, len: number): string {
@@ -97,10 +101,10 @@ export class ReadBuffer {
async readLine(maxLen: number = 1024 * 128): Promise<string> {
const crIdx = await this.#readUntilByte(0x0d, maxLen) // CR
if (crIdx === -1) throw new Error("Couldn't find CR")
if (crIdx === -1) throw new Error("Line too long")
if (this.len < crIdx + 2) await this.read(crIdx + 2)
if (this.#at(crIdx + 1) !== 0x0a) throw new Error("CR wasn't CRLF")
if (crIdx > maxLen) throw new Error("Line was too long")
if (crIdx > maxLen) throw new Error("Line too long")
const line = this.#sliceStr(0, crIdx)
this.forward(crIdx + 2)
return line
+18
View File
@@ -0,0 +1,18 @@
import type { ReadableHttp, WritableHttp } from "./types.js"
export function shouldClose(
req: (ReadableHttp | WritableHttp) & { version: "1.0" | "1.1" },
res: (ReadableHttp | WritableHttp) & { version?: "1.0" | "1.1" },
): boolean {
if (req.getHeader("Connection") === "close" || res.getHeader("Connection") === "close") {
return true
}
if (
(req.version === "1.0" || res.version === "1.0") &&
req.getHeader("Connection") !== "keep-alive" &&
res.getHeader("Connection") !== "keep-alive"
) {
return true
}
return false
}
@@ -1,66 +1,18 @@
import type { Body, ReadableHttp, Reader, WritableHttp } from "./types.js"
import { Headers, MutableHeaders } from "./headers.js"
import { statusCodes } from "./spec.js"
import type { Body, RawTransport, Request, Response } from "./types.js"
export class RequestImpl implements Request {
#transport: RawTransport
#version: "1.0" | "1.1"
#target: string
#method: string
export class ReadableHttpImpl implements ReadableHttp {
#headers: Headers
#bodyStream: Pick<RawTransport, "read" | "closed"> | null
#bodyStream: Reader | null
constructor({
transport,
version,
target,
method,
headers,
bodyStream,
}: {
transport: RawTransport
version: "1.0" | "1.1"
target: string
method: string
headers: Headers
bodyStream: Pick<RawTransport, "read" | "closed"> | null
}) {
this.#transport = transport
this.#version = version
this.#target = target
this.#method = method
constructor({ headers, bodyStream }: { headers: Headers; bodyStream: Reader | null }) {
this.#headers = headers
this.#bodyStream = bodyStream
}
get transport(): RawTransport {
return this.#transport
}
get version(): "1.0" | "1.1" {
return this.#version
}
get target(): string {
return this.#target
}
get method(): string {
return this.#method
}
get headers(): Readonly<Record<string, string | readonly string[]>> {
return this.#headers.headers
}
#url: URL | undefined
get url(): URL {
if (this.#url) return this.#url
let host = this.#headers.get("Host")
if (Array.isArray(host)) host = host[0]
if (!host) host = "localhost"
return (this.#url = new URL(this.#target, "http://" + host))
}
get query(): URLSearchParams {
return this.url.searchParams
}
getHeader(header: string): string | readonly string[] | undefined {
return this.#headers.get(header)
}
@@ -71,9 +23,6 @@ export class RequestImpl implements Request {
get hasBody(): boolean {
return !!this.#bodyStream
}
get _bodyStream(): Pick<RawTransport, "closed" | "read"> | null {
return this.#bodyStream
}
stream(encoding?: "binary"): AsyncIterable<Uint8Array>
stream(encoding: "utf8"): AsyncIterable<string>
async *stream(encoding?: "binary" | "utf8"): AsyncIterable<string | Uint8Array> {
@@ -114,42 +63,26 @@ export class RequestImpl implements Request {
return JSON.parse(await this.text())
}
#assertBody(): Pick<RawTransport, "read" | "closed"> {
#assertBody(): Reader {
if (!this.#bodyStream)
throw new Error("Attempt to read the body of a Request that doesn't have one")
return this.#bodyStream
}
}
export class ResponseImpl implements Response {
#request: RequestImpl
#status: number
#statusText: string
export class WritableHttpImpl implements WritableHttp {
#headers: MutableHeaders
body: Body
constructor(request: RequestImpl) {
this.#request = request
this.#status = 200
this.#statusText = statusCodes[200]
this.#headers = new MutableHeaders({})
this.body = null
}
get request(): RequestImpl {
return this.#request
}
get status(): number {
return this.#status
}
get statusText(): string {
return this.#statusText
}
setStatus(status: number, statusText?: string): void {
statusText ??= statusCodes[status] ?? "Unknown Status"
this.#status = status
this.#statusText = statusText
constructor({
headers,
body,
}: {
headers?: Readonly<Record<string, string | readonly string[]>>
body?: Body
} = {}) {
this.#headers = new MutableHeaders(headers ?? {})
this.body = body ?? null
}
get headers(): Readonly<Record<string, string | readonly string[]>> {
+57
View File
@@ -0,0 +1,57 @@
export class PairSync<A, B> {
#get: () => [A, B]
#aQueue: ((b: B) => void)[] = []
#bQueue: ((a: A) => void)[] = []
#close = new Set<(err: unknown) => void>()
#closed: boolean = false
constructor(get: () => [A, B]) {
this.#get = get
}
get closed(): boolean {
return this.#closed
}
close() {
if (this.#closed) return
this.#closed = true
const snapshot = [...this.#close]
this.#aQueue = []
this.#bQueue = []
this.#close.clear()
for (const close of snapshot) close(new Error("PairSync closed"))
}
async getA(): Promise<A> {
if (this.#closed) throw new Error("Already closed")
if (this.#aQueue.length) {
const send = this.#aQueue.shift()!
const [a, b] = this.#get()
send(b)
return a
}
return new Promise<A>((ok, ko) => {
this.#bQueue.push((x) => {
ok(x)
this.#close.delete(ko)
})
this.#close.add(ko)
})
}
async getB(): Promise<B> {
if (this.#closed) throw new Error("Already closed")
if (this.#bQueue.length) {
const send = this.#bQueue.shift()!
const [a, b] = this.#get()
send(a)
return b
}
return new Promise<B>((ok, ko) => {
this.#aQueue.push((x) => {
ok(x)
this.#close.delete(ko)
})
this.#close.add(ko)
})
}
}
+176
View File
@@ -0,0 +1,176 @@
import type { ReadBuffer } from "./buffer.js"
import type { Reader } from "./types.js"
import { Headers } from "./headers.js"
import { hasOwnProperty } from "./utils.js"
export interface BodyReader extends Reader {
onFinish(
okHandler?: () => void | Promise<void>,
koHandler?: (err: unknown) => void | Promise<void>,
anyHandler?: () => void | Promise<void>,
): void
}
abstract class BaseBodyReader implements Pick<BodyReader, "onFinish"> {
#finishOkHandlers: (() => void | Promise<void>)[]
#finishKoHandlers: ((err: unknown) => void | Promise<void>)[]
constructor() {
this.#finishOkHandlers = []
this.#finishKoHandlers = []
}
onFinish(
okHandler?: () => void | Promise<void>,
koHandler?: (err: unknown) => void | Promise<void>,
anyHandler?: () => void | Promise<void>,
): void {
if (okHandler) this.#finishOkHandlers.push(okHandler)
if (koHandler) this.#finishKoHandlers.push(koHandler)
if (anyHandler) {
this.#finishOkHandlers.push(anyHandler)
this.#finishKoHandlers.push(anyHandler)
}
}
async _triggerFinishOk(): Promise<void> {
const snapshot = [...this.#finishOkHandlers]
this.#finishOkHandlers = []
this.#finishKoHandlers = []
for (const handler of snapshot) {
try {
await handler()
} catch (e) {
console.error(e)
}
}
}
async _triggerFinishKo(err: unknown): Promise<void> {
const snapshot = [...this.#finishKoHandlers]
this.#finishOkHandlers = []
this.#finishKoHandlers = []
for (const handler of snapshot) {
try {
await handler(err)
} catch (e) {
console.error(e)
}
}
}
}
export class BasicBodyReader extends BaseBodyReader implements BodyReader {
#remaining: number
#buffer: ReadBuffer
#chunks: Uint8Array[]
constructor(buffer: ReadBuffer, totalLength: number) {
super()
this.#buffer = buffer
this.#remaining = totalLength
this.#chunks = []
}
get closed(): boolean {
if (this.#buffer.ended) return true
return this.#remaining === 0
}
async read(): Promise<Uint8Array> {
try {
if (this.#chunks.length) {
const chunk = this.#chunks.shift()!
if (!this.#chunks.length && !this.#remaining) await this._triggerFinishOk()
return chunk
}
if (this.closed) throw new Error("BasicBodyReader is already closed")
while (!this.#buffer.len) await this.#buffer.readOnce()
const len = Math.min(this.#buffer.len, this.#remaining)
this.#remaining -= len
const data = this.#buffer.slice(0, len)
this.#buffer.forward(len)
for (const chunk of data) this.#chunks.push(chunk)
const chunk = this.#chunks.shift()!
if (!this.#chunks.length && !this.#remaining) await this._triggerFinishOk()
return chunk
} catch (e) {
await this._triggerFinishKo(e)
throw e
}
}
}
export class ChunkedBodyReader extends BaseBodyReader implements BodyReader {
#buffer: ReadBuffer
#chunks: Uint8Array[]
#finished: boolean
constructor(buffer: ReadBuffer) {
super()
this.#buffer = buffer
this.#chunks = []
this.#finished = false
}
get closed(): boolean {
return this.#buffer.ended || this.#finished
}
async read(): Promise<Uint8Array> {
try {
if (this.#chunks.length) return this.#chunks.shift()!
if (this.closed) throw new Error("ChunkedBodyReader is already closed")
const chunkLine = await this.#buffer.readLine()
const len = Number.parseInt(chunkLine, 16)
if (!Number.isInteger(len) || len < 0)
throw new Error("Invalid chunk size in ChunkedBodyReader")
if (!len) {
this.#finished = true
await this.#buffer.read(2)
this.#buffer.forward(2)
await this._triggerFinishOk()
return new Uint8Array(0)
}
await this.#buffer.read(len + 2)
if (len > this.#buffer.len) throw new Error("Couldn't read the specified length")
const data = this.#buffer.slice(0, len)
this.#buffer.forward(len + 2)
for (const chunk of data) this.#chunks.push(chunk)
return this.#chunks.shift()!
} catch (e) {
await this._triggerFinishKo(e)
throw e
}
}
}
export async function readHeaders(buffer: ReadBuffer): Promise<Headers> {
const rawHeaders: Record<string, string[]> = {}
while (true) {
const headerLine = await buffer.readLine()
if (!headerLine.length) {
break
}
const colon = headerLine.indexOf(":")
if (colon === -1) throw new Error("Invalid header line")
const key = headerLine.slice(0, colon)
const value = headerLine.slice(colon + 1).trim()
if (!hasOwnProperty(rawHeaders, key)) rawHeaders[key] = []
rawHeaders[key].push(value)
}
return new Headers(
Object.fromEntries(Object.entries(rawHeaders).map(([k, v]) => [k, v.length === 1 ? v[0] : v])),
)
}
export function bodyReader(buffer: ReadBuffer, headers: Headers): BodyReader | null {
if (headers.get("Transfer-Encoding") === "chunked") {
return new ChunkedBodyReader(buffer)
} else if (typeof headers.get("Content-Length") === "string") {
const len = +(headers.get("Content-Length") as string)
if (!Number.isInteger(len) || len < 0) throw new Error("Content-Length is invalid")
return new BasicBodyReader(buffer, len)
} else {
return null
}
}
@@ -9,29 +9,18 @@ export interface RawTransport {
export type Reader = Pick<RawTransport, "closed" | "read">
export type Writer = Pick<RawTransport, "write">
export interface RawListener {
get closed(): boolean
close(): void | Promise<void>
export type SimpleBody = string | Uint8Array
export type StreamBody = AsyncIterable<SimpleBody | { flush: true }>
export type JsonBody = { json: unknown }
export type Body = null | SimpleBody | StreamBody | JsonBody
accept(): Promise<RawTransport>
}
export interface Request {
readonly transport: RawTransport
// raw data
readonly version: "1.0" | "1.1"
readonly target: string
readonly method: string
export interface ReadableHttp {
// headers
readonly headers: Readonly<Record<string, string | readonly string[]>>
// parsed data
readonly url: URL
readonly query: URLSearchParams
getHeader(header: string): string | readonly string[] | undefined
hasHeader(header: string): boolean
// request body
// body
readonly hasBody: boolean
stream(encoding?: "binary"): AsyncIterable<Uint8Array>
stream(encoding: "utf8"): AsyncIterable<string>
@@ -40,18 +29,7 @@ export interface Request {
json<T>(): Promise<T>
}
export type SimpleBody = string | Uint8Array
export type StreamBody = AsyncIterable<SimpleBody | { flush: true }>
export type JsonBody = { json: unknown }
export type Body = null | SimpleBody | StreamBody | JsonBody
export interface Response {
readonly request: Request
readonly status: number
readonly statusText: string
setStatus(status: number, statusText?: string): void
export interface WritableHttp {
readonly headers: Readonly<Record<string, string | readonly string[]>>
getHeader(header: string): string | readonly string[] | undefined
hasHeader(header: string): boolean
@@ -60,16 +38,3 @@ export interface Response {
body: Body
}
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export type Context<T = {}> = {
readonly transport: RawTransport
readonly req: Request
readonly res: Response
} & T
export type Next = () => Promise<void>
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export type Middleware<T = {}> = (ctx: Context<T>, next: Next) => Promise<void>
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export type Handler<T = {}> = (ctx: Context<T>) => Promise<void>
+71
View File
@@ -0,0 +1,71 @@
import type { WriteBuffer } from "./buffer.js"
import type { StreamBody, WritableHttp } from "./types.js"
export async function writeChunkedBody(
buffer: WriteBuffer,
encoder: TextEncoder,
body: StreamBody,
): Promise<void> {
for await (let chunk of body) {
if (typeof chunk === "string") chunk = encoder.encode(chunk)
if ("flush" in chunk) {
await buffer.flushAll()
continue
}
buffer.write(`${chunk.length.toString(16)}\r\n`)
buffer.write(chunk)
buffer.write("\r\n")
}
buffer.write("0\r\n\r\n")
}
export function formatBody(
writable: WritableHttp,
encoder: TextEncoder,
bodyAllowed: boolean,
): null | Uint8Array | StreamBody {
if (writable.body && typeof writable.body === "object" && "json" in writable.body) {
if (!writable.hasHeader("Content-Type")) {
writable.setHeader("Content-Type", "application/json")
}
writable.body = JSON.stringify(writable.body.json)
}
if (typeof writable.body === "string") writable.body = encoder.encode(writable.body)
if (writable.body instanceof Uint8Array) {
writable.setHeader("Content-Length", "" + writable.body.length)
} else if (writable.body) {
writable.setHeader("Transfer-Encoding", "chunked")
} else if (bodyAllowed) {
writable.setHeader("Content-Length", "0")
}
return writable.body
}
export function writeHeaders(
headers: Readonly<Record<string, string | readonly string[]>>,
): string {
let output = ""
for (const [header, value] of Object.entries(headers)) {
if (typeof value === "string") {
output += `${header}: ${value}\r\n`
} else {
for (const item of value) {
output += `${header}: ${item}\r\n`
}
}
}
return output
}
export async function sendBody(
buffer: WriteBuffer,
encoder: TextEncoder,
body: Uint8Array | StreamBody,
): Promise<void> {
if (body instanceof Uint8Array) {
buffer.write(body)
} else if (body) {
await buffer.flush()
await writeChunkedBody(buffer, encoder, body)
}
}
-180
View File
@@ -1,180 +0,0 @@
import { RequestImpl, ResponseImpl } from "./objects.js"
import type { Handler, RawTransport, Response } from "./types.js"
import { hasOwnProperty } from "./utils.js"
import { Headers } from "./headers.js"
import { statusCodeProperties } from "./spec.js"
import { ReadBuffer, WriteBuffer } from "./buffer.js"
import { BasicBodyReader, ChunkedBodyReader } from "./reader.js"
import { writeChunkedBody } from "./writer.js"
export class Connection {
#transport: RawTransport
#readBuffer: ReadBuffer
#writeBuffer: WriteBuffer
#encoder: TextEncoder
constructor(transport: RawTransport) {
this.#transport = transport
this.#readBuffer = new ReadBuffer(transport)
this.#writeBuffer = new WriteBuffer(transport)
this.#encoder = new TextEncoder()
}
async parse(): Promise<RequestImpl> {
const requestLine = await this.#readBuffer.readLine()
const parts = requestLine.split(" ")
if (parts.length !== 2 && parts.length !== 3) throw new Error("Invalid request line")
const method = parts[0].toUpperCase()
const target = parts[1]
let version: "1.0" | "1.1" = "1.0"
if (parts.length === 3) {
if (parts[2] === "HTTP/1.1") version = "1.1"
else if (parts[2] !== "HTTP/1.0") throw new Error("Invalid version in request line")
}
const rawHeaders: Record<string, string[]> = {}
while (true) {
const headerLine = await this.#readBuffer.readLine()
if (!headerLine.length) {
break
}
const colon = headerLine.indexOf(":")
if (colon === -1) throw new Error("Invalid header line")
const key = headerLine.slice(0, colon)
const value = headerLine.slice(colon + 1).trim()
if (!hasOwnProperty(rawHeaders, key)) rawHeaders[key] = []
rawHeaders[key].push(value)
}
const headers = new Headers(
Object.fromEntries(
Object.entries(rawHeaders).map(([k, v]) => [k, v.length === 1 ? v[0] : v]),
),
)
let bodyStream: Pick<RawTransport, "closed" | "read"> | null = null
if (headers.get("Transfer-Encoding") === "chunked") {
bodyStream = new ChunkedBodyReader(this.#readBuffer)
} else if (typeof headers.get("Content-Length") === "string") {
const len = +(headers.get("Content-Length") as string)
if (!Number.isInteger(len) || len < 0) throw new Error("Content-Length is invalid")
bodyStream = new BasicBodyReader(this.#readBuffer, len)
}
const req = new RequestImpl({
transport: this.#transport,
version,
target,
method,
headers,
bodyStream,
})
return req
}
async send(response: Response): Promise<void> {
const scp = statusCodeProperties[response.status] ?? {}
// convert body and set appropriate headers
if (response.body && typeof response.body === "object" && "json" in response.body) {
if (!response.hasHeader("Content-Type")) {
response.setHeader("Content-Type", "application/json")
}
response.body = JSON.stringify(response.body.json)
}
if (typeof response.body === "string") response.body = this.#encoder.encode(response.body)
if (response.body instanceof Uint8Array) {
response.setHeader("Content-Length", "" + response.body.length)
} else if (response.body) {
response.setHeader("Transfer-Encoding", "chunked")
} else if (scp.responseBody !== false) {
response.setHeader("Content-Length", "0")
}
// validate headers
for (const header of scp.requireHeaders ?? []) {
if (!response.hasHeader(header)) {
console.warn(
`[Connection.send] Response with code ${response.status} is missing ${header} header`,
)
}
}
// send status line and headers
let headers = `HTTP/${response.request.version} ${response.status} ${response.statusText}\r\n`
for (const [header, value] of Object.entries(response.headers)) {
if (typeof value === "string") {
headers += `${header}: ${value}\r\n`
} else {
for (const item of value) {
headers += `${header}: ${item}\r\n`
}
}
}
headers += "\r\n"
this.#writeBuffer.write(headers)
// if we need to send a body, send it
if (response.request.method !== "HEAD" && scp.responseBody !== false) {
if (response.body instanceof Uint8Array) {
this.#writeBuffer.write(response.body)
} else if (response.body) {
// send as much as the connection will get first
await this.#writeBuffer.flush()
// and then actually start sending the body
await writeChunkedBody(this.#writeBuffer, this.#encoder, response.body)
}
}
// and flush to make sure we sent everything
await this.#writeBuffer.flushAll()
}
async handle(handler: Handler): Promise<void> {
try {
while (!this.#readBuffer.ended) {
// handle a single request
const req = await this.parse()
const bodyStream = req._bodyStream
const res = new ResponseImpl(req)
try {
await handler({ req, res, transport: this.#transport })
} catch (e) {
console.error("[Connection.handle] caught error in handler", e)
res.setStatus(500)
res.body = "Internal Server Error"
}
await this.send(res)
// close connections that should be done
let shouldClose = false
if (req.getHeader("Connection") === "close" || res.getHeader("Connection") === "close") {
shouldClose = true
}
if (
req.version === "1.0" &&
req.getHeader("Connection") !== "keep-alive" &&
res.getHeader("Connection") !== "keep-alive"
) {
shouldClose = true
}
if (shouldClose) {
await this.close()
}
// drain the body if needed
if (bodyStream) {
while (!bodyStream.closed) await bodyStream.read()
}
// and go for another loop
}
} finally {
await this.close()
}
}
async close(): Promise<void> {
if (this.#transport.closed) return
await this.#transport.close()
}
}
+14 -7
View File
@@ -1,7 +1,14 @@
export { Server } from "./server.js"
export { Connection } from "./connection.js"
export { RequestImpl, ResponseImpl } from "./objects.js"
export { Headers, MutableHeaders, normalizeHeaders } from "./headers.js"
export { Router } from "./router.js"
export { methods, statusCodeProperties, statusCodes, type Method } from "./spec.js"
export type * from "./types.js"
export type * from "./common/types.js"
export { Headers, MutableHeaders, normalizeHeaders } from "./common/headers.js"
export { methods, statusCodeProperties, statusCodes, type Method } from "./common/spec.js"
export type * from "./server/types.js"
export { Server } from "./server/server.js"
export { ServerConnection } from "./server/connection.js"
export { Router } from "./server/router.js"
export type * from "./client/types.js"
export { ClientRequestImpl } from "./client/objects.js"
export { fetch, makeFetch, type Fetch } from "./client/fetch.js"
export { PooledDialer, UnpooledDialer, type ConnectionPool } from "./client/pool.js"
export { ClientConnection } from "./client/connection.js"
+118
View File
@@ -0,0 +1,118 @@
import type { RawDialer } from "../client/types.js"
import { PairSync } from "../common/pair.js"
import type { RawTransport } from "../common/types.js"
import type { RawListener } from "../server/types.js"
export class LoopbackTransportHalf implements RawTransport {
#buffers: Uint8Array[] = []
#callback: ((data: Uint8Array) => void) | null = null
#errcallback: ((err: unknown) => void) | null = null
#readClosed: boolean = false
#writeClosed: boolean = false
_closeFn: (() => void) | null = null
_writeFn: ((buf: Uint8Array) => void) | null = null
_push(buf: Uint8Array): void {
const callback = this.#callback
if (callback) {
this.#callback = null
callback(buf)
} else {
this.#buffers.push(buf)
}
}
_end(): void {
this.#readClosed = true
this.#maybeEnded("end event")
}
#maybeEnded(source: string): void {
setImmediate(() => {
if (this.closed) this.#errcallback?.(new Error("Socket ended: " + source))
})
}
close(): void {
if (!this._closeFn) throw new Error("No _closeFn")
this._closeFn()
this.#writeClosed = true
this.#maybeEnded("close()")
}
get closed() {
return this.#readClosed && this.#writeClosed && !this.#buffers.length
}
async read(): Promise<Uint8Array> {
if (this.#buffers.length) {
const buf = this.#buffers.shift()!
return buf
}
if (this.#callback) {
throw new Error("Broken invariant: two calls of read() at once")
}
return new Promise<Uint8Array>((ok, ko) => {
this.#callback = ok
this.#errcallback = ko
})
}
async write(data: Uint8Array): Promise<void> {
if (this.#writeClosed) throw new Error("Write closed")
if (!this._writeFn) throw new Error("No _writeFn")
this._writeFn(data)
}
}
export function loopbackTransportPair(): [LoopbackTransportHalf, LoopbackTransportHalf] {
const a = new LoopbackTransportHalf()
const b = new LoopbackTransportHalf()
a._closeFn = () => b._end()
a._writeFn = (buf) => b._push(buf)
b._closeFn = () => a._end()
b._writeFn = (buf) => a._push(buf)
return [a, b]
}
export class LoopbackDialer implements RawDialer {
#filter: (host: string, port: number) => boolean
#pair: PairSync<LoopbackTransportHalf, LoopbackTransportHalf>
constructor(
pair: PairSync<LoopbackTransportHalf, LoopbackTransportHalf>,
filter: (host: string, port: number) => boolean = () => true,
) {
this.#pair = pair
this.#filter = filter
}
async dial(host: string, port: number): Promise<RawTransport> {
if (!this.#filter(host, port)) throw new Error("Invalid host:port pair")
return this.#pair.getA()
}
}
export class LoopbackListener implements RawListener {
#pair: PairSync<LoopbackTransportHalf, LoopbackTransportHalf>
constructor(pair: PairSync<LoopbackTransportHalf, LoopbackTransportHalf>) {
this.#pair = pair
}
get closed() {
return this.#pair.closed
}
close(): void {
this.#pair.close()
}
async accept(): Promise<RawTransport> {
return this.#pair.getB()
}
}
export function loopbackListener({
filter,
}: { filter?: (hostname: string, port: number) => boolean } = {}): [RawListener, LoopbackDialer] {
const pair = new PairSync<LoopbackTransportHalf, LoopbackTransportHalf>(loopbackTransportPair)
return [new LoopbackListener(pair), new LoopbackDialer(pair, filter)]
}
+42
View File
@@ -0,0 +1,42 @@
import test, { suite, after } from "node:test"
import { Fetch, makeFetch } from "../client/fetch.js"
import { nodeDialer } from "./transport.js"
import assert from "node:assert"
import { UnpooledDialer } from "../client/pool.js"
function fetchUrlTest(url: string, fetch: Fetch, count: number) {
test(url, async () => {
for (let i = 0; i < count; i++) {
const res = await fetch(url)
assert.strictEqual(res.status, 200)
assert.strictEqual(res.statusText, "OK")
assert.strictEqual(res.hasBody, true)
const body = await res.bytes()
if (res.hasHeader("Content-Length"))
assert.strictEqual(body.length, +res.getHeader("Content-Length")!)
}
})
}
suite("node", () => {
suite("fetch", () => {
for (const [fetch, name, count] of [
[makeFetch(new UnpooledDialer(nodeDialer), { keepAlive: false }), "no pool", 1],
[makeFetch(nodeDialer, { keepAlive: false }), "no keep-alive", 2],
[makeFetch(nodeDialer, { keepAlive: true }), "keep-alive", 2],
] as const) {
suite(name, () => {
fetchUrlTest("https://www.google.com", fetch, count)
fetchUrlTest("https://one.one.one.one", fetch, count)
fetchUrlTest("http://httpforever.com", fetch, count)
fetchUrlTest("http://httpbin.org", fetch, count)
if ("pool" in fetch) {
after(async () => {
await fetch.pool.shutdown()
})
}
})
}
})
})
+144
View File
@@ -0,0 +1,144 @@
import { connect, createServer, Server, type Socket } from "node:net"
import { connect as connectTls } from "node:tls"
import type { RawTransport } from "../common/types.js"
import type { RawDialer } from "../client/types.js"
import type { RawListener } from "../server/types.js"
export class NodeTransport implements RawTransport {
#socket: Socket
#buffers: Uint8Array[]
#callback: ((data: Uint8Array) => void) | null
#errcallback: ((err: unknown) => void) | null
#readClosed: boolean
#writeClosed: boolean
constructor(socket: Socket) {
this.#socket = socket
this.#buffers = []
this.#callback = null
this.#errcallback = null
this.#readClosed = false
this.#writeClosed = false
socket.pause()
socket.on("data", (chunk) => {
if (typeof chunk === "string") throw new Error("Unacceptable string chunk")
const callback = this.#callback
if (callback) {
this.#callback = null
callback(chunk)
} else {
socket.pause()
this.#buffers.push(chunk)
}
})
socket.on("end", () => {
this.#readClosed = true
this.#maybeEnded("end event")
})
socket.on("close", () => {
this.#writeClosed = true
this.#readClosed = true
})
socket.on("error", (err) => {
this.#writeClosed = true
this.#readClosed = true
this.#errcallback?.(err)
})
}
#maybeEnded(source: string): void {
setImmediate(() => {
if (this.closed) this.#errcallback?.(new Error("Socket ended: " + source))
})
}
close(): void {
this.#socket.end()
this.#writeClosed = true
this.#maybeEnded("close()")
}
get closed(): boolean {
return this.#readClosed && this.#writeClosed && !this.#buffers.length
}
async read(): Promise<Uint8Array> {
if (this.#buffers.length) {
const buf = this.#buffers.shift()!
return buf
}
if (this.#callback) {
throw new Error("Broken invariant: two calls of read() at once")
}
return new Promise<Uint8Array>((ok, ko) => {
this.#callback = ok
this.#errcallback = ko
this.#socket.resume()
})
}
async write(data: Uint8Array): Promise<void> {
return new Promise<void>((ok, ko) =>
this.#socket.write(data, (err) => {
if (err) ko(err)
else ok()
}),
)
}
}
export class NodeDialer implements RawDialer {
async dial(host: string, port: number): Promise<RawTransport> {
return new NodeTransport(connect({ host, port }))
}
async dialTls(host: string, port: number): Promise<RawTransport> {
return new NodeTransport(connectTls({ host, port, servername: host }))
}
}
export const nodeDialer = new NodeDialer()
export class NodeListener implements RawListener {
#server: Server
#errcallback: ((err: unknown) => void) | null
#closed: boolean
constructor(server: Server) {
this.#server = server
this.#errcallback = null
this.#closed = false
this.#server.on("error", (err) => {
this.#closed = true
this.#errcallback?.(err)
})
this.#server.on("close", () => {
this.#closed = true
this.#errcallback?.(new Error("Server closed"))
})
}
get closed(): boolean {
return this.#closed
}
close(): void {
this.#closed = true
this.#server.close()
this.#errcallback?.(new Error("Server closed"))
}
async accept(): Promise<RawTransport> {
return new Promise((ok, ko) => {
this.#errcallback = ko
this.#server.once("connection", (conn) => ok(new NodeTransport(conn)))
})
}
}
export async function nodeListen(port: number): Promise<NodeListener> {
const server = createServer()
return new Promise<NodeListener>((ok, ko) => {
server.once("error", ko)
server.listen(port, () => {
ok(new NodeListener(server))
server.removeListener("error", ko)
})
})
}
-67
View File
@@ -1,67 +0,0 @@
import type { ReadBuffer } from "./buffer.js"
import type { Reader } from "./types.js"
export class BasicBodyReader implements Reader {
#remaining: number
#buffer: ReadBuffer
#chunks: Uint8Array[]
constructor(buffer: ReadBuffer, totalLength: number) {
this.#buffer = buffer
this.#remaining = totalLength
this.#chunks = []
}
get closed(): boolean {
if (this.#buffer.ended) return true
return this.#remaining === 0
}
async read(): Promise<Uint8Array> {
if (this.#chunks.length) return this.#chunks.shift()!
if (this.closed) throw new Error("BasicBodyReader is already closed")
if (!this.#buffer.len) await this.#buffer.readOnce()
const len = Math.min(this.#buffer.len, this.#remaining)
this.#remaining -= len
const data = this.#buffer.slice(0, len)
this.#buffer.forward(len)
for (const chunk of data) this.#chunks.push(chunk)
return this.#chunks.shift()!
}
}
export class ChunkedBodyReader implements Reader {
#buffer: ReadBuffer
#chunks: Uint8Array[]
#finished: boolean
constructor(buffer: ReadBuffer) {
this.#buffer = buffer
this.#chunks = []
this.#finished = false
}
get closed(): boolean {
return this.#buffer.ended || this.#finished
}
async read(): Promise<Uint8Array> {
if (this.#chunks.length) return this.#chunks.shift()!
if (this.closed) throw new Error("ChunkedBodyReader is already closed")
const chunkLine = await this.#buffer.readLine()
const len = Number.parseInt(chunkLine, 16)
if (!Number.isInteger(len) || len < 0)
throw new Error("Invalid chunk size in ChunkedBodyReader")
if (!len) {
this.#finished = true
await this.#buffer.read(2)
this.#buffer.forward(2)
return new Uint8Array(0)
}
await this.#buffer.read(len + 2)
const data = this.#buffer.slice(0, len)
this.#buffer.forward(len + 2)
for (const chunk of data) this.#chunks.push(chunk)
return this.#chunks.shift()!
}
}
@@ -1,10 +1,10 @@
import test, { suite } from "node:test"
import { Connection } from "./connection.js"
import type { RawTransport } from "./types.js"
import type { RequestImpl } from "./objects.js"
import { ServerConnection } from "./connection.js"
import type { RawTransport } from "../common/types.js"
import type { ServerRequestImpl } from "./objects.js"
import assert from "node:assert"
suite("Connection", () => {
suite("ServerConnection", () => {
class MockTransport implements RawTransport {
#data: Uint8Array
#chunkSize: number
@@ -39,7 +39,7 @@ suite("Connection", () => {
async function testParse(
data: Uint8Array | string,
match: (conn: RequestImpl) => void | Promise<void>,
match: (conn: ServerRequestImpl) => void | Promise<void>,
) {
if (typeof data === "string") data = new TextEncoder().encode(data)
for (const dupe of [false, true]) {
@@ -54,7 +54,7 @@ suite("Connection", () => {
await test(dupe ? "with request duplicated" : "without duplicating request", async () => {
for (const chunkSize of [1, 10, Infinity]) {
await test(`for a chunkSize of ${chunkSize}`, async () => {
const connection = new Connection(new MockTransport(fullData, chunkSize))
const connection = new ServerConnection(new MockTransport(fullData, chunkSize))
const parsed = await connection.parse()
await match(parsed)
if (dupe) {
+121
View File
@@ -0,0 +1,121 @@
import { ServerRequestImpl, ServerResponseImpl } from "./objects.js"
import type { RawTransport, Reader } from "../common/types.js"
import { statusCodeProperties } from "../common/spec.js"
import { ReadBuffer, WriteBuffer } from "../common/buffer.js"
import { bodyReader, readHeaders } from "../common/reader.js"
import { formatBody, sendBody, writeHeaders } from "../common/writer.js"
import type { Handler, ServerRequest, ServerResponse } from "./types.js"
import { shouldClose } from "../common/connection.js"
export class ServerConnection {
#transport: RawTransport
#readBuffer: ReadBuffer
#writeBuffer: WriteBuffer
#encoder: TextEncoder
#prevBody: Reader | null
constructor(transport: RawTransport) {
this.#transport = transport
this.#readBuffer = new ReadBuffer(transport)
this.#writeBuffer = new WriteBuffer(transport)
this.#encoder = new TextEncoder()
this.#prevBody = null
}
async #parse(): Promise<ServerRequestImpl> {
const requestLine = await this.#readBuffer.readLine()
const parts = requestLine.split(" ")
if (parts.length !== 2 && parts.length !== 3) throw new Error("Invalid request line")
const method = parts[0].toUpperCase()
const target = parts[1]
let version: "1.0" | "1.1" = "1.0"
if (parts.length === 3) {
if (parts[2] === "HTTP/1.1") version = "1.1"
else if (parts[2] !== "HTTP/1.0") throw new Error("Invalid version in request line")
}
const headers = await readHeaders(this.#readBuffer)
const bodyStream = bodyReader(this.#readBuffer, headers)
this.#prevBody = bodyStream
return new ServerRequestImpl({
transport: this.#transport,
version,
target,
method,
headers,
bodyStream,
})
}
parse(): Promise<ServerRequest> {
return this.#parse()
}
async send(response: ServerResponse): Promise<void> {
const scp = statusCodeProperties[response.status] ?? {}
// convert body and set appropriate headers
const body = formatBody(response, this.#encoder, scp.responseBody !== false)
// validate headers
for (const header of scp.requireHeaders ?? []) {
if (!response.hasHeader(header)) {
console.warn(
`[Connection.send] Response with code ${response.status} is missing ${header} header`,
)
}
}
// send status line and headers
let headers = `HTTP/${response.request.version} ${response.status} ${response.statusText}\r\n`
headers += writeHeaders(response.headers)
headers += "\r\n"
this.#writeBuffer.write(headers)
// if we need to send a body, send it
if (response.request.method !== "HEAD" && scp.responseBody !== false && body) {
await sendBody(this.#writeBuffer, this.#encoder, body)
}
// and flush to make sure we sent everything
await this.#writeBuffer.flushAll()
}
async handle(handler: Handler): Promise<void> {
try {
while (!this.#readBuffer.ended) {
// handle a single request
const req = await this.#parse()
const res = new ServerResponseImpl(req)
try {
await handler({ req, res, transport: this.#transport })
} catch (e) {
console.error("[Connection.handle] caught error in handler", e)
res.setStatus(500)
res.body = "Internal Server Error"
}
await this.send(res)
// close connections that should be done
if (shouldClose(req, res)) {
await this.close()
}
// drain the body if needed
if (this.#prevBody) {
while (!this.#prevBody.closed) await this.#prevBody.read()
}
// and go for another loop
}
} finally {
await this.close()
}
}
async close(): Promise<void> {
if (this.#transport.closed) return
await this.#transport.close()
}
}
+89
View File
@@ -0,0 +1,89 @@
import type { Headers } from "../common/headers.js"
import { ReadableHttpImpl, WritableHttpImpl } from "../common/objects.js"
import { statusCodes } from "../common/spec.js"
import type { RawTransport, Reader } from "../common/types.js"
import type { ServerRequest, ServerResponse } from "./types.js"
export class ServerRequestImpl extends ReadableHttpImpl implements ServerRequest {
#transport: RawTransport
#version: "1.0" | "1.1"
#target: string
#method: string
constructor({
transport,
version,
target,
method,
headers,
bodyStream,
}: {
transport: RawTransport
version: "1.0" | "1.1"
target: string
method: string
headers: Headers
bodyStream: Reader | null
}) {
super({ headers, bodyStream })
this.#transport = transport
this.#version = version
this.#target = target
this.#method = method
}
get transport(): RawTransport {
return this.#transport
}
get version(): "1.0" | "1.1" {
return this.#version
}
get target(): string {
return this.#target
}
get method(): string {
return this.#method
}
#url: URL | undefined
get url(): URL {
if (this.#url) return this.#url
let host = this.getHeader("Host")
if (Array.isArray(host)) host = host[0]
if (!host) host = "localhost"
return (this.#url = new URL(this.#target, "http://" + host))
}
get query(): URLSearchParams {
return this.url.searchParams
}
}
export class ServerResponseImpl extends WritableHttpImpl implements ServerResponse {
#request: ServerRequestImpl
#status: number
#statusText: string
constructor(request: ServerRequestImpl) {
super()
this.#request = request
this.#status = 200
this.#statusText = statusCodes[200]
}
get request(): ServerRequestImpl {
return this.#request
}
get status(): number {
return this.#status
}
get statusText(): string {
return this.#statusText
}
setStatus(status: number, statusText?: string): void {
statusText ??= statusCodes[status] ?? "Unknown Status"
this.#status = status
this.#statusText = statusText
}
}
@@ -1,5 +1,5 @@
import { parse } from "regexparam"
import type { Handler, Middleware, Next, Request } from "./types.js"
import type { Handler, Middleware, Next, ServerRequest } from "./types.js"
type RouterLayer = {
// from us
@@ -117,7 +117,7 @@ export class Router {
}
#matchLayer(
req: Pick<Request, "method" | "url">,
req: Pick<ServerRequest, "method" | "url">,
layer: RouterLayer,
matchMethod: boolean = true,
): Readonly<Record<string, string>> | null {
@@ -133,7 +133,7 @@ export class Router {
return keys
}
match(req: Pick<Request, "method" | "url">): HandlerMatch | PartialHandlerMatch {
match(req: Pick<ServerRequest, "method" | "url">): HandlerMatch | PartialHandlerMatch {
const middlewares: [Middleware<Match>, Readonly<Record<string, string>>][] = []
const globalMiddlewares: [Middleware<PartialMatch>, Readonly<Record<string, string>>][] = []
const validMethods = new Set<string>()
@@ -1,4 +1,4 @@
import { Connection } from "./connection.js"
import { ServerConnection } from "./connection.js"
import { Router } from "./router.js"
import type { RawListener } from "./types.js"
@@ -7,7 +7,7 @@ export class Server extends Router {
while (!listener.closed) {
try {
const transport = await listener.accept()
const connection = new Connection(transport)
const connection = new ServerConnection(transport)
void connection
.handle(this.handler)
.catch((e) => console.error("[Server.listen] caught error in connection handling", e))
+43
View File
@@ -0,0 +1,43 @@
import type { RawTransport, ReadableHttp, WritableHttp } from "../common/types.js"
export interface RawListener {
get closed(): boolean
close(): void | Promise<void>
accept(): Promise<RawTransport>
}
export interface ServerRequest extends ReadableHttp {
readonly transport: RawTransport
// raw data
readonly version: "1.0" | "1.1"
readonly target: string
readonly method: string
readonly headers: Readonly<Record<string, string | readonly string[]>>
// parsed data
readonly url: URL
readonly query: URLSearchParams
}
export interface ServerResponse extends WritableHttp {
readonly request: ServerRequest
readonly status: number
readonly statusText: string
setStatus(status: number, statusText?: string): void
}
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export type Context<T = {}> = {
readonly transport: RawTransport
readonly req: ServerRequest
readonly res: ServerResponse
} & T
export type Next = () => Promise<void>
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export type Middleware<T = {}> = (ctx: Context<T>, next: Next) => Promise<void>
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export type Handler<T = {}> = (ctx: Context<T>) => Promise<void>
-20
View File
@@ -1,20 +0,0 @@
import type { WriteBuffer } from "./buffer.js"
import type { StreamBody } from "./types.js"
export async function writeChunkedBody(
writer: WriteBuffer,
encoder: TextEncoder,
body: StreamBody,
): Promise<void> {
for await (let chunk of body) {
if (typeof chunk === "string") chunk = encoder.encode(chunk)
if ("flush" in chunk) {
await writer.flushAll()
continue
}
writer.write(`${chunk.length.toString(16)}\r\n`)
writer.write(chunk)
writer.write("\r\n")
}
writer.write("0\r\n\r\n")
}