2 Commits

Author SHA1 Message Date
codinget 6e83d5291b feat(tsconnect/wasm): add shutdown() to jsIPN
Expose a shutdown() method on the JS-side IPN object that stops the
LocalBackend, closes the safesocket listener (which unblocks srv.Run),
and signals main() to return so the Go runtime exits cleanly.

This allows the host environment (Node.js process or browser service
worker) to terminate normally once the Tailscale WASM module is no
longer needed, instead of being kept alive indefinitely by open handles,
goroutines, or the Go runtime's blocking main goroutine.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 00:22:56 +00:00
codinget 21d0f11d85 feat(taildrop): stream files via ReadableStream on send and receive
Send: accept a ReadableStreamDefaultReader instead of a Uint8Array.
jsStreamReader (new io.ReadCloser) awaits reader.read() Promises via the
channel+FuncOf pattern, feeding chunks directly to the HTTP PUT body.
No js.CopyBytesToGo of the full file.

Receive: openWaitingFile now returns a pull-based ReadableStream backed by
the Go io.ReadCloser (jsReadableStream helper). Each pull call reads up
to 64 KiB and enqueues a Uint8Array chunk; no io.ReadAll.

jsFileOps.OpenReader: JS now returns a ReadableStream instead of a
Uint8Array; Go wraps it in jsStreamReader for streaming delivery.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-11 21:13:04 +00:00
2 changed files with 167 additions and 37 deletions
+127 -22
View File
@@ -9,7 +9,6 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -81,9 +80,11 @@ func (i *jsIPN) listFileTargets() js.Value {
}) })
} }
// sendFile sends data as filename to the peer identified by stableNodeID, // sendFile sends stream as filename to the peer identified by stableNodeID,
// reporting progress via notifyOutgoingFiles callbacks roughly once per second. // reporting progress via notifyOutgoingFiles callbacks roughly once per second.
func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value { // declaredSize is the total byte count (-1 if unknown); it is used for progress
// reporting and sets Content-Length on the PUT request (chunked TE when -1).
func (i *jsIPN) sendFile(stableNodeID, filename string, stream js.Value, declaredSize int) js.Value {
return makePromise(func() (any, error) { return makePromise(func() (any, error) {
ext, err := i.taildropExt() ext, err := i.taildropExt()
if err != nil { if err != nil {
@@ -107,14 +108,15 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
if err != nil { if err != nil {
return nil, fmt.Errorf("bogus peer URL: %w", err) return nil, fmt.Errorf("bogus peer URL: %w", err)
} }
b := make([]byte, data.Get("byteLength").Int())
js.CopyBytesToGo(b, data) reader := stream.Call("getReader")
body := &jsStreamReader{reader: reader}
outgoing := &ipn.OutgoingFile{ outgoing := &ipn.OutgoingFile{
ID: rands.HexString(30), ID: rands.HexString(30),
PeerID: tailcfg.StableNodeID(stableNodeID), PeerID: tailcfg.StableNodeID(stableNodeID),
Name: filename, Name: filename,
DeclaredSize: int64(len(b)), DeclaredSize: int64(declaredSize),
Started: time.Now(), Started: time.Now(),
} }
updates := map[string]*ipn.OutgoingFile{outgoing.ID: outgoing} updates := map[string]*ipn.OutgoingFile{outgoing.ID: outgoing}
@@ -127,17 +129,17 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
ext.UpdateOutgoingFiles(updates) ext.UpdateOutgoingFiles(updates)
}() }()
body := progresstracking.NewReader(bytes.NewReader(b), time.Second, func(n int, _ error) { progressBody := progresstracking.NewReader(body, time.Second, func(n int, _ error) {
outgoing.Sent = int64(n) outgoing.Sent = int64(n)
ext.UpdateOutgoingFiles(updates) ext.UpdateOutgoingFiles(updates)
}) })
req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), body) req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), progressBody)
if err != nil { if err != nil {
sendErr = err sendErr = err
return nil, err return nil, err
} }
req.ContentLength = int64(len(b)) req.ContentLength = int64(declaredSize)
client := &http.Client{Transport: i.lb.Dialer().PeerAPITransport()} client := &http.Client{Transport: i.lb.Dialer().PeerAPITransport()}
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
@@ -147,7 +149,13 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
respBody, _ := io.ReadAll(resp.Body) respBody, _ := io.ReadAll(resp.Body)
sendErr = fmt.Errorf("send file: %s: %s", resp.Status, bytes.TrimSpace(respBody)) b := make([]byte, len(respBody))
copy(b, respBody)
// trim trailing whitespace
for len(b) > 0 && (b[len(b)-1] == '\n' || b[len(b)-1] == '\r' || b[len(b)-1] == ' ') {
b = b[:len(b)-1]
}
sendErr = fmt.Errorf("send file: %s: %s", resp.Status, b)
return nil, sendErr return nil, sendErr
} }
return nil, nil return nil, nil
@@ -182,7 +190,8 @@ func (i *jsIPN) waitingFiles() js.Value {
}) })
} }
// openWaitingFile returns the contents of a received file as a Uint8Array. // openWaitingFile returns the contents of a received file as a ReadableStream.
// The stream emits Uint8Array chunks and closes when the file is fully read.
func (i *jsIPN) openWaitingFile(name string) js.Value { func (i *jsIPN) openWaitingFile(name string) js.Value {
return makePromise(func() (any, error) { return makePromise(func() (any, error) {
ext, err := i.taildropExt() ext, err := i.taildropExt()
@@ -193,14 +202,7 @@ func (i *jsIPN) openWaitingFile(name string) js.Value {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rc.Close() return jsReadableStream(rc), nil
data, err := io.ReadAll(rc)
if err != nil {
return nil, err
}
buf := js.Global().Get("Uint8Array").New(len(data))
js.CopyBytesToJS(buf, data)
return buf, nil
}) })
} }
@@ -234,6 +236,109 @@ func wireTaildropFileOps(lb *ipnlocal.LocalBackend, jsObj js.Value) {
ext.SetStagedFileOps(&jsFileOps{v: jsObj}) ext.SetStagedFileOps(&jsFileOps{v: jsObj})
} }
// jsStreamReader implements io.ReadCloser by pulling chunks from a JS
// ReadableStreamDefaultReader. Each Read call awaits one reader.read() Promise,
// using the channel+FuncOf pattern so Go blocks until JS delivers the chunk.
type jsStreamReader struct {
reader js.Value
buf []byte
done bool
}
func (r *jsStreamReader) Read(p []byte) (int, error) {
if r.done {
return 0, io.EOF
}
if len(r.buf) > 0 {
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
type chunkResult struct {
data []byte
done bool
}
ch := make(chan chunkResult, 1)
thenFn := js.FuncOf(func(this js.Value, args []js.Value) any {
result := args[0]
if result.Get("done").Bool() {
ch <- chunkResult{done: true}
} else {
value := result.Get("value")
b := make([]byte, value.Get("byteLength").Int())
js.CopyBytesToGo(b, value)
ch <- chunkResult{data: b}
}
return nil
})
defer thenFn.Release()
r.reader.Call("read").Call("then", thenFn)
result := <-ch
if result.done {
r.done = true
return 0, io.EOF
}
n := copy(p, result.data)
r.buf = result.data[n:]
return n, nil
}
func (r *jsStreamReader) Close() error {
r.reader.Call("cancel")
return nil
}
// jsReadableStream wraps rc in a pull-based JS ReadableStream. Each pull call
// reads up to 64 KiB from rc and enqueues a Uint8Array chunk; the stream
// closes on EOF or signals an error on any other read failure.
func jsReadableStream(rc io.ReadCloser) js.Value {
var pullFn, cancelFn js.Func
cancelFn = js.FuncOf(func(this js.Value, args []js.Value) any {
rc.Close()
pullFn.Release()
cancelFn.Release()
return nil
})
pullFn = js.FuncOf(func(this js.Value, args []js.Value) any {
controller := args[0]
var execFn js.Func
execFn = js.FuncOf(func(this js.Value, rr []js.Value) any {
resolve := rr[0]
go func() {
defer execFn.Release()
buf := make([]byte, 65536)
n, err := rc.Read(buf)
if n > 0 {
chunk := js.Global().Get("Uint8Array").New(n)
js.CopyBytesToJS(chunk, buf[:n])
controller.Call("enqueue", chunk)
}
if err == io.EOF {
rc.Close()
pullFn.Release()
cancelFn.Release()
controller.Call("close")
} else if err != nil {
rc.Close()
pullFn.Release()
cancelFn.Release()
controller.Call("error", err.Error())
}
resolve.Invoke()
}()
return nil
})
return js.Global().Get("Promise").New(execFn)
})
return js.Global().Get("ReadableStream").New(map[string]any{
"pull": pullFn,
"cancel": cancelFn,
})
}
// jsFileOps implements [taildrop.FileOps] by delegating to JS callbacks. // jsFileOps implements [taildrop.FileOps] by delegating to JS callbacks.
// JS methods use one of two callback conventions: // JS methods use one of two callback conventions:
// //
@@ -385,9 +490,9 @@ func (j jsFileOps) OpenReader(name string) (io.ReadCloser, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
b := make([]byte, val.Get("byteLength").Int()) // val is a ReadableStream; wrap its reader for streaming delivery to Go.
js.CopyBytesToGo(b, val) reader := val.Call("getReader")
return io.NopCloser(bytes.NewReader(b)), nil return &jsStreamReader{reader: reader}, nil
} }
// jsFileInfo is a minimal [fs.FileInfo] backed by a name and a size. // jsFileInfo is a minimal [fs.FileInfo] backed by a name and a size.
+40 -15
View File
@@ -66,19 +66,20 @@ import (
var ControlURL = ipn.DefaultControlURL var ControlURL = ipn.DefaultControlURL
func main() { func main() {
shutdownCh := make(chan struct{})
js.Global().Set("newIPN", js.FuncOf(func(this js.Value, args []js.Value) any { js.Global().Set("newIPN", js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) != 1 { if len(args) != 1 {
log.Fatal("Usage: newIPN(config)") log.Fatal("Usage: newIPN(config)")
return nil return nil
} }
return newIPN(args[0]) return newIPN(args[0], shutdownCh)
})) }))
// Keep Go runtime alive, otherwise it will be shut down before newIPN gets // Block until shutdown() is called on the IPN, then let main return so the
// called. // Go runtime (and all its goroutines) can be collected by the JS engine.
<-make(chan bool) <-shutdownCh
} }
func newIPN(jsConfig js.Value) map[string]any { func newIPN(jsConfig js.Value, shutdownCh chan struct{}) map[string]any {
netns.SetEnabled(false) netns.SetEnabled(false)
var store ipn.StateStore var store ipn.StateStore
@@ -179,6 +180,7 @@ func newIPN(jsConfig js.Value) map[string]any {
hostname: hostname, hostname: hostname,
logID: logid, logID: logid,
funnelPorts: make(map[uint16]*funnelListenerEntry), funnelPorts: make(map[uint16]*funnelListenerEntry),
shutdownCh: shutdownCh,
} }
lb.SetTCPHandlerForFunnelFlow(jsIPN.handleFunnelTCP) lb.SetTCPHandlerForFunnelFlow(jsIPN.handleFunnelTCP)
@@ -281,11 +283,11 @@ func newIPN(jsConfig js.Value) map[string]any {
return jsIPN.listFileTargets() return jsIPN.listFileTargets()
}), }),
"sendFile": js.FuncOf(func(this js.Value, args []js.Value) any { "sendFile": js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) != 3 { if len(args) != 4 {
log.Printf("Usage: sendFile(stableNodeID, filename, data)") log.Printf("Usage: sendFile(stableNodeID, filename, stream, declaredSize)")
return nil return nil
} }
return jsIPN.sendFile(args[0].String(), args[1].String(), args[2]) return jsIPN.sendFile(args[0].String(), args[1].String(), args[2], args[3].Int())
}), }),
"waitingFiles": js.FuncOf(func(this js.Value, args []js.Value) any { "waitingFiles": js.FuncOf(func(this js.Value, args []js.Value) any {
return jsIPN.waitingFiles() return jsIPN.waitingFiles()
@@ -361,6 +363,9 @@ func newIPN(jsConfig js.Value) map[string]any {
"suggestExitNode": js.FuncOf(func(this js.Value, args []js.Value) any { "suggestExitNode": js.FuncOf(func(this js.Value, args []js.Value) any {
return jsIPN.suggestExitNode() return jsIPN.suggestExitNode()
}), }),
"shutdown": js.FuncOf(func(this js.Value, args []js.Value) any {
return jsIPN.shutdown()
}),
"localAPI": js.FuncOf(func(this js.Value, args []js.Value) any { "localAPI": js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) < 2 { if len(args) < 2 {
log.Printf("Usage: localAPI(method, path[, body])") log.Printf("Usage: localAPI(method, path[, body])")
@@ -387,6 +392,12 @@ type jsIPN struct {
funnelMu sync.Mutex funnelMu sync.Mutex
funnelPorts map[uint16]*funnelListenerEntry funnelPorts map[uint16]*funnelListenerEntry
// ln is the safesocket listener created by run(); stored here so shutdown
// can close it and unblock srv.Run.
ln net.Listener
shutdownCh chan struct{} // closed by shutdown() to unblock main()
shutdownOnce sync.Once
} }
// funnelListenerEntry is the per-port state for routing Funnel connections to a listenTLS listener. // funnelListenerEntry is the per-port state for routing Funnel connections to a listenTLS listener.
@@ -594,14 +605,17 @@ func (i *jsIPN) run(jsCallbacks js.Value) {
} }
}() }()
go func() { ln, err := safesocket.Listen("")
ln, err := safesocket.Listen("") if err != nil {
if err != nil { log.Fatalf("safesocket.Listen: %v", err)
log.Fatalf("safesocket.Listen: %v", err) }
} i.ln = ln
err = i.srv.Run(context.Background(), ln) go func() {
log.Fatalf("ipnserver.Run exited: %v", err) err := i.srv.Run(context.Background(), ln)
if err != nil && !errors.Is(err, net.ErrClosed) {
log.Fatalf("ipnserver.Run exited: %v", err)
}
}() }()
} }
@@ -620,6 +634,17 @@ func (i *jsIPN) logout() {
}() }()
} }
func (i *jsIPN) shutdown() js.Value {
return makePromise(func() (any, error) {
i.shutdownOnce.Do(func() {
i.lb.Shutdown()
i.ln.Close()
close(i.shutdownCh)
})
return nil, nil
})
}
func (i *jsIPN) ssh(host, username string, termConfig js.Value) map[string]any { func (i *jsIPN) ssh(host, username string, termConfig js.Value) map[string]any {
jsSSHSession := &jsSSHSession{ jsSSHSession := &jsSSHSession{
jsIPN: i, jsIPN: i,