feat(taildrop): stream files via ReadableStream on send and receive

Send: accept a ReadableStreamDefaultReader instead of a Uint8Array.
jsStreamReader (new io.ReadCloser) awaits reader.read() Promises via the
channel+FuncOf pattern, feeding chunks directly to the HTTP PUT body.
No js.CopyBytesToGo of the full file.

Receive: openWaitingFile now returns a pull-based ReadableStream backed by
the Go io.ReadCloser (jsReadableStream helper). Each pull call reads up
to 64 KiB and enqueues a Uint8Array chunk; no io.ReadAll.

jsFileOps.OpenReader: JS now returns a ReadableStream instead of a
Uint8Array; Go wraps it in jsStreamReader for streaming delivery.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit was merged in pull request #11.
This commit is contained in:
2026-06-11 21:13:04 +00:00
parent 0df765eb60
commit 21d0f11d85
2 changed files with 130 additions and 25 deletions
+127 -22
View File
@@ -9,7 +9,6 @@
package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -81,9 +80,11 @@ func (i *jsIPN) listFileTargets() js.Value {
})
}
// sendFile sends data as filename to the peer identified by stableNodeID,
// sendFile sends stream as filename to the peer identified by stableNodeID,
// reporting progress via notifyOutgoingFiles callbacks roughly once per second.
func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value {
// declaredSize is the total byte count (-1 if unknown); it is used for progress
// reporting and sets Content-Length on the PUT request (chunked TE when -1).
func (i *jsIPN) sendFile(stableNodeID, filename string, stream js.Value, declaredSize int) js.Value {
return makePromise(func() (any, error) {
ext, err := i.taildropExt()
if err != nil {
@@ -107,14 +108,15 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
if err != nil {
return nil, fmt.Errorf("bogus peer URL: %w", err)
}
b := make([]byte, data.Get("byteLength").Int())
js.CopyBytesToGo(b, data)
reader := stream.Call("getReader")
body := &jsStreamReader{reader: reader}
outgoing := &ipn.OutgoingFile{
ID: rands.HexString(30),
PeerID: tailcfg.StableNodeID(stableNodeID),
Name: filename,
DeclaredSize: int64(len(b)),
DeclaredSize: int64(declaredSize),
Started: time.Now(),
}
updates := map[string]*ipn.OutgoingFile{outgoing.ID: outgoing}
@@ -127,17 +129,17 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
ext.UpdateOutgoingFiles(updates)
}()
body := progresstracking.NewReader(bytes.NewReader(b), time.Second, func(n int, _ error) {
progressBody := progresstracking.NewReader(body, time.Second, func(n int, _ error) {
outgoing.Sent = int64(n)
ext.UpdateOutgoingFiles(updates)
})
req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), body)
req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), progressBody)
if err != nil {
sendErr = err
return nil, err
}
req.ContentLength = int64(len(b))
req.ContentLength = int64(declaredSize)
client := &http.Client{Transport: i.lb.Dialer().PeerAPITransport()}
resp, err := client.Do(req)
if err != nil {
@@ -147,7 +149,13 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
respBody, _ := io.ReadAll(resp.Body)
sendErr = fmt.Errorf("send file: %s: %s", resp.Status, bytes.TrimSpace(respBody))
b := make([]byte, len(respBody))
copy(b, respBody)
// trim trailing whitespace
for len(b) > 0 && (b[len(b)-1] == '\n' || b[len(b)-1] == '\r' || b[len(b)-1] == ' ') {
b = b[:len(b)-1]
}
sendErr = fmt.Errorf("send file: %s: %s", resp.Status, b)
return nil, sendErr
}
return nil, nil
@@ -182,7 +190,8 @@ func (i *jsIPN) waitingFiles() js.Value {
})
}
// openWaitingFile returns the contents of a received file as a Uint8Array.
// openWaitingFile returns the contents of a received file as a ReadableStream.
// The stream emits Uint8Array chunks and closes when the file is fully read.
func (i *jsIPN) openWaitingFile(name string) js.Value {
return makePromise(func() (any, error) {
ext, err := i.taildropExt()
@@ -193,14 +202,7 @@ func (i *jsIPN) openWaitingFile(name string) js.Value {
if err != nil {
return nil, err
}
defer rc.Close()
data, err := io.ReadAll(rc)
if err != nil {
return nil, err
}
buf := js.Global().Get("Uint8Array").New(len(data))
js.CopyBytesToJS(buf, data)
return buf, nil
return jsReadableStream(rc), nil
})
}
@@ -234,6 +236,109 @@ func wireTaildropFileOps(lb *ipnlocal.LocalBackend, jsObj js.Value) {
ext.SetStagedFileOps(&jsFileOps{v: jsObj})
}
// jsStreamReader implements io.ReadCloser by pulling chunks from a JS
// ReadableStreamDefaultReader. Each Read call awaits one reader.read() Promise,
// using the channel+FuncOf pattern so Go blocks until JS delivers the chunk.
type jsStreamReader struct {
reader js.Value
buf []byte
done bool
}
func (r *jsStreamReader) Read(p []byte) (int, error) {
if r.done {
return 0, io.EOF
}
if len(r.buf) > 0 {
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
type chunkResult struct {
data []byte
done bool
}
ch := make(chan chunkResult, 1)
thenFn := js.FuncOf(func(this js.Value, args []js.Value) any {
result := args[0]
if result.Get("done").Bool() {
ch <- chunkResult{done: true}
} else {
value := result.Get("value")
b := make([]byte, value.Get("byteLength").Int())
js.CopyBytesToGo(b, value)
ch <- chunkResult{data: b}
}
return nil
})
defer thenFn.Release()
r.reader.Call("read").Call("then", thenFn)
result := <-ch
if result.done {
r.done = true
return 0, io.EOF
}
n := copy(p, result.data)
r.buf = result.data[n:]
return n, nil
}
func (r *jsStreamReader) Close() error {
r.reader.Call("cancel")
return nil
}
// jsReadableStream wraps rc in a pull-based JS ReadableStream. Each pull call
// reads up to 64 KiB from rc and enqueues a Uint8Array chunk; the stream
// closes on EOF or signals an error on any other read failure.
func jsReadableStream(rc io.ReadCloser) js.Value {
var pullFn, cancelFn js.Func
cancelFn = js.FuncOf(func(this js.Value, args []js.Value) any {
rc.Close()
pullFn.Release()
cancelFn.Release()
return nil
})
pullFn = js.FuncOf(func(this js.Value, args []js.Value) any {
controller := args[0]
var execFn js.Func
execFn = js.FuncOf(func(this js.Value, rr []js.Value) any {
resolve := rr[0]
go func() {
defer execFn.Release()
buf := make([]byte, 65536)
n, err := rc.Read(buf)
if n > 0 {
chunk := js.Global().Get("Uint8Array").New(n)
js.CopyBytesToJS(chunk, buf[:n])
controller.Call("enqueue", chunk)
}
if err == io.EOF {
rc.Close()
pullFn.Release()
cancelFn.Release()
controller.Call("close")
} else if err != nil {
rc.Close()
pullFn.Release()
cancelFn.Release()
controller.Call("error", err.Error())
}
resolve.Invoke()
}()
return nil
})
return js.Global().Get("Promise").New(execFn)
})
return js.Global().Get("ReadableStream").New(map[string]any{
"pull": pullFn,
"cancel": cancelFn,
})
}
// jsFileOps implements [taildrop.FileOps] by delegating to JS callbacks.
// JS methods use one of two callback conventions:
//
@@ -385,9 +490,9 @@ func (j jsFileOps) OpenReader(name string) (io.ReadCloser, error) {
if err != nil {
return nil, err
}
b := make([]byte, val.Get("byteLength").Int())
js.CopyBytesToGo(b, val)
return io.NopCloser(bytes.NewReader(b)), nil
// val is a ReadableStream; wrap its reader for streaming delivery to Go.
reader := val.Call("getReader")
return &jsStreamReader{reader: reader}, nil
}
// jsFileInfo is a minimal [fs.FileInfo] backed by a name and a size.
+3 -3
View File
@@ -281,11 +281,11 @@ func newIPN(jsConfig js.Value) map[string]any {
return jsIPN.listFileTargets()
}),
"sendFile": js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) != 3 {
log.Printf("Usage: sendFile(stableNodeID, filename, data)")
if len(args) != 4 {
log.Printf("Usage: sendFile(stableNodeID, filename, stream, declaredSize)")
return nil
}
return jsIPN.sendFile(args[0].String(), args[1].String(), args[2])
return jsIPN.sendFile(args[0].String(), args[1].String(), args[2], args[3].Int())
}),
"waitingFiles": js.FuncOf(func(this js.Value, args []js.Value) any {
return jsIPN.waitingFiles()