Files
webnet/packages/http/src/common/reader.ts
T

227 lines
6.6 KiB
TypeScript

import type { ReadBuffer } from "./buffer.js"
import type { Reader } from "./types.js"
import { Headers } from "./headers.js"
import { hasOwnProperty } from "./utils.js"
export interface BodyReader extends Reader {
onFinish(
okHandler?: () => void | Promise<void>,
koHandler?: (err: unknown) => void | Promise<void>,
anyHandler?: () => void | Promise<void>,
): void
}
export type BodyReaderOptions = {
/**
* @defaultValue Infinity
*/
maxBodyLength?: number
}
abstract class BaseBodyReader implements Pick<BodyReader, "onFinish"> {
#finishOkHandlers: (() => void | Promise<void>)[]
#finishKoHandlers: ((err: unknown) => void | Promise<void>)[]
constructor() {
this.#finishOkHandlers = []
this.#finishKoHandlers = []
}
onFinish(
okHandler?: () => void | Promise<void>,
koHandler?: (err: unknown) => void | Promise<void>,
anyHandler?: () => void | Promise<void>,
): void {
if (okHandler) this.#finishOkHandlers.push(okHandler)
if (koHandler) this.#finishKoHandlers.push(koHandler)
if (anyHandler) {
this.#finishOkHandlers.push(anyHandler)
this.#finishKoHandlers.push(anyHandler)
}
}
async _triggerFinishOk(): Promise<void> {
const snapshot = [...this.#finishOkHandlers]
this.#finishOkHandlers = []
this.#finishKoHandlers = []
for (const handler of snapshot) {
try {
await handler()
} catch (e) {
console.error(e)
}
}
}
async _triggerFinishKo(err: unknown): Promise<void> {
const snapshot = [...this.#finishKoHandlers]
this.#finishOkHandlers = []
this.#finishKoHandlers = []
for (const handler of snapshot) {
try {
await handler(err)
} catch (e) {
console.error(e)
}
}
}
}
export class BasicBodyReader extends BaseBodyReader implements BodyReader {
#remaining: number
#buffer: ReadBuffer
#chunks: Uint8Array[]
constructor(buffer: ReadBuffer, totalLength: number) {
super()
this.#buffer = buffer
this.#remaining = totalLength
this.#chunks = []
}
get closed(): boolean {
if (this.#buffer.ended) return true
return this.#remaining === 0
}
async read(): Promise<Uint8Array> {
try {
if (this.#chunks.length) {
const chunk = this.#chunks.shift()!
if (!this.#chunks.length && !this.#remaining) await this._triggerFinishOk()
return chunk
}
if (this.closed) throw new Error("BasicBodyReader is already closed")
while (!this.#buffer.len) await this.#buffer.readOnce()
const len = Math.min(this.#buffer.len, this.#remaining)
this.#remaining -= len
const data = this.#buffer.slice(0, len)
this.#buffer.forward(len)
for (const chunk of data) this.#chunks.push(chunk)
const chunk = this.#chunks.shift()!
if (!this.#chunks.length && !this.#remaining) await this._triggerFinishOk()
return chunk
} catch (e) {
await this._triggerFinishKo(e)
throw e
}
}
}
export class ChunkedBodyReader extends BaseBodyReader implements BodyReader {
#buffer: ReadBuffer
#chunks: Uint8Array[]
#finished: boolean
#bodyLength: number
#maxBodyLength: number
constructor(buffer: ReadBuffer, { maxBodyLength = Infinity }: BodyReaderOptions = {}) {
super()
this.#buffer = buffer
this.#chunks = []
this.#finished = false
this.#bodyLength = 0
this.#maxBodyLength = maxBodyLength
}
get closed(): boolean {
return this.#buffer.ended || this.#finished
}
async read(): Promise<Uint8Array> {
try {
if (this.#chunks.length) return this.#chunks.shift()!
if (this.closed) throw new Error("ChunkedBodyReader is already closed")
const chunkLine = await this.#buffer.readLine()
const len = Number.parseInt(chunkLine, 16)
if (!Number.isInteger(len) || len < 0)
throw new Error(`Invalid chunk size: ${JSON.stringify(chunkLine)}`)
this.#bodyLength += len
if (this.#bodyLength > this.#maxBodyLength)
throw new Error(`Body too large: ${this.#bodyLength} (max: ${this.#maxBodyLength})`)
if (!len) {
this.#finished = true
await this.#buffer.read(2)
this.#buffer.forward(2)
await this._triggerFinishOk()
return new Uint8Array(0)
}
await this.#buffer.read(len + 2)
if (len > this.#buffer.len)
throw new Error(`Chunk declared ${len} bytes but buffer only has ${this.#buffer.len}`)
const data = this.#buffer.slice(0, len)
this.#buffer.forward(len + 2)
for (const chunk of data) this.#chunks.push(chunk)
return this.#chunks.shift()!
} catch (e) {
await this._triggerFinishKo(e)
throw e
}
}
}
export type ReadHeadersOptions = {
/**
* @defaultValue 256
*/
maxHeaderCount?: number
/**
* @defaultValue Infinity
*/
maxTotalHeaderSize?: number
}
export async function readHeaders(
buffer: ReadBuffer,
{ maxHeaderCount = 256, maxTotalHeaderSize = Infinity }: ReadHeadersOptions = {},
): Promise<Headers> {
const rawHeaders: Record<string, string[]> = {}
let headerCount = 0
let totalHeaderSize = 0
while (true) {
const headerLine = await buffer.readLine()
if (!headerLine.length) {
break
}
headerCount++
if (headerCount > maxHeaderCount)
throw new Error(`Too many headers: ${headerCount} (max: ${maxHeaderCount})`)
totalHeaderSize += headerLine.length
if (totalHeaderSize > maxTotalHeaderSize)
throw new Error(
`Total max header size exceeded: ${totalHeaderSize} (max: ${maxTotalHeaderSize})`,
)
const colon = headerLine.indexOf(":")
if (colon === -1) throw new Error(`Invalid header line (no colon): ${headerLine.slice(0, 80)}`)
const key = headerLine.slice(0, colon)
const value = headerLine.slice(colon + 1).trim()
if (!hasOwnProperty(rawHeaders, key)) rawHeaders[key] = []
rawHeaders[key].push(value)
}
return new Headers(
Object.fromEntries(Object.entries(rawHeaders).map(([k, v]) => [k, v.length === 1 ? v[0] : v])),
)
}
export function bodyReader(
buffer: ReadBuffer,
headers: Headers,
{ maxBodyLength = Infinity }: BodyReaderOptions = {},
): BodyReader | null {
if (headers.get("Transfer-Encoding") === "chunked") {
return new ChunkedBodyReader(buffer, { maxBodyLength })
} else if (typeof headers.get("Content-Length") === "string") {
const len = +(headers.get("Content-Length") as string)
if (!Number.isInteger(len) || len < 0)
throw new Error(`Content-Length is invalid: ${headers.get("Content-Length")}`)
if (len > maxBodyLength)
throw new Error(`Content-Length is too big: ${len} (max: ${maxBodyLength})`)
return new BasicBodyReader(buffer, len)
} else {
return null
}
}