Compare commits

...

6 Commits

Author SHA1 Message Date
codinget 9ebbd6ec2d feat(http): export prependTransport via _internals
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 14:30:28 +00:00
codinget a952de815b test(http): achieve 100% coverage for hijack and prependTransport
Fix two inline RawTransport imports in server/connection.test.ts (the
type was already imported at the top of the file).

Add prependTransport suite to common/buffer.test.ts covering every
getter and method: read() prefix-then-base, closed, close(), readEnded,
whenClosed, remoteAddr, localAddr, write(), and both branches of the
halfClose conditional.

Add a client/connection test where the server sends the 101 response
and raw protocol bytes in the same write, leaving bytes in the
ReadBuffer when hijack() is called. This exercises the prependTransport
path inside #takeTransport().

Add a fetch() hijack test that sends a 101 response through the full
fetch stack. This exercises the poolDone guard in rawFetch: hijack()
fires the synchronous listener (done(true)) before the
onPrevBodyFinished microtask runs done(false), which hits the guard and
is correctly a no-op.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 14:17:06 +00:00
codinget 873f392a79 fix(http): guard hijacked ClientConnection against reuse and fix pool timing
Guard: send() and request() now throw "Cannot use a hijacked connection"
if called after hijack(), preventing accidental state corruption.

Pool timing: the previous onPrevBodyFinished().then(releaseOrReject)
approach queued pool management as a microtask that ran before the
caller's await-continuation, meaning hijack() hadn't been called yet
when the pool decision fired. The fix injects a synchronous hijack
listener via setHijackListener() that calls pool.rejectConnection()
immediately when hijack() is invoked. A poolDone latch ensures only
one of the two paths (listener or body-finish callback) updates the
pool, regardless of call order or async gaps between fetch() and
hijack().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 16:05:40 +00:00
codinget dfa4ab69cf feat(http): add sendResponse option to server hijack
hijack({ sendResponse: false }) flushes any pending write-buffer bytes
without sending the response headers/body. This is intended for use
with the upcoming 1xx support: a handler can send a 101 Switching
Protocols as an interim response via the 1xx mechanism, then call
hijack({ sendResponse: false }) to take ownership of the transport
without double-sending the response.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 15:34:24 +00:00
codinget f905206441 docs: update AI disclosure for hijack() implementation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 10:52:42 +00:00
codinget a70ba05ce3 feat(http): add hijack() to server and client responses for HTTP upgrades
ServerResponse.hijack() sends the current response then returns the
underlying RawTransport so handlers can switch to a different protocol
(WebSocket, raw TCP, etc.) without the server loop taking over again.

ClientResponse.hijack() returns the raw transport after a 101 response
so the caller can take over the connection for the new protocol.

Both variants prepend any bytes the ReadBuffer had already read ahead,
ensuring no data is lost. The server's handle() loop exits cleanly on
hijack and leaves the transport open. The pool integration in fetch.ts
rejects hijacked connections rather than returning them to the pool.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 10:52:26 +00:00
17 changed files with 548 additions and 22 deletions
+2
View File
@@ -73,3 +73,5 @@ The following were hand-written but with substantial Claude Code contributions:
- **`packages/http` — package and build scaffolding**: initial `package.json`, `tsconfig.json`, and build configuration were set up by Claude Code
The test suite for `packages/http` was mostly generated by Claude Code, which also identified and fixed several further bugs: a body-size-limit error being swallowed on the final body chunk, a stale idle-connection entry left behind by `rejectConnection`, and a `read()` call hanging indefinitely after a socket close.
- **`packages/http``hijack()` on server and client responses**: the `hijack()` method on `ServerResponse` and `ClientResponse`, the `ReadBuffer.drain()` helper, and the `prependTransport()` utility were implemented by Claude Code
+1
View File
@@ -12,6 +12,7 @@ export {
bodyReader,
formatBody,
hasOwnProperty,
prependTransport,
readHeaders,
sendBody,
withTimeout,
+119
View File
@@ -361,4 +361,123 @@ suite("ClientConnection", { skip: skipIfNotIntegration }, () => {
void res1 // suppress unused warning
})
})
suite("hijack()", () => {
test("hijack returns the raw transport on a 101 response", async () => {
const { conn, serverWrite, serverBuf } = await makeClientServer()
const req = new ClientRequestImpl({
target: "/",
method: "GET",
headers: { Upgrade: "proto", Connection: "Upgrade" },
})
const responsePromise = conn.request(req)
await drainHeaders(serverBuf)
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
const res = await responsePromise
assert.strictEqual(res.status, 101)
const transport = res.hijack()
assert.ok(transport, "hijack should return a transport")
})
test("raw bytes from server arrive via hijacked transport", async () => {
const { conn, serverWrite, serverBuf, serverTransport } = await makeClientServer()
const req = new ClientRequestImpl({
target: "/",
method: "GET",
headers: { Upgrade: "proto", Connection: "Upgrade" },
})
const responsePromise = conn.request(req)
await drainHeaders(serverBuf)
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
const res = await responsePromise
const transport = res.hijack()
await serverTransport.write(enc.encode("HELLO"))
const chunk = await transport.read()
assert.strictEqual(new TextDecoder().decode(chunk), "HELLO")
})
test("pre-read bytes are prepended to the hijacked transport", async () => {
const { conn, serverBuf, serverTransport } = await makeClientServer()
const req = new ClientRequestImpl({
target: "/",
headers: { Upgrade: "proto", Connection: "Upgrade" },
})
const responsePromise = conn.request(req)
await drainHeaders(serverBuf)
// Send 101 response and raw protocol bytes in a single write so the HTTP
// parser reads ahead into the protocol data.
await serverTransport.write(
enc.encode("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\nHELLO"),
)
const res = await responsePromise
const transport = res.hijack()
const chunk = await transport.read()
assert.strictEqual(new TextDecoder().decode(chunk), "HELLO")
})
test("hijack marks connection as hijacked", async () => {
const { conn, serverWrite, serverBuf } = await makeClientServer()
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
const responsePromise = conn.request(req)
await drainHeaders(serverBuf)
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
const res = await responsePromise
assert.strictEqual(conn.hijacked, false)
res.hijack()
assert.strictEqual(conn.hijacked, true)
})
test("hijack throws if called twice", async () => {
const { conn, serverWrite, serverBuf } = await makeClientServer()
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
const responsePromise = conn.request(req)
await drainHeaders(serverBuf)
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
const res = await responsePromise
res.hijack()
assert.throws(() => res.hijack(), /hijack not available/)
})
test("send() throws after hijack", async () => {
const { conn, serverWrite, serverBuf } = await makeClientServer()
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
const responsePromise = conn.request(req)
await drainHeaders(serverBuf)
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
const res = await responsePromise
res.hijack()
await assert.rejects(
() => conn.send(new ClientRequestImpl({ target: "/" })),
/Cannot use a hijacked connection/,
)
})
test("request() throws after hijack", async () => {
const { conn, serverWrite, serverBuf } = await makeClientServer()
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
const responsePromise = conn.request(req)
await drainHeaders(serverBuf)
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
const res = await responsePromise
res.hijack()
await assert.rejects(
() => conn.request(new ClientRequestImpl({ target: "/" })),
/Cannot use a hijacked connection/,
)
})
test("setHijackListener fires synchronously when hijack() is called", async () => {
const { conn, serverWrite, serverBuf } = await makeClientServer()
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
const responsePromise = conn.request(req)
await drainHeaders(serverBuf)
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
const res = await responsePromise
let fired = false
conn.setHijackListener(() => { fired = true })
assert.strictEqual(fired, false)
res.hijack()
assert.strictEqual(fired, true)
})
})
})
+27 -1
View File
@@ -1,4 +1,4 @@
import { ReadBuffer, WriteBuffer, type ReadBufferOptions } from "../common/buffer.js"
import { prependTransport, ReadBuffer, WriteBuffer, type ReadBufferOptions } from "../common/buffer.js"
import { shouldClose } from "../common/connection.js"
import {
bodyReader,
@@ -34,6 +34,8 @@ export class ClientConnection {
#writeBuffer: WriteBuffer
#encoder: TextEncoder
#prevBody: BodyReader | null
#hijacked: boolean
#onHijack: (() => void) | null
#options: ClientConnectionOptions
@@ -43,16 +45,35 @@ export class ClientConnection {
this.#writeBuffer = new WriteBuffer(transport)
this.#encoder = new TextEncoder()
this.#prevBody = null
this.#hijacked = false
this.#onHijack = null
this.#options = options
}
get hijacked(): boolean {
return this.#hijacked
}
setHijackListener(cb: () => void): void {
this.#onHijack = cb
}
#takeTransport(): RawTransport {
this.#onHijack?.()
this.#onHijack = null
const prefix = this.#readBuffer.drain()
if (prefix.length > 0) return prependTransport(prefix, this.#transport)
return this.#transport
}
setOptions(options: ClientConnectionOptions = {}): void {
this.#options = options
this.#readBuffer.setOptions(options)
}
async send(request: ClientRequest): Promise<void> {
if (this.#hijacked) throw new Error("Cannot use a hijacked connection")
const method = request.method.toUpperCase()
const mp = hasOwnProperty(methods, method) ? methods[method as Method] : methods.PUT
if (request.body !== null && mp.requestBody === false) {
@@ -113,6 +134,10 @@ export class ClientConnection {
headers,
bodyStream,
defaultBodyReaderOptions: this.#options,
hijackFn: () => {
this.#hijacked = true
return this.#takeTransport()
},
})
}
parse(request: ClientRequest): Promise<ClientResponse> {
@@ -120,6 +145,7 @@ export class ClientConnection {
}
async #request(req: ClientRequest): Promise<ClientResponseImpl> {
if (this.#hijacked) throw new Error("Cannot use a hijacked connection")
if (this.#prevBody) {
while (!this.#prevBody.closed) await this.#prevBody.read()
this.#prevBody = null
+26
View File
@@ -197,6 +197,32 @@ suite("fetch()", { skip: skipIfNotIntegration }, () => {
await serverDone
})
test("hijack() works through fetch and correctly manages the pool", async () => {
const [listener, dialer] = loopbackListener()
const serverDone = (async () => {
const transport = await listener.accept()
const buf = new ReadBuffer(transport)
while ((await buf.readLine()) !== "") {
/* drain headers */
}
await transport.write(
enc.encode("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n"),
)
listener.close()
})()
// Connection: upgrade prevents fetch() from overriding it with Connection: close
const res = await fetch(dialer, {
url: "http://localhost/",
headers: { Connection: "upgrade" },
})
assert.strictEqual(res.status, 101)
// hijack() fires the synchronous listener (done(true)), then onPrevBodyFinished
// microtask fires done(false) which hits the poolDone guard and is a no-op
const transport = res.hijack()
assert.ok(transport, "hijack should return a transport")
await serverDone
})
test("rejectConnection is called when response body read fails (truncated body)", async () => {
// Server sends 2 of 10 promised bytes then closes — body read fails mid-stream.
// Use Connection: upgrade so rawFetch doesn't add Connection: close, keeping shouldClose=false.
+13 -7
View File
@@ -63,14 +63,20 @@ async function rawFetch(
)
try {
const result = await conn.request(req)
conn.onPrevBodyFinished().then(
() => {
pool.releaseConnection(url.hostname, port, url.protocol === "https:", conn)
},
() => {
let poolDone = false
const done = (reject: boolean) => {
if (poolDone) return
poolDone = true
if (reject) {
pool.rejectConnection(url.hostname, port, url.protocol === "https:", conn)
},
)
} else {
pool.releaseConnection(url.hostname, port, url.protocol === "https:", conn)
}
}
// Synchronous path: hijack() fires done(true) immediately, regardless of
// when it is called relative to the onPrevBodyFinished microtask.
conn.setHijackListener(() => done(true))
conn.onPrevBodyFinished().then(() => done(false), () => done(true))
return result
} catch (e) {
pool.rejectConnection(url.hostname, port, url.protocol === "https:", conn)
+12 -1
View File
@@ -1,6 +1,6 @@
import { ReadableHttpImpl, WritableHttpImpl } from "../common/objects.js"
import type { BodyReaderOptions } from "../common/reader.js"
import type { Reader } from "../common/types.js"
import type { RawTransport, Reader } from "../common/types.js"
import type { ClientRequest, ClientRequestInit, ClientResponse } from "./types.js"
import type { Headers } from "../common/headers.js"
@@ -21,6 +21,7 @@ export class ClientResponseImpl extends ReadableHttpImpl implements ClientRespon
#version: "1.0" | "1.1"
#status: number
#statusText: string
#hijackFn: (() => RawTransport) | null
constructor({
version,
@@ -29,6 +30,7 @@ export class ClientResponseImpl extends ReadableHttpImpl implements ClientRespon
statusText,
bodyStream,
defaultBodyReaderOptions,
hijackFn,
}: {
version: "1.0" | "1.1"
headers: Headers
@@ -36,11 +38,13 @@ export class ClientResponseImpl extends ReadableHttpImpl implements ClientRespon
statusText: string
bodyStream: Reader | null
defaultBodyReaderOptions?: BodyReaderOptions
hijackFn: () => RawTransport
}) {
super({ headers, bodyStream, defaultBodyReaderOptions })
this.#version = version
this.#status = status
this.#statusText = statusText
this.#hijackFn = hijackFn
}
get version(): "1.0" | "1.1" {
@@ -52,4 +56,11 @@ export class ClientResponseImpl extends ReadableHttpImpl implements ClientRespon
get statusText(): string {
return this.#statusText
}
hijack(): RawTransport {
if (!this.#hijackFn) throw new Error("hijack not available on this response")
const fn = this.#hijackFn
this.#hijackFn = null
return fn()
}
}
+1
View File
@@ -16,6 +16,7 @@ export interface ClientResponse extends ReadableHttp {
readonly status: number
readonly statusText: string
readonly version: "1.0" | "1.1"
hijack(): RawTransport
}
export type ClientRequestInit = Pick<ClientRequest, "target"> &
+1 -1
View File
@@ -1,5 +1,5 @@
export { ReadableHttpImpl, WritableHttpImpl } from "./objects.js"
export { ReadBuffer, WriteBuffer } from "./buffer.js"
export { prependTransport, ReadBuffer, WriteBuffer } from "./buffer.js"
export { PairSync } from "./pair.js"
export {
BasicBodyReader,
+113 -2
View File
@@ -1,7 +1,7 @@
import test, { suite } from "node:test"
import assert from "node:assert"
import { ReadBuffer, WriteBuffer } from "./buffer.js"
import type { Reader, Writer } from "./types.js"
import { prependTransport, ReadBuffer, WriteBuffer } from "./buffer.js"
import type { RawTransport, Reader, Writer } from "./types.js"
const enc = new TextEncoder()
const dec = new TextDecoder()
@@ -298,3 +298,114 @@ suite("WriteBuffer", () => {
assert.strictEqual(dec.decode(out), "hello world")
})
})
suite("prependTransport", () => {
function makeBase(
chunks: Uint8Array[] = [],
{ remoteAddr, localAddr, halfClose = true }: { remoteAddr?: string; localAddr?: string; halfClose?: boolean } = {},
): RawTransport & { written: Uint8Array[] } {
let offset = 0
let closed = false
const written: Uint8Array[] = []
const whenClosed = Promise.resolve()
const t: RawTransport & { written: Uint8Array[] } = {
written,
get closed() { return closed },
close() { closed = true },
async write(data) { written.push(data) },
async read() {
if (offset >= chunks.length) throw new Error("end of stream")
return chunks[offset++]
},
get readEnded() { return offset >= chunks.length },
get whenClosed() { return whenClosed },
remoteAddr,
localAddr,
}
if (halfClose) t.halfClose = () => {}
return t
}
test("read() returns prefix first then delegates to base", async () => {
const base = makeBase([enc.encode("BASE")])
const p = prependTransport(enc.encode("PRE"), base)
assert.strictEqual(dec.decode(await p.read()), "PRE")
assert.strictEqual(dec.decode(await p.read()), "BASE")
})
test("closed is false before prefix is sent", () => {
const base = makeBase()
const p = prependTransport(enc.encode("X"), base)
assert.strictEqual(p.closed, false)
})
test("closed reflects base.closed after prefix is sent", async () => {
const base = makeBase()
const p = prependTransport(enc.encode("X"), base)
await p.read()
base.close()
assert.strictEqual(p.closed, true)
})
test("close() delegates to base", () => {
const base = makeBase()
const p = prependTransport(enc.encode("X"), base)
p.close()
assert.strictEqual(base.closed, true)
})
test("readEnded is false before prefix is sent", () => {
const base = makeBase()
const p = prependTransport(enc.encode("X"), base)
assert.strictEqual(p.readEnded, false)
})
test("readEnded reflects base.readEnded after prefix is sent", async () => {
const base = makeBase()
const p = prependTransport(enc.encode("X"), base)
await p.read()
assert.strictEqual(p.readEnded, base.readEnded)
})
test("whenClosed delegates to base", () => {
const base = makeBase()
const p = prependTransport(enc.encode("X"), base)
assert.strictEqual(p.whenClosed, base.whenClosed)
})
test("remoteAddr delegates to base", () => {
const base = makeBase([], { remoteAddr: "1.2.3.4:5678" })
const p = prependTransport(enc.encode("X"), base)
assert.strictEqual(p.remoteAddr, "1.2.3.4:5678")
})
test("localAddr delegates to base", () => {
const base = makeBase([], { localAddr: "127.0.0.1:9999" })
const p = prependTransport(enc.encode("X"), base)
assert.strictEqual(p.localAddr, "127.0.0.1:9999")
})
test("write() delegates to base", async () => {
const base = makeBase()
const p = prependTransport(enc.encode("X"), base)
await p.write(enc.encode("hello"))
assert.strictEqual(base.written.length, 1)
assert.strictEqual(dec.decode(base.written[0]), "hello")
})
test("halfClose is present and delegates when base has it", () => {
let called = false
const base = makeBase()
base.halfClose = () => { called = true }
const p = prependTransport(enc.encode("X"), base)
assert.ok(p.halfClose, "halfClose should be present")
p.halfClose!()
assert.strictEqual(called, true)
})
test("halfClose is absent when base has no halfClose", () => {
const base = makeBase([], { halfClose: false })
const p = prependTransport(enc.encode("X"), base)
assert.strictEqual(p.halfClose, undefined)
})
})
+51 -1
View File
@@ -1,4 +1,4 @@
import type { Reader, Writer } from "./types.js"
import type { RawTransport, Reader, Writer } from "./types.js"
export type ReadBufferOptions = {
/**
@@ -117,6 +117,20 @@ export class ReadBuffer {
return text + this.#decoder.decode()
}
drain(): Uint8Array {
const len = this.len
if (!len) return new Uint8Array(0)
const slices = this.slice(0, len)
this.forward(len)
const buf = new Uint8Array(len)
let off = 0
for (const s of slices) {
buf.set(s, off)
off += s.length
}
return buf
}
async readLine(): Promise<string> {
const crIdx = await this.#readUntilByte(0x0d, this.#maxLineLength) // CR
if (crIdx === -1) throw new Error(`Line too long (exceeded ${this.#maxLineLength} bytes)`)
@@ -181,3 +195,39 @@ export class WriteBuffer {
while (this.#buffers.length) await this.flush()
}
}
export function prependTransport(prefix: Uint8Array, base: RawTransport): RawTransport {
let sent = false
const transport: RawTransport = {
get closed() {
return sent && base.closed
},
close() {
return base.close()
},
get readEnded() {
return sent && base.readEnded
},
get whenClosed() {
return base.whenClosed
},
get remoteAddr() {
return base.remoteAddr
},
get localAddr() {
return base.localAddr
},
write(data: Uint8Array) {
return base.write(data)
},
async read() {
if (!sent) {
sent = true
return prefix
}
return base.read()
},
}
if (base.halfClose) transport.halfClose = () => base.halfClose!()
return transport
}
+1
View File
@@ -20,6 +20,7 @@ export {
type ReadHeadersOptions,
type ReadableHttp,
type ServerConnectionOptions,
type ServerHijackOptions,
type ServerRequest,
type ServerResponse,
type SimpleBody,
+118
View File
@@ -392,5 +392,123 @@ suite("ServerConnection", () => {
})
assert.strictEqual(count, 2)
})
suite("hijack()", () => {
test("hijack sends response and returns raw transport", async () => {
const [serverTransport, clientTransport] = loopbackTransportPair()
const conn = new ServerConnection(serverTransport)
await clientTransport.write(
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
)
let hijackedTransport: RawTransport | undefined
await conn.handle(async ({ res }) => {
res.setStatus(101, "Switching Protocols")
res.setHeader("Upgrade", "proto")
res.setHeader("Connection", "Upgrade")
hijackedTransport = await res.hijack()
})
assert.ok(hijackedTransport, "hijack should return a transport")
const clientBuf = new ReadBuffer(clientTransport)
assert.strictEqual(await clientBuf.readLine(), "HTTP/1.1 101 Switching Protocols")
})
test("handle() does not close the transport after hijack", async () => {
const [serverTransport, clientTransport] = loopbackTransportPair()
const conn = new ServerConnection(serverTransport)
await clientTransport.write(
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
)
let hijackedTransport: RawTransport | undefined
await conn.handle(async ({ res }) => {
res.setStatus(101, "Switching Protocols")
res.setHeader("Upgrade", "proto")
res.setHeader("Connection", "Upgrade")
hijackedTransport = await res.hijack()
})
assert.ok(!hijackedTransport!.closed, "transport must still be open after hijack")
})
test("raw bytes written to hijacked transport reach the client", async () => {
const [serverTransport, clientTransport] = loopbackTransportPair()
const conn = new ServerConnection(serverTransport)
await clientTransport.write(
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
)
await conn.handle(async ({ res }) => {
res.setStatus(101, "Switching Protocols")
res.setHeader("Upgrade", "proto")
res.setHeader("Connection", "Upgrade")
const raw = await res.hijack()
await raw.write(enc.encode("HELLO"))
})
const clientBuf = new ReadBuffer(clientTransport)
await clientBuf.readLine() // status line
while ((await clientBuf.readLine()) !== "") {
/* skip headers */
}
await clientBuf.read(5)
const text = clientBuf
.slice(0, 5)
.map((c) => new TextDecoder().decode(c))
.join("")
assert.strictEqual(text, "HELLO")
})
test("hijack prevents further requests from being processed", async () => {
const [serverTransport, clientTransport] = loopbackTransportPair()
const conn = new ServerConnection(serverTransport)
const req1 = "GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"
const req2 = "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
await clientTransport.write(enc.encode(req1 + req2))
let count = 0
await conn.handle(async ({ res }) => {
count++
res.setStatus(101, "Switching Protocols")
res.setHeader("Upgrade", "proto")
res.setHeader("Connection", "Upgrade")
await res.hijack()
})
assert.strictEqual(count, 1, "only the hijacking request should be processed")
})
test("hijack throws if called twice", async () => {
const [serverTransport, clientTransport] = loopbackTransportPair()
const conn = new ServerConnection(serverTransport)
await clientTransport.write(
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
)
await conn.handle(async ({ res }) => {
res.setStatus(101, "Switching Protocols")
res.setHeader("Upgrade", "proto")
res.setHeader("Connection", "Upgrade")
await res.hijack()
await assert.rejects(() => res.hijack(), /hijack not available/)
})
})
test("sendResponse: false skips sending response headers", async () => {
const [serverTransport, clientTransport] = loopbackTransportPair()
const conn = new ServerConnection(serverTransport)
await clientTransport.write(
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
)
await conn.handle(async ({ res }) => {
res.setStatus(101, "Switching Protocols")
res.setHeader("Upgrade", "proto")
res.setHeader("Connection", "Upgrade")
// Simulate the response already having been sent (e.g. via a 1xx mechanism)
const raw = await res.hijack({ sendResponse: false })
await raw.write(enc.encode("RAW"))
})
// With sendResponse: false the 101 headers must NOT appear; only the raw bytes follow
const clientBuf = new ReadBuffer(clientTransport)
await clientBuf.read(3)
const text = clientBuf
.slice(0, 3)
.map((c) => new TextDecoder().decode(c))
.join("")
assert.strictEqual(text, "RAW")
})
})
})
})
+32 -7
View File
@@ -1,7 +1,7 @@
import { ServerRequestImpl, ServerResponseImpl } from "./objects.js"
import type { RawTransport, Reader } from "../common/types.js"
import { statusCodeProperties } from "../common/spec.js"
import { ReadBuffer, WriteBuffer, type ReadBufferOptions } from "../common/buffer.js"
import { prependTransport, ReadBuffer, WriteBuffer, type ReadBufferOptions } from "../common/buffer.js"
import {
bodyReader,
readHeaders,
@@ -9,7 +9,7 @@ import {
type ReadHeadersOptions,
} from "../common/reader.js"
import { formatBody, sendBody, writeHeaders } from "../common/writer.js"
import type { Handler, ServerRequest, ServerResponse } from "./types.js"
import type { Handler, ServerHijackOptions, ServerRequest, ServerResponse } from "./types.js"
import { shouldClose } from "../common/connection.js"
import { TimeoutError, withTimeout } from "../common/utils.js"
@@ -39,6 +39,7 @@ export class ServerConnection {
#writeBuffer: WriteBuffer
#encoder: TextEncoder
#prevBody: Reader | null
#hijacked: boolean
#options: ServerConnectionOptions
@@ -48,10 +49,17 @@ export class ServerConnection {
this.#writeBuffer = new WriteBuffer(transport)
this.#encoder = new TextEncoder()
this.#prevBody = null
this.#hijacked = false
this.#options = options
}
#takeTransport(): RawTransport {
const prefix = this.#readBuffer.drain()
if (prefix.length > 0) return prependTransport(prefix, this.#transport)
return this.#transport
}
get #maxTargetLength(): number {
return this.#options.maxTargetLength ?? 1024 * 16
}
@@ -151,14 +159,31 @@ export class ServerConnection {
throw e
}
first = false
const res = new ServerResponseImpl(req)
let res!: ServerResponseImpl
const hijackFn = async (options?: ServerHijackOptions) => {
this.#hijacked = true
if (options?.sendResponse === false) {
await this.#writeBuffer.flushAll()
} else {
await this.send(res)
}
return this.#takeTransport()
}
res = new ServerResponseImpl(req, hijackFn)
try {
await handler({ req, res, transport: this.#transport })
} catch (e) {
console.error("[Connection.handle] caught error in handler", e)
res.setStatus(500)
res.body = "Internal Server Error"
if (!res.hijacked) {
console.error("[Connection.handle] caught error in handler", e)
res.setStatus(500)
res.body = "Internal Server Error"
}
}
if (res.hijacked) return
await this.send(res)
if (shouldClose(req, res)) {
@@ -171,7 +196,7 @@ export class ServerConnection {
// and go for another loop
}
} finally {
await this.close()
if (!this.#hijacked) await this.close()
}
}
+1
View File
@@ -9,6 +9,7 @@ export {
export { Server, type ListenOptions } from "./server.js"
export type {
RawListener,
ServerHijackOptions,
ServerRequest,
ServerResponse,
Context,
+20 -2
View File
@@ -3,7 +3,9 @@ import { ReadableHttpImpl, WritableHttpImpl } from "../common/objects.js"
import type { BodyReaderOptions } from "../common/reader.js"
import { statusCodes } from "../common/spec.js"
import type { RawTransport, Reader } from "../common/types.js"
import type { ServerRequest, ServerResponse } from "./types.js"
import type { ServerHijackOptions, ServerRequest, ServerResponse } from "./types.js"
type HijackFn = (options?: ServerHijackOptions) => Promise<RawTransport>
export class ServerRequestImpl extends ReadableHttpImpl implements ServerRequest {
#transport: RawTransport
@@ -66,12 +68,16 @@ export class ServerResponseImpl extends WritableHttpImpl implements ServerRespon
#request: ServerRequestImpl
#status: number
#statusText: string
#hijackFn: HijackFn | null
#hijacked: boolean
constructor(request: ServerRequestImpl) {
constructor(request: ServerRequestImpl, hijackFn: HijackFn | null = null) {
super()
this.#request = request
this.#status = 200
this.#statusText = statusCodes[200]
this.#hijackFn = hijackFn
this.#hijacked = false
}
get request(): ServerRequestImpl {
@@ -89,4 +95,16 @@ export class ServerResponseImpl extends WritableHttpImpl implements ServerRespon
this.#status = status
this.#statusText = statusText
}
get hijacked(): boolean {
return this.#hijacked
}
async hijack(options?: ServerHijackOptions): Promise<RawTransport> {
if (!this.#hijackFn) throw new Error("hijack not available on this response")
const fn = this.#hijackFn
this.#hijackFn = null
this.#hijacked = true
return fn(options)
}
}
+10
View File
@@ -26,12 +26,22 @@ export interface ServerRequest extends ReadableHttp {
readonly query: URLSearchParams
}
export type ServerHijackOptions = {
/**
* Whether to send the current response before handing off the connection.
* Set to false when the response has already been sent (e.g. via a 1xx interim response).
* @defaultValue true
*/
sendResponse?: boolean
}
export interface ServerResponse extends WritableHttp {
readonly request: ServerRequest
readonly status: number
readonly statusText: string
setStatus(status: number, statusText?: string): void
hijack(options?: ServerHijackOptions): Promise<RawTransport>
}
// eslint-disable-next-line @typescript-eslint/no-empty-object-type