Compare commits
6 Commits
f0ec13e9b7
...
9ebbd6ec2d
| Author | SHA1 | Date | |
|---|---|---|---|
| 9ebbd6ec2d | |||
| a952de815b | |||
| 873f392a79 | |||
| dfa4ab69cf | |||
| f905206441 | |||
| a70ba05ce3 |
@@ -73,3 +73,5 @@ The following were hand-written but with substantial Claude Code contributions:
|
||||
- **`packages/http` — package and build scaffolding**: initial `package.json`, `tsconfig.json`, and build configuration were set up by Claude Code
|
||||
|
||||
The test suite for `packages/http` was mostly generated by Claude Code, which also identified and fixed several further bugs: a body-size-limit error being swallowed on the final body chunk, a stale idle-connection entry left behind by `rejectConnection`, and a `read()` call hanging indefinitely after a socket close.
|
||||
|
||||
- **`packages/http` — `hijack()` on server and client responses**: the `hijack()` method on `ServerResponse` and `ClientResponse`, the `ReadBuffer.drain()` helper, and the `prependTransport()` utility were implemented by Claude Code
|
||||
|
||||
@@ -12,6 +12,7 @@ export {
|
||||
bodyReader,
|
||||
formatBody,
|
||||
hasOwnProperty,
|
||||
prependTransport,
|
||||
readHeaders,
|
||||
sendBody,
|
||||
withTimeout,
|
||||
|
||||
@@ -361,4 +361,123 @@ suite("ClientConnection", { skip: skipIfNotIntegration }, () => {
|
||||
void res1 // suppress unused warning
|
||||
})
|
||||
})
|
||||
|
||||
suite("hijack()", () => {
|
||||
test("hijack returns the raw transport on a 101 response", async () => {
|
||||
const { conn, serverWrite, serverBuf } = await makeClientServer()
|
||||
const req = new ClientRequestImpl({
|
||||
target: "/",
|
||||
method: "GET",
|
||||
headers: { Upgrade: "proto", Connection: "Upgrade" },
|
||||
})
|
||||
const responsePromise = conn.request(req)
|
||||
await drainHeaders(serverBuf)
|
||||
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
|
||||
const res = await responsePromise
|
||||
assert.strictEqual(res.status, 101)
|
||||
const transport = res.hijack()
|
||||
assert.ok(transport, "hijack should return a transport")
|
||||
})
|
||||
|
||||
test("raw bytes from server arrive via hijacked transport", async () => {
|
||||
const { conn, serverWrite, serverBuf, serverTransport } = await makeClientServer()
|
||||
const req = new ClientRequestImpl({
|
||||
target: "/",
|
||||
method: "GET",
|
||||
headers: { Upgrade: "proto", Connection: "Upgrade" },
|
||||
})
|
||||
const responsePromise = conn.request(req)
|
||||
await drainHeaders(serverBuf)
|
||||
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
|
||||
const res = await responsePromise
|
||||
const transport = res.hijack()
|
||||
await serverTransport.write(enc.encode("HELLO"))
|
||||
const chunk = await transport.read()
|
||||
assert.strictEqual(new TextDecoder().decode(chunk), "HELLO")
|
||||
})
|
||||
|
||||
test("pre-read bytes are prepended to the hijacked transport", async () => {
|
||||
const { conn, serverBuf, serverTransport } = await makeClientServer()
|
||||
const req = new ClientRequestImpl({
|
||||
target: "/",
|
||||
headers: { Upgrade: "proto", Connection: "Upgrade" },
|
||||
})
|
||||
const responsePromise = conn.request(req)
|
||||
await drainHeaders(serverBuf)
|
||||
// Send 101 response and raw protocol bytes in a single write so the HTTP
|
||||
// parser reads ahead into the protocol data.
|
||||
await serverTransport.write(
|
||||
enc.encode("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\nHELLO"),
|
||||
)
|
||||
const res = await responsePromise
|
||||
const transport = res.hijack()
|
||||
const chunk = await transport.read()
|
||||
assert.strictEqual(new TextDecoder().decode(chunk), "HELLO")
|
||||
})
|
||||
|
||||
test("hijack marks connection as hijacked", async () => {
|
||||
const { conn, serverWrite, serverBuf } = await makeClientServer()
|
||||
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
|
||||
const responsePromise = conn.request(req)
|
||||
await drainHeaders(serverBuf)
|
||||
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
|
||||
const res = await responsePromise
|
||||
assert.strictEqual(conn.hijacked, false)
|
||||
res.hijack()
|
||||
assert.strictEqual(conn.hijacked, true)
|
||||
})
|
||||
|
||||
test("hijack throws if called twice", async () => {
|
||||
const { conn, serverWrite, serverBuf } = await makeClientServer()
|
||||
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
|
||||
const responsePromise = conn.request(req)
|
||||
await drainHeaders(serverBuf)
|
||||
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
|
||||
const res = await responsePromise
|
||||
res.hijack()
|
||||
assert.throws(() => res.hijack(), /hijack not available/)
|
||||
})
|
||||
|
||||
test("send() throws after hijack", async () => {
|
||||
const { conn, serverWrite, serverBuf } = await makeClientServer()
|
||||
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
|
||||
const responsePromise = conn.request(req)
|
||||
await drainHeaders(serverBuf)
|
||||
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
|
||||
const res = await responsePromise
|
||||
res.hijack()
|
||||
await assert.rejects(
|
||||
() => conn.send(new ClientRequestImpl({ target: "/" })),
|
||||
/Cannot use a hijacked connection/,
|
||||
)
|
||||
})
|
||||
|
||||
test("request() throws after hijack", async () => {
|
||||
const { conn, serverWrite, serverBuf } = await makeClientServer()
|
||||
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
|
||||
const responsePromise = conn.request(req)
|
||||
await drainHeaders(serverBuf)
|
||||
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
|
||||
const res = await responsePromise
|
||||
res.hijack()
|
||||
await assert.rejects(
|
||||
() => conn.request(new ClientRequestImpl({ target: "/" })),
|
||||
/Cannot use a hijacked connection/,
|
||||
)
|
||||
})
|
||||
|
||||
test("setHijackListener fires synchronously when hijack() is called", async () => {
|
||||
const { conn, serverWrite, serverBuf } = await makeClientServer()
|
||||
const req = new ClientRequestImpl({ target: "/", headers: { Upgrade: "proto" } })
|
||||
const responsePromise = conn.request(req)
|
||||
await drainHeaders(serverBuf)
|
||||
await serverWrite("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n")
|
||||
const res = await responsePromise
|
||||
let fired = false
|
||||
conn.setHijackListener(() => { fired = true })
|
||||
assert.strictEqual(fired, false)
|
||||
res.hijack()
|
||||
assert.strictEqual(fired, true)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { ReadBuffer, WriteBuffer, type ReadBufferOptions } from "../common/buffer.js"
|
||||
import { prependTransport, ReadBuffer, WriteBuffer, type ReadBufferOptions } from "../common/buffer.js"
|
||||
import { shouldClose } from "../common/connection.js"
|
||||
import {
|
||||
bodyReader,
|
||||
@@ -34,6 +34,8 @@ export class ClientConnection {
|
||||
#writeBuffer: WriteBuffer
|
||||
#encoder: TextEncoder
|
||||
#prevBody: BodyReader | null
|
||||
#hijacked: boolean
|
||||
#onHijack: (() => void) | null
|
||||
|
||||
#options: ClientConnectionOptions
|
||||
|
||||
@@ -43,16 +45,35 @@ export class ClientConnection {
|
||||
this.#writeBuffer = new WriteBuffer(transport)
|
||||
this.#encoder = new TextEncoder()
|
||||
this.#prevBody = null
|
||||
this.#hijacked = false
|
||||
this.#onHijack = null
|
||||
|
||||
this.#options = options
|
||||
}
|
||||
|
||||
get hijacked(): boolean {
|
||||
return this.#hijacked
|
||||
}
|
||||
|
||||
setHijackListener(cb: () => void): void {
|
||||
this.#onHijack = cb
|
||||
}
|
||||
|
||||
#takeTransport(): RawTransport {
|
||||
this.#onHijack?.()
|
||||
this.#onHijack = null
|
||||
const prefix = this.#readBuffer.drain()
|
||||
if (prefix.length > 0) return prependTransport(prefix, this.#transport)
|
||||
return this.#transport
|
||||
}
|
||||
|
||||
setOptions(options: ClientConnectionOptions = {}): void {
|
||||
this.#options = options
|
||||
this.#readBuffer.setOptions(options)
|
||||
}
|
||||
|
||||
async send(request: ClientRequest): Promise<void> {
|
||||
if (this.#hijacked) throw new Error("Cannot use a hijacked connection")
|
||||
const method = request.method.toUpperCase()
|
||||
const mp = hasOwnProperty(methods, method) ? methods[method as Method] : methods.PUT
|
||||
if (request.body !== null && mp.requestBody === false) {
|
||||
@@ -113,6 +134,10 @@ export class ClientConnection {
|
||||
headers,
|
||||
bodyStream,
|
||||
defaultBodyReaderOptions: this.#options,
|
||||
hijackFn: () => {
|
||||
this.#hijacked = true
|
||||
return this.#takeTransport()
|
||||
},
|
||||
})
|
||||
}
|
||||
parse(request: ClientRequest): Promise<ClientResponse> {
|
||||
@@ -120,6 +145,7 @@ export class ClientConnection {
|
||||
}
|
||||
|
||||
async #request(req: ClientRequest): Promise<ClientResponseImpl> {
|
||||
if (this.#hijacked) throw new Error("Cannot use a hijacked connection")
|
||||
if (this.#prevBody) {
|
||||
while (!this.#prevBody.closed) await this.#prevBody.read()
|
||||
this.#prevBody = null
|
||||
|
||||
@@ -197,6 +197,32 @@ suite("fetch()", { skip: skipIfNotIntegration }, () => {
|
||||
await serverDone
|
||||
})
|
||||
|
||||
test("hijack() works through fetch and correctly manages the pool", async () => {
|
||||
const [listener, dialer] = loopbackListener()
|
||||
const serverDone = (async () => {
|
||||
const transport = await listener.accept()
|
||||
const buf = new ReadBuffer(transport)
|
||||
while ((await buf.readLine()) !== "") {
|
||||
/* drain headers */
|
||||
}
|
||||
await transport.write(
|
||||
enc.encode("HTTP/1.1 101 Switching Protocols\r\nUpgrade: proto\r\n\r\n"),
|
||||
)
|
||||
listener.close()
|
||||
})()
|
||||
// Connection: upgrade prevents fetch() from overriding it with Connection: close
|
||||
const res = await fetch(dialer, {
|
||||
url: "http://localhost/",
|
||||
headers: { Connection: "upgrade" },
|
||||
})
|
||||
assert.strictEqual(res.status, 101)
|
||||
// hijack() fires the synchronous listener (done(true)), then onPrevBodyFinished
|
||||
// microtask fires done(false) which hits the poolDone guard and is a no-op
|
||||
const transport = res.hijack()
|
||||
assert.ok(transport, "hijack should return a transport")
|
||||
await serverDone
|
||||
})
|
||||
|
||||
test("rejectConnection is called when response body read fails (truncated body)", async () => {
|
||||
// Server sends 2 of 10 promised bytes then closes — body read fails mid-stream.
|
||||
// Use Connection: upgrade so rawFetch doesn't add Connection: close, keeping shouldClose=false.
|
||||
|
||||
@@ -63,14 +63,20 @@ async function rawFetch(
|
||||
)
|
||||
try {
|
||||
const result = await conn.request(req)
|
||||
conn.onPrevBodyFinished().then(
|
||||
() => {
|
||||
pool.releaseConnection(url.hostname, port, url.protocol === "https:", conn)
|
||||
},
|
||||
() => {
|
||||
let poolDone = false
|
||||
const done = (reject: boolean) => {
|
||||
if (poolDone) return
|
||||
poolDone = true
|
||||
if (reject) {
|
||||
pool.rejectConnection(url.hostname, port, url.protocol === "https:", conn)
|
||||
},
|
||||
)
|
||||
} else {
|
||||
pool.releaseConnection(url.hostname, port, url.protocol === "https:", conn)
|
||||
}
|
||||
}
|
||||
// Synchronous path: hijack() fires done(true) immediately, regardless of
|
||||
// when it is called relative to the onPrevBodyFinished microtask.
|
||||
conn.setHijackListener(() => done(true))
|
||||
conn.onPrevBodyFinished().then(() => done(false), () => done(true))
|
||||
return result
|
||||
} catch (e) {
|
||||
pool.rejectConnection(url.hostname, port, url.protocol === "https:", conn)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { ReadableHttpImpl, WritableHttpImpl } from "../common/objects.js"
|
||||
import type { BodyReaderOptions } from "../common/reader.js"
|
||||
import type { Reader } from "../common/types.js"
|
||||
import type { RawTransport, Reader } from "../common/types.js"
|
||||
import type { ClientRequest, ClientRequestInit, ClientResponse } from "./types.js"
|
||||
import type { Headers } from "../common/headers.js"
|
||||
|
||||
@@ -21,6 +21,7 @@ export class ClientResponseImpl extends ReadableHttpImpl implements ClientRespon
|
||||
#version: "1.0" | "1.1"
|
||||
#status: number
|
||||
#statusText: string
|
||||
#hijackFn: (() => RawTransport) | null
|
||||
|
||||
constructor({
|
||||
version,
|
||||
@@ -29,6 +30,7 @@ export class ClientResponseImpl extends ReadableHttpImpl implements ClientRespon
|
||||
statusText,
|
||||
bodyStream,
|
||||
defaultBodyReaderOptions,
|
||||
hijackFn,
|
||||
}: {
|
||||
version: "1.0" | "1.1"
|
||||
headers: Headers
|
||||
@@ -36,11 +38,13 @@ export class ClientResponseImpl extends ReadableHttpImpl implements ClientRespon
|
||||
statusText: string
|
||||
bodyStream: Reader | null
|
||||
defaultBodyReaderOptions?: BodyReaderOptions
|
||||
hijackFn: () => RawTransport
|
||||
}) {
|
||||
super({ headers, bodyStream, defaultBodyReaderOptions })
|
||||
this.#version = version
|
||||
this.#status = status
|
||||
this.#statusText = statusText
|
||||
this.#hijackFn = hijackFn
|
||||
}
|
||||
|
||||
get version(): "1.0" | "1.1" {
|
||||
@@ -52,4 +56,11 @@ export class ClientResponseImpl extends ReadableHttpImpl implements ClientRespon
|
||||
get statusText(): string {
|
||||
return this.#statusText
|
||||
}
|
||||
|
||||
hijack(): RawTransport {
|
||||
if (!this.#hijackFn) throw new Error("hijack not available on this response")
|
||||
const fn = this.#hijackFn
|
||||
this.#hijackFn = null
|
||||
return fn()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ export interface ClientResponse extends ReadableHttp {
|
||||
readonly status: number
|
||||
readonly statusText: string
|
||||
readonly version: "1.0" | "1.1"
|
||||
hijack(): RawTransport
|
||||
}
|
||||
|
||||
export type ClientRequestInit = Pick<ClientRequest, "target"> &
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
export { ReadableHttpImpl, WritableHttpImpl } from "./objects.js"
|
||||
export { ReadBuffer, WriteBuffer } from "./buffer.js"
|
||||
export { prependTransport, ReadBuffer, WriteBuffer } from "./buffer.js"
|
||||
export { PairSync } from "./pair.js"
|
||||
export {
|
||||
BasicBodyReader,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import test, { suite } from "node:test"
|
||||
import assert from "node:assert"
|
||||
import { ReadBuffer, WriteBuffer } from "./buffer.js"
|
||||
import type { Reader, Writer } from "./types.js"
|
||||
import { prependTransport, ReadBuffer, WriteBuffer } from "./buffer.js"
|
||||
import type { RawTransport, Reader, Writer } from "./types.js"
|
||||
|
||||
const enc = new TextEncoder()
|
||||
const dec = new TextDecoder()
|
||||
@@ -298,3 +298,114 @@ suite("WriteBuffer", () => {
|
||||
assert.strictEqual(dec.decode(out), "hello world")
|
||||
})
|
||||
})
|
||||
|
||||
suite("prependTransport", () => {
|
||||
function makeBase(
|
||||
chunks: Uint8Array[] = [],
|
||||
{ remoteAddr, localAddr, halfClose = true }: { remoteAddr?: string; localAddr?: string; halfClose?: boolean } = {},
|
||||
): RawTransport & { written: Uint8Array[] } {
|
||||
let offset = 0
|
||||
let closed = false
|
||||
const written: Uint8Array[] = []
|
||||
const whenClosed = Promise.resolve()
|
||||
const t: RawTransport & { written: Uint8Array[] } = {
|
||||
written,
|
||||
get closed() { return closed },
|
||||
close() { closed = true },
|
||||
async write(data) { written.push(data) },
|
||||
async read() {
|
||||
if (offset >= chunks.length) throw new Error("end of stream")
|
||||
return chunks[offset++]
|
||||
},
|
||||
get readEnded() { return offset >= chunks.length },
|
||||
get whenClosed() { return whenClosed },
|
||||
remoteAddr,
|
||||
localAddr,
|
||||
}
|
||||
if (halfClose) t.halfClose = () => {}
|
||||
return t
|
||||
}
|
||||
|
||||
test("read() returns prefix first then delegates to base", async () => {
|
||||
const base = makeBase([enc.encode("BASE")])
|
||||
const p = prependTransport(enc.encode("PRE"), base)
|
||||
assert.strictEqual(dec.decode(await p.read()), "PRE")
|
||||
assert.strictEqual(dec.decode(await p.read()), "BASE")
|
||||
})
|
||||
|
||||
test("closed is false before prefix is sent", () => {
|
||||
const base = makeBase()
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
assert.strictEqual(p.closed, false)
|
||||
})
|
||||
|
||||
test("closed reflects base.closed after prefix is sent", async () => {
|
||||
const base = makeBase()
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
await p.read()
|
||||
base.close()
|
||||
assert.strictEqual(p.closed, true)
|
||||
})
|
||||
|
||||
test("close() delegates to base", () => {
|
||||
const base = makeBase()
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
p.close()
|
||||
assert.strictEqual(base.closed, true)
|
||||
})
|
||||
|
||||
test("readEnded is false before prefix is sent", () => {
|
||||
const base = makeBase()
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
assert.strictEqual(p.readEnded, false)
|
||||
})
|
||||
|
||||
test("readEnded reflects base.readEnded after prefix is sent", async () => {
|
||||
const base = makeBase()
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
await p.read()
|
||||
assert.strictEqual(p.readEnded, base.readEnded)
|
||||
})
|
||||
|
||||
test("whenClosed delegates to base", () => {
|
||||
const base = makeBase()
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
assert.strictEqual(p.whenClosed, base.whenClosed)
|
||||
})
|
||||
|
||||
test("remoteAddr delegates to base", () => {
|
||||
const base = makeBase([], { remoteAddr: "1.2.3.4:5678" })
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
assert.strictEqual(p.remoteAddr, "1.2.3.4:5678")
|
||||
})
|
||||
|
||||
test("localAddr delegates to base", () => {
|
||||
const base = makeBase([], { localAddr: "127.0.0.1:9999" })
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
assert.strictEqual(p.localAddr, "127.0.0.1:9999")
|
||||
})
|
||||
|
||||
test("write() delegates to base", async () => {
|
||||
const base = makeBase()
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
await p.write(enc.encode("hello"))
|
||||
assert.strictEqual(base.written.length, 1)
|
||||
assert.strictEqual(dec.decode(base.written[0]), "hello")
|
||||
})
|
||||
|
||||
test("halfClose is present and delegates when base has it", () => {
|
||||
let called = false
|
||||
const base = makeBase()
|
||||
base.halfClose = () => { called = true }
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
assert.ok(p.halfClose, "halfClose should be present")
|
||||
p.halfClose!()
|
||||
assert.strictEqual(called, true)
|
||||
})
|
||||
|
||||
test("halfClose is absent when base has no halfClose", () => {
|
||||
const base = makeBase([], { halfClose: false })
|
||||
const p = prependTransport(enc.encode("X"), base)
|
||||
assert.strictEqual(p.halfClose, undefined)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { Reader, Writer } from "./types.js"
|
||||
import type { RawTransport, Reader, Writer } from "./types.js"
|
||||
|
||||
export type ReadBufferOptions = {
|
||||
/**
|
||||
@@ -117,6 +117,20 @@ export class ReadBuffer {
|
||||
return text + this.#decoder.decode()
|
||||
}
|
||||
|
||||
drain(): Uint8Array {
|
||||
const len = this.len
|
||||
if (!len) return new Uint8Array(0)
|
||||
const slices = this.slice(0, len)
|
||||
this.forward(len)
|
||||
const buf = new Uint8Array(len)
|
||||
let off = 0
|
||||
for (const s of slices) {
|
||||
buf.set(s, off)
|
||||
off += s.length
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
async readLine(): Promise<string> {
|
||||
const crIdx = await this.#readUntilByte(0x0d, this.#maxLineLength) // CR
|
||||
if (crIdx === -1) throw new Error(`Line too long (exceeded ${this.#maxLineLength} bytes)`)
|
||||
@@ -181,3 +195,39 @@ export class WriteBuffer {
|
||||
while (this.#buffers.length) await this.flush()
|
||||
}
|
||||
}
|
||||
|
||||
export function prependTransport(prefix: Uint8Array, base: RawTransport): RawTransport {
|
||||
let sent = false
|
||||
const transport: RawTransport = {
|
||||
get closed() {
|
||||
return sent && base.closed
|
||||
},
|
||||
close() {
|
||||
return base.close()
|
||||
},
|
||||
get readEnded() {
|
||||
return sent && base.readEnded
|
||||
},
|
||||
get whenClosed() {
|
||||
return base.whenClosed
|
||||
},
|
||||
get remoteAddr() {
|
||||
return base.remoteAddr
|
||||
},
|
||||
get localAddr() {
|
||||
return base.localAddr
|
||||
},
|
||||
write(data: Uint8Array) {
|
||||
return base.write(data)
|
||||
},
|
||||
async read() {
|
||||
if (!sent) {
|
||||
sent = true
|
||||
return prefix
|
||||
}
|
||||
return base.read()
|
||||
},
|
||||
}
|
||||
if (base.halfClose) transport.halfClose = () => base.halfClose!()
|
||||
return transport
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ export {
|
||||
type ReadHeadersOptions,
|
||||
type ReadableHttp,
|
||||
type ServerConnectionOptions,
|
||||
type ServerHijackOptions,
|
||||
type ServerRequest,
|
||||
type ServerResponse,
|
||||
type SimpleBody,
|
||||
|
||||
@@ -392,5 +392,123 @@ suite("ServerConnection", () => {
|
||||
})
|
||||
assert.strictEqual(count, 2)
|
||||
})
|
||||
|
||||
suite("hijack()", () => {
|
||||
test("hijack sends response and returns raw transport", async () => {
|
||||
const [serverTransport, clientTransport] = loopbackTransportPair()
|
||||
const conn = new ServerConnection(serverTransport)
|
||||
await clientTransport.write(
|
||||
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
|
||||
)
|
||||
let hijackedTransport: RawTransport | undefined
|
||||
await conn.handle(async ({ res }) => {
|
||||
res.setStatus(101, "Switching Protocols")
|
||||
res.setHeader("Upgrade", "proto")
|
||||
res.setHeader("Connection", "Upgrade")
|
||||
hijackedTransport = await res.hijack()
|
||||
})
|
||||
assert.ok(hijackedTransport, "hijack should return a transport")
|
||||
const clientBuf = new ReadBuffer(clientTransport)
|
||||
assert.strictEqual(await clientBuf.readLine(), "HTTP/1.1 101 Switching Protocols")
|
||||
})
|
||||
|
||||
test("handle() does not close the transport after hijack", async () => {
|
||||
const [serverTransport, clientTransport] = loopbackTransportPair()
|
||||
const conn = new ServerConnection(serverTransport)
|
||||
await clientTransport.write(
|
||||
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
|
||||
)
|
||||
let hijackedTransport: RawTransport | undefined
|
||||
await conn.handle(async ({ res }) => {
|
||||
res.setStatus(101, "Switching Protocols")
|
||||
res.setHeader("Upgrade", "proto")
|
||||
res.setHeader("Connection", "Upgrade")
|
||||
hijackedTransport = await res.hijack()
|
||||
})
|
||||
assert.ok(!hijackedTransport!.closed, "transport must still be open after hijack")
|
||||
})
|
||||
|
||||
test("raw bytes written to hijacked transport reach the client", async () => {
|
||||
const [serverTransport, clientTransport] = loopbackTransportPair()
|
||||
const conn = new ServerConnection(serverTransport)
|
||||
await clientTransport.write(
|
||||
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
|
||||
)
|
||||
await conn.handle(async ({ res }) => {
|
||||
res.setStatus(101, "Switching Protocols")
|
||||
res.setHeader("Upgrade", "proto")
|
||||
res.setHeader("Connection", "Upgrade")
|
||||
const raw = await res.hijack()
|
||||
await raw.write(enc.encode("HELLO"))
|
||||
})
|
||||
const clientBuf = new ReadBuffer(clientTransport)
|
||||
await clientBuf.readLine() // status line
|
||||
while ((await clientBuf.readLine()) !== "") {
|
||||
/* skip headers */
|
||||
}
|
||||
await clientBuf.read(5)
|
||||
const text = clientBuf
|
||||
.slice(0, 5)
|
||||
.map((c) => new TextDecoder().decode(c))
|
||||
.join("")
|
||||
assert.strictEqual(text, "HELLO")
|
||||
})
|
||||
|
||||
test("hijack prevents further requests from being processed", async () => {
|
||||
const [serverTransport, clientTransport] = loopbackTransportPair()
|
||||
const conn = new ServerConnection(serverTransport)
|
||||
const req1 = "GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"
|
||||
const req2 = "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
|
||||
await clientTransport.write(enc.encode(req1 + req2))
|
||||
let count = 0
|
||||
await conn.handle(async ({ res }) => {
|
||||
count++
|
||||
res.setStatus(101, "Switching Protocols")
|
||||
res.setHeader("Upgrade", "proto")
|
||||
res.setHeader("Connection", "Upgrade")
|
||||
await res.hijack()
|
||||
})
|
||||
assert.strictEqual(count, 1, "only the hijacking request should be processed")
|
||||
})
|
||||
|
||||
test("hijack throws if called twice", async () => {
|
||||
const [serverTransport, clientTransport] = loopbackTransportPair()
|
||||
const conn = new ServerConnection(serverTransport)
|
||||
await clientTransport.write(
|
||||
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
|
||||
)
|
||||
await conn.handle(async ({ res }) => {
|
||||
res.setStatus(101, "Switching Protocols")
|
||||
res.setHeader("Upgrade", "proto")
|
||||
res.setHeader("Connection", "Upgrade")
|
||||
await res.hijack()
|
||||
await assert.rejects(() => res.hijack(), /hijack not available/)
|
||||
})
|
||||
})
|
||||
|
||||
test("sendResponse: false skips sending response headers", async () => {
|
||||
const [serverTransport, clientTransport] = loopbackTransportPair()
|
||||
const conn = new ServerConnection(serverTransport)
|
||||
await clientTransport.write(
|
||||
enc.encode("GET / HTTP/1.1\r\nHost: localhost\r\nUpgrade: proto\r\n\r\n"),
|
||||
)
|
||||
await conn.handle(async ({ res }) => {
|
||||
res.setStatus(101, "Switching Protocols")
|
||||
res.setHeader("Upgrade", "proto")
|
||||
res.setHeader("Connection", "Upgrade")
|
||||
// Simulate the response already having been sent (e.g. via a 1xx mechanism)
|
||||
const raw = await res.hijack({ sendResponse: false })
|
||||
await raw.write(enc.encode("RAW"))
|
||||
})
|
||||
// With sendResponse: false the 101 headers must NOT appear; only the raw bytes follow
|
||||
const clientBuf = new ReadBuffer(clientTransport)
|
||||
await clientBuf.read(3)
|
||||
const text = clientBuf
|
||||
.slice(0, 3)
|
||||
.map((c) => new TextDecoder().decode(c))
|
||||
.join("")
|
||||
assert.strictEqual(text, "RAW")
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { ServerRequestImpl, ServerResponseImpl } from "./objects.js"
|
||||
import type { RawTransport, Reader } from "../common/types.js"
|
||||
import { statusCodeProperties } from "../common/spec.js"
|
||||
import { ReadBuffer, WriteBuffer, type ReadBufferOptions } from "../common/buffer.js"
|
||||
import { prependTransport, ReadBuffer, WriteBuffer, type ReadBufferOptions } from "../common/buffer.js"
|
||||
import {
|
||||
bodyReader,
|
||||
readHeaders,
|
||||
@@ -9,7 +9,7 @@ import {
|
||||
type ReadHeadersOptions,
|
||||
} from "../common/reader.js"
|
||||
import { formatBody, sendBody, writeHeaders } from "../common/writer.js"
|
||||
import type { Handler, ServerRequest, ServerResponse } from "./types.js"
|
||||
import type { Handler, ServerHijackOptions, ServerRequest, ServerResponse } from "./types.js"
|
||||
import { shouldClose } from "../common/connection.js"
|
||||
import { TimeoutError, withTimeout } from "../common/utils.js"
|
||||
|
||||
@@ -39,6 +39,7 @@ export class ServerConnection {
|
||||
#writeBuffer: WriteBuffer
|
||||
#encoder: TextEncoder
|
||||
#prevBody: Reader | null
|
||||
#hijacked: boolean
|
||||
|
||||
#options: ServerConnectionOptions
|
||||
|
||||
@@ -48,10 +49,17 @@ export class ServerConnection {
|
||||
this.#writeBuffer = new WriteBuffer(transport)
|
||||
this.#encoder = new TextEncoder()
|
||||
this.#prevBody = null
|
||||
this.#hijacked = false
|
||||
|
||||
this.#options = options
|
||||
}
|
||||
|
||||
#takeTransport(): RawTransport {
|
||||
const prefix = this.#readBuffer.drain()
|
||||
if (prefix.length > 0) return prependTransport(prefix, this.#transport)
|
||||
return this.#transport
|
||||
}
|
||||
|
||||
get #maxTargetLength(): number {
|
||||
return this.#options.maxTargetLength ?? 1024 * 16
|
||||
}
|
||||
@@ -151,14 +159,31 @@ export class ServerConnection {
|
||||
throw e
|
||||
}
|
||||
first = false
|
||||
const res = new ServerResponseImpl(req)
|
||||
|
||||
let res!: ServerResponseImpl
|
||||
const hijackFn = async (options?: ServerHijackOptions) => {
|
||||
this.#hijacked = true
|
||||
if (options?.sendResponse === false) {
|
||||
await this.#writeBuffer.flushAll()
|
||||
} else {
|
||||
await this.send(res)
|
||||
}
|
||||
return this.#takeTransport()
|
||||
}
|
||||
res = new ServerResponseImpl(req, hijackFn)
|
||||
|
||||
try {
|
||||
await handler({ req, res, transport: this.#transport })
|
||||
} catch (e) {
|
||||
console.error("[Connection.handle] caught error in handler", e)
|
||||
res.setStatus(500)
|
||||
res.body = "Internal Server Error"
|
||||
if (!res.hijacked) {
|
||||
console.error("[Connection.handle] caught error in handler", e)
|
||||
res.setStatus(500)
|
||||
res.body = "Internal Server Error"
|
||||
}
|
||||
}
|
||||
|
||||
if (res.hijacked) return
|
||||
|
||||
await this.send(res)
|
||||
|
||||
if (shouldClose(req, res)) {
|
||||
@@ -171,7 +196,7 @@ export class ServerConnection {
|
||||
// and go for another loop
|
||||
}
|
||||
} finally {
|
||||
await this.close()
|
||||
if (!this.#hijacked) await this.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ export {
|
||||
export { Server, type ListenOptions } from "./server.js"
|
||||
export type {
|
||||
RawListener,
|
||||
ServerHijackOptions,
|
||||
ServerRequest,
|
||||
ServerResponse,
|
||||
Context,
|
||||
|
||||
@@ -3,7 +3,9 @@ import { ReadableHttpImpl, WritableHttpImpl } from "../common/objects.js"
|
||||
import type { BodyReaderOptions } from "../common/reader.js"
|
||||
import { statusCodes } from "../common/spec.js"
|
||||
import type { RawTransport, Reader } from "../common/types.js"
|
||||
import type { ServerRequest, ServerResponse } from "./types.js"
|
||||
import type { ServerHijackOptions, ServerRequest, ServerResponse } from "./types.js"
|
||||
|
||||
type HijackFn = (options?: ServerHijackOptions) => Promise<RawTransport>
|
||||
|
||||
export class ServerRequestImpl extends ReadableHttpImpl implements ServerRequest {
|
||||
#transport: RawTransport
|
||||
@@ -66,12 +68,16 @@ export class ServerResponseImpl extends WritableHttpImpl implements ServerRespon
|
||||
#request: ServerRequestImpl
|
||||
#status: number
|
||||
#statusText: string
|
||||
#hijackFn: HijackFn | null
|
||||
#hijacked: boolean
|
||||
|
||||
constructor(request: ServerRequestImpl) {
|
||||
constructor(request: ServerRequestImpl, hijackFn: HijackFn | null = null) {
|
||||
super()
|
||||
this.#request = request
|
||||
this.#status = 200
|
||||
this.#statusText = statusCodes[200]
|
||||
this.#hijackFn = hijackFn
|
||||
this.#hijacked = false
|
||||
}
|
||||
|
||||
get request(): ServerRequestImpl {
|
||||
@@ -89,4 +95,16 @@ export class ServerResponseImpl extends WritableHttpImpl implements ServerRespon
|
||||
this.#status = status
|
||||
this.#statusText = statusText
|
||||
}
|
||||
|
||||
get hijacked(): boolean {
|
||||
return this.#hijacked
|
||||
}
|
||||
|
||||
async hijack(options?: ServerHijackOptions): Promise<RawTransport> {
|
||||
if (!this.#hijackFn) throw new Error("hijack not available on this response")
|
||||
const fn = this.#hijackFn
|
||||
this.#hijackFn = null
|
||||
this.#hijacked = true
|
||||
return fn(options)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,12 +26,22 @@ export interface ServerRequest extends ReadableHttp {
|
||||
readonly query: URLSearchParams
|
||||
}
|
||||
|
||||
export type ServerHijackOptions = {
|
||||
/**
|
||||
* Whether to send the current response before handing off the connection.
|
||||
* Set to false when the response has already been sent (e.g. via a 1xx interim response).
|
||||
* @defaultValue true
|
||||
*/
|
||||
sendResponse?: boolean
|
||||
}
|
||||
|
||||
export interface ServerResponse extends WritableHttp {
|
||||
readonly request: ServerRequest
|
||||
|
||||
readonly status: number
|
||||
readonly statusText: string
|
||||
setStatus(status: number, statusText?: string): void
|
||||
hijack(options?: ServerHijackOptions): Promise<RawTransport>
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
|
||||
|
||||
Reference in New Issue
Block a user