From 21d0f11d85a8eafb55c4b02aa0545dbe9315c5cc Mon Sep 17 00:00:00 2001 From: Codinget Date: Thu, 11 Jun 2026 21:13:04 +0000 Subject: [PATCH] feat(taildrop): stream files via ReadableStream on send and receive Send: accept a ReadableStreamDefaultReader instead of a Uint8Array. jsStreamReader (new io.ReadCloser) awaits reader.read() Promises via the channel+FuncOf pattern, feeding chunks directly to the HTTP PUT body. No js.CopyBytesToGo of the full file. Receive: openWaitingFile now returns a pull-based ReadableStream backed by the Go io.ReadCloser (jsReadableStream helper). Each pull call reads up to 64 KiB and enqueues a Uint8Array chunk; no io.ReadAll. jsFileOps.OpenReader: JS now returns a ReadableStream instead of a Uint8Array; Go wraps it in jsStreamReader for streaming delivery. Co-Authored-By: Claude Sonnet 4.6 --- cmd/tsconnect/wasm/taildrop.go | 149 ++++++++++++++++++++++++++++----- cmd/tsconnect/wasm/wasm_js.go | 6 +- 2 files changed, 130 insertions(+), 25 deletions(-) diff --git a/cmd/tsconnect/wasm/taildrop.go b/cmd/tsconnect/wasm/taildrop.go index 8115f1723..fdcc4f7d0 100644 --- a/cmd/tsconnect/wasm/taildrop.go +++ b/cmd/tsconnect/wasm/taildrop.go @@ -9,7 +9,6 @@ package main import ( - "bytes" "encoding/json" "errors" "fmt" @@ -81,9 +80,11 @@ func (i *jsIPN) listFileTargets() js.Value { }) } -// sendFile sends data as filename to the peer identified by stableNodeID, +// sendFile sends stream as filename to the peer identified by stableNodeID, // reporting progress via notifyOutgoingFiles callbacks roughly once per second. -func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value { +// declaredSize is the total byte count (-1 if unknown); it is used for progress +// reporting and sets Content-Length on the PUT request (chunked TE when -1). +func (i *jsIPN) sendFile(stableNodeID, filename string, stream js.Value, declaredSize int) js.Value { return makePromise(func() (any, error) { ext, err := i.taildropExt() if err != nil { @@ -107,14 +108,15 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value if err != nil { return nil, fmt.Errorf("bogus peer URL: %w", err) } - b := make([]byte, data.Get("byteLength").Int()) - js.CopyBytesToGo(b, data) + + reader := stream.Call("getReader") + body := &jsStreamReader{reader: reader} outgoing := &ipn.OutgoingFile{ ID: rands.HexString(30), PeerID: tailcfg.StableNodeID(stableNodeID), Name: filename, - DeclaredSize: int64(len(b)), + DeclaredSize: int64(declaredSize), Started: time.Now(), } updates := map[string]*ipn.OutgoingFile{outgoing.ID: outgoing} @@ -127,17 +129,17 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value ext.UpdateOutgoingFiles(updates) }() - body := progresstracking.NewReader(bytes.NewReader(b), time.Second, func(n int, _ error) { + progressBody := progresstracking.NewReader(body, time.Second, func(n int, _ error) { outgoing.Sent = int64(n) ext.UpdateOutgoingFiles(updates) }) - req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), body) + req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), progressBody) if err != nil { sendErr = err return nil, err } - req.ContentLength = int64(len(b)) + req.ContentLength = int64(declaredSize) client := &http.Client{Transport: i.lb.Dialer().PeerAPITransport()} resp, err := client.Do(req) if err != nil { @@ -147,7 +149,13 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { respBody, _ := io.ReadAll(resp.Body) - sendErr = fmt.Errorf("send file: %s: %s", resp.Status, bytes.TrimSpace(respBody)) + b := make([]byte, len(respBody)) + copy(b, respBody) + // trim trailing whitespace + for len(b) > 0 && (b[len(b)-1] == '\n' || b[len(b)-1] == '\r' || b[len(b)-1] == ' ') { + b = b[:len(b)-1] + } + sendErr = fmt.Errorf("send file: %s: %s", resp.Status, b) return nil, sendErr } return nil, nil @@ -182,7 +190,8 @@ func (i *jsIPN) waitingFiles() js.Value { }) } -// openWaitingFile returns the contents of a received file as a Uint8Array. +// openWaitingFile returns the contents of a received file as a ReadableStream. +// The stream emits Uint8Array chunks and closes when the file is fully read. func (i *jsIPN) openWaitingFile(name string) js.Value { return makePromise(func() (any, error) { ext, err := i.taildropExt() @@ -193,14 +202,7 @@ func (i *jsIPN) openWaitingFile(name string) js.Value { if err != nil { return nil, err } - defer rc.Close() - data, err := io.ReadAll(rc) - if err != nil { - return nil, err - } - buf := js.Global().Get("Uint8Array").New(len(data)) - js.CopyBytesToJS(buf, data) - return buf, nil + return jsReadableStream(rc), nil }) } @@ -234,6 +236,109 @@ func wireTaildropFileOps(lb *ipnlocal.LocalBackend, jsObj js.Value) { ext.SetStagedFileOps(&jsFileOps{v: jsObj}) } +// jsStreamReader implements io.ReadCloser by pulling chunks from a JS +// ReadableStreamDefaultReader. Each Read call awaits one reader.read() Promise, +// using the channel+FuncOf pattern so Go blocks until JS delivers the chunk. +type jsStreamReader struct { + reader js.Value + buf []byte + done bool +} + +func (r *jsStreamReader) Read(p []byte) (int, error) { + if r.done { + return 0, io.EOF + } + if len(r.buf) > 0 { + n := copy(p, r.buf) + r.buf = r.buf[n:] + return n, nil + } + type chunkResult struct { + data []byte + done bool + } + ch := make(chan chunkResult, 1) + thenFn := js.FuncOf(func(this js.Value, args []js.Value) any { + result := args[0] + if result.Get("done").Bool() { + ch <- chunkResult{done: true} + } else { + value := result.Get("value") + b := make([]byte, value.Get("byteLength").Int()) + js.CopyBytesToGo(b, value) + ch <- chunkResult{data: b} + } + return nil + }) + defer thenFn.Release() + r.reader.Call("read").Call("then", thenFn) + result := <-ch + if result.done { + r.done = true + return 0, io.EOF + } + n := copy(p, result.data) + r.buf = result.data[n:] + return n, nil +} + +func (r *jsStreamReader) Close() error { + r.reader.Call("cancel") + return nil +} + +// jsReadableStream wraps rc in a pull-based JS ReadableStream. Each pull call +// reads up to 64 KiB from rc and enqueues a Uint8Array chunk; the stream +// closes on EOF or signals an error on any other read failure. +func jsReadableStream(rc io.ReadCloser) js.Value { + var pullFn, cancelFn js.Func + + cancelFn = js.FuncOf(func(this js.Value, args []js.Value) any { + rc.Close() + pullFn.Release() + cancelFn.Release() + return nil + }) + + pullFn = js.FuncOf(func(this js.Value, args []js.Value) any { + controller := args[0] + var execFn js.Func + execFn = js.FuncOf(func(this js.Value, rr []js.Value) any { + resolve := rr[0] + go func() { + defer execFn.Release() + buf := make([]byte, 65536) + n, err := rc.Read(buf) + if n > 0 { + chunk := js.Global().Get("Uint8Array").New(n) + js.CopyBytesToJS(chunk, buf[:n]) + controller.Call("enqueue", chunk) + } + if err == io.EOF { + rc.Close() + pullFn.Release() + cancelFn.Release() + controller.Call("close") + } else if err != nil { + rc.Close() + pullFn.Release() + cancelFn.Release() + controller.Call("error", err.Error()) + } + resolve.Invoke() + }() + return nil + }) + return js.Global().Get("Promise").New(execFn) + }) + + return js.Global().Get("ReadableStream").New(map[string]any{ + "pull": pullFn, + "cancel": cancelFn, + }) +} + // jsFileOps implements [taildrop.FileOps] by delegating to JS callbacks. // JS methods use one of two callback conventions: // @@ -385,9 +490,9 @@ func (j jsFileOps) OpenReader(name string) (io.ReadCloser, error) { if err != nil { return nil, err } - b := make([]byte, val.Get("byteLength").Int()) - js.CopyBytesToGo(b, val) - return io.NopCloser(bytes.NewReader(b)), nil + // val is a ReadableStream; wrap its reader for streaming delivery to Go. + reader := val.Call("getReader") + return &jsStreamReader{reader: reader}, nil } // jsFileInfo is a minimal [fs.FileInfo] backed by a name and a size. diff --git a/cmd/tsconnect/wasm/wasm_js.go b/cmd/tsconnect/wasm/wasm_js.go index d37921211..bd42564cb 100644 --- a/cmd/tsconnect/wasm/wasm_js.go +++ b/cmd/tsconnect/wasm/wasm_js.go @@ -281,11 +281,11 @@ func newIPN(jsConfig js.Value) map[string]any { return jsIPN.listFileTargets() }), "sendFile": js.FuncOf(func(this js.Value, args []js.Value) any { - if len(args) != 3 { - log.Printf("Usage: sendFile(stableNodeID, filename, data)") + if len(args) != 4 { + log.Printf("Usage: sendFile(stableNodeID, filename, stream, declaredSize)") return nil } - return jsIPN.sendFile(args[0].String(), args[1].String(), args[2]) + return jsIPN.sendFile(args[0].String(), args[1].String(), args[2], args[3].Int()) }), "waitingFiles": js.FuncOf(func(this js.Value, args []js.Value) any { return jsIPN.waitingFiles()