2 Commits

Author SHA1 Message Date
codinget 6e83d5291b feat(tsconnect/wasm): add shutdown() to jsIPN
Expose a shutdown() method on the JS-side IPN object that stops the
LocalBackend, closes the safesocket listener (which unblocks srv.Run),
and signals main() to return so the Go runtime exits cleanly.

This allows the host environment (Node.js process or browser service
worker) to terminate normally once the Tailscale WASM module is no
longer needed, instead of being kept alive indefinitely by open handles,
goroutines, or the Go runtime's blocking main goroutine.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 00:22:56 +00:00
codinget 21d0f11d85 feat(taildrop): stream files via ReadableStream on send and receive
Send: accept a ReadableStreamDefaultReader instead of a Uint8Array.
jsStreamReader (new io.ReadCloser) awaits reader.read() Promises via the
channel+FuncOf pattern, feeding chunks directly to the HTTP PUT body.
No js.CopyBytesToGo of the full file.

Receive: openWaitingFile now returns a pull-based ReadableStream backed by
the Go io.ReadCloser (jsReadableStream helper). Each pull call reads up
to 64 KiB and enqueues a Uint8Array chunk; no io.ReadAll.

jsFileOps.OpenReader: JS now returns a ReadableStream instead of a
Uint8Array; Go wraps it in jsStreamReader for streaming delivery.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-11 21:13:04 +00:00
3 changed files with 172 additions and 111 deletions
+127 -22
View File
@@ -9,7 +9,6 @@
package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -81,9 +80,11 @@ func (i *jsIPN) listFileTargets() js.Value {
})
}
// sendFile sends data as filename to the peer identified by stableNodeID,
// sendFile sends stream as filename to the peer identified by stableNodeID,
// reporting progress via notifyOutgoingFiles callbacks roughly once per second.
func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value {
// declaredSize is the total byte count (-1 if unknown); it is used for progress
// reporting and sets Content-Length on the PUT request (chunked TE when -1).
func (i *jsIPN) sendFile(stableNodeID, filename string, stream js.Value, declaredSize int) js.Value {
return makePromise(func() (any, error) {
ext, err := i.taildropExt()
if err != nil {
@@ -107,14 +108,15 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
if err != nil {
return nil, fmt.Errorf("bogus peer URL: %w", err)
}
b := make([]byte, data.Get("byteLength").Int())
js.CopyBytesToGo(b, data)
reader := stream.Call("getReader")
body := &jsStreamReader{reader: reader}
outgoing := &ipn.OutgoingFile{
ID: rands.HexString(30),
PeerID: tailcfg.StableNodeID(stableNodeID),
Name: filename,
DeclaredSize: int64(len(b)),
DeclaredSize: int64(declaredSize),
Started: time.Now(),
}
updates := map[string]*ipn.OutgoingFile{outgoing.ID: outgoing}
@@ -127,17 +129,17 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
ext.UpdateOutgoingFiles(updates)
}()
body := progresstracking.NewReader(bytes.NewReader(b), time.Second, func(n int, _ error) {
progressBody := progresstracking.NewReader(body, time.Second, func(n int, _ error) {
outgoing.Sent = int64(n)
ext.UpdateOutgoingFiles(updates)
})
req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), body)
req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), progressBody)
if err != nil {
sendErr = err
return nil, err
}
req.ContentLength = int64(len(b))
req.ContentLength = int64(declaredSize)
client := &http.Client{Transport: i.lb.Dialer().PeerAPITransport()}
resp, err := client.Do(req)
if err != nil {
@@ -147,7 +149,13 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
respBody, _ := io.ReadAll(resp.Body)
sendErr = fmt.Errorf("send file: %s: %s", resp.Status, bytes.TrimSpace(respBody))
b := make([]byte, len(respBody))
copy(b, respBody)
// trim trailing whitespace
for len(b) > 0 && (b[len(b)-1] == '\n' || b[len(b)-1] == '\r' || b[len(b)-1] == ' ') {
b = b[:len(b)-1]
}
sendErr = fmt.Errorf("send file: %s: %s", resp.Status, b)
return nil, sendErr
}
return nil, nil
@@ -182,7 +190,8 @@ func (i *jsIPN) waitingFiles() js.Value {
})
}
// openWaitingFile returns the contents of a received file as a Uint8Array.
// openWaitingFile returns the contents of a received file as a ReadableStream.
// The stream emits Uint8Array chunks and closes when the file is fully read.
func (i *jsIPN) openWaitingFile(name string) js.Value {
return makePromise(func() (any, error) {
ext, err := i.taildropExt()
@@ -193,14 +202,7 @@ func (i *jsIPN) openWaitingFile(name string) js.Value {
if err != nil {
return nil, err
}
defer rc.Close()
data, err := io.ReadAll(rc)
if err != nil {
return nil, err
}
buf := js.Global().Get("Uint8Array").New(len(data))
js.CopyBytesToJS(buf, data)
return buf, nil
return jsReadableStream(rc), nil
})
}
@@ -234,6 +236,109 @@ func wireTaildropFileOps(lb *ipnlocal.LocalBackend, jsObj js.Value) {
ext.SetStagedFileOps(&jsFileOps{v: jsObj})
}
// jsStreamReader implements io.ReadCloser by pulling chunks from a JS
// ReadableStreamDefaultReader. Each Read call awaits one reader.read() Promise,
// using the channel+FuncOf pattern so Go blocks until JS delivers the chunk.
type jsStreamReader struct {
reader js.Value
buf []byte
done bool
}
func (r *jsStreamReader) Read(p []byte) (int, error) {
if r.done {
return 0, io.EOF
}
if len(r.buf) > 0 {
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
type chunkResult struct {
data []byte
done bool
}
ch := make(chan chunkResult, 1)
thenFn := js.FuncOf(func(this js.Value, args []js.Value) any {
result := args[0]
if result.Get("done").Bool() {
ch <- chunkResult{done: true}
} else {
value := result.Get("value")
b := make([]byte, value.Get("byteLength").Int())
js.CopyBytesToGo(b, value)
ch <- chunkResult{data: b}
}
return nil
})
defer thenFn.Release()
r.reader.Call("read").Call("then", thenFn)
result := <-ch
if result.done {
r.done = true
return 0, io.EOF
}
n := copy(p, result.data)
r.buf = result.data[n:]
return n, nil
}
func (r *jsStreamReader) Close() error {
r.reader.Call("cancel")
return nil
}
// jsReadableStream wraps rc in a pull-based JS ReadableStream. Each pull call
// reads up to 64 KiB from rc and enqueues a Uint8Array chunk; the stream
// closes on EOF or signals an error on any other read failure.
func jsReadableStream(rc io.ReadCloser) js.Value {
var pullFn, cancelFn js.Func
cancelFn = js.FuncOf(func(this js.Value, args []js.Value) any {
rc.Close()
pullFn.Release()
cancelFn.Release()
return nil
})
pullFn = js.FuncOf(func(this js.Value, args []js.Value) any {
controller := args[0]
var execFn js.Func
execFn = js.FuncOf(func(this js.Value, rr []js.Value) any {
resolve := rr[0]
go func() {
defer execFn.Release()
buf := make([]byte, 65536)
n, err := rc.Read(buf)
if n > 0 {
chunk := js.Global().Get("Uint8Array").New(n)
js.CopyBytesToJS(chunk, buf[:n])
controller.Call("enqueue", chunk)
}
if err == io.EOF {
rc.Close()
pullFn.Release()
cancelFn.Release()
controller.Call("close")
} else if err != nil {
rc.Close()
pullFn.Release()
cancelFn.Release()
controller.Call("error", err.Error())
}
resolve.Invoke()
}()
return nil
})
return js.Global().Get("Promise").New(execFn)
})
return js.Global().Get("ReadableStream").New(map[string]any{
"pull": pullFn,
"cancel": cancelFn,
})
}
// jsFileOps implements [taildrop.FileOps] by delegating to JS callbacks.
// JS methods use one of two callback conventions:
//
@@ -385,9 +490,9 @@ func (j jsFileOps) OpenReader(name string) (io.ReadCloser, error) {
if err != nil {
return nil, err
}
b := make([]byte, val.Get("byteLength").Int())
js.CopyBytesToGo(b, val)
return io.NopCloser(bytes.NewReader(b)), nil
// val is a ReadableStream; wrap its reader for streaming delivery to Go.
reader := val.Call("getReader")
return &jsStreamReader{reader: reader}, nil
}
// jsFileInfo is a minimal [fs.FileInfo] backed by a name and a size.
+44 -68
View File
@@ -66,19 +66,20 @@ import (
var ControlURL = ipn.DefaultControlURL
func main() {
shutdownCh := make(chan struct{})
js.Global().Set("newIPN", js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) != 1 {
log.Fatal("Usage: newIPN(config)")
return nil
}
return newIPN(args[0])
return newIPN(args[0], shutdownCh)
}))
// Keep Go runtime alive, otherwise it will be shut down before newIPN gets
// called.
<-make(chan bool)
// Block until shutdown() is called on the IPN, then let main return so the
// Go runtime (and all its goroutines) can be collected by the JS engine.
<-shutdownCh
}
func newIPN(jsConfig js.Value) map[string]any {
func newIPN(jsConfig js.Value, shutdownCh chan struct{}) map[string]any {
netns.SetEnabled(false)
var store ipn.StateStore
@@ -179,6 +180,7 @@ func newIPN(jsConfig js.Value) map[string]any {
hostname: hostname,
logID: logid,
funnelPorts: make(map[uint16]*funnelListenerEntry),
shutdownCh: shutdownCh,
}
lb.SetTCPHandlerForFunnelFlow(jsIPN.handleFunnelTCP)
@@ -281,11 +283,11 @@ func newIPN(jsConfig js.Value) map[string]any {
return jsIPN.listFileTargets()
}),
"sendFile": js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) != 3 {
log.Printf("Usage: sendFile(stableNodeID, filename, data)")
if len(args) != 4 {
log.Printf("Usage: sendFile(stableNodeID, filename, stream, declaredSize)")
return nil
}
return jsIPN.sendFile(args[0].String(), args[1].String(), args[2])
return jsIPN.sendFile(args[0].String(), args[1].String(), args[2], args[3].Int())
}),
"waitingFiles": js.FuncOf(func(this js.Value, args []js.Value) any {
return jsIPN.waitingFiles()
@@ -361,12 +363,8 @@ func newIPN(jsConfig js.Value) map[string]any {
"suggestExitNode": js.FuncOf(func(this js.Value, args []js.Value) any {
return jsIPN.suggestExitNode()
}),
"setServices": js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) != 1 {
log.Printf("Usage: setServices(services)")
return nil
}
return jsIPN.setServices(args[0])
"shutdown": js.FuncOf(func(this js.Value, args []js.Value) any {
return jsIPN.shutdown()
}),
"localAPI": js.FuncOf(func(this js.Value, args []js.Value) any {
if len(args) < 2 {
@@ -394,6 +392,12 @@ type jsIPN struct {
funnelMu sync.Mutex
funnelPorts map[uint16]*funnelListenerEntry
// ln is the safesocket listener created by run(); stored here so shutdown
// can close it and unblock srv.Run.
ln net.Listener
shutdownCh chan struct{} // closed by shutdown() to unblock main()
shutdownOnce sync.Once
}
// funnelListenerEntry is the per-port state for routing Funnel connections to a listenTLS listener.
@@ -474,7 +478,6 @@ func (i *jsIPN) run(jsCallbacks js.Value) {
NodeKey: nm.NodeKey.String(),
MachineKey: nm.MachineKey.String(),
PeerAPIURL: selfPeerAPIURL,
Services: userServicesFromView(nm.SelfNode.Hostinfo().Services()),
},
MachineStatus: jsMachineStatus[nm.GetMachineStatus()],
},
@@ -524,7 +527,6 @@ func (i *jsIPN) run(jsCallbacks js.Value) {
MachineKey: p.Machine().String(),
NodeKey: p.Key().String(),
PeerAPIURL: peerURL,
Services: userServicesFromView(p.Hostinfo().Services()),
},
Online: p.Online().Clone(),
TailscaleSSHEnabled: p.Hostinfo().TailscaleSSHEnabled(),
@@ -603,14 +605,17 @@ func (i *jsIPN) run(jsCallbacks js.Value) {
}
}()
go func() {
ln, err := safesocket.Listen("")
if err != nil {
log.Fatalf("safesocket.Listen: %v", err)
}
ln, err := safesocket.Listen("")
if err != nil {
log.Fatalf("safesocket.Listen: %v", err)
}
i.ln = ln
err = i.srv.Run(context.Background(), ln)
log.Fatalf("ipnserver.Run exited: %v", err)
go func() {
err := i.srv.Run(context.Background(), ln)
if err != nil && !errors.Is(err, net.ErrClosed) {
log.Fatalf("ipnserver.Run exited: %v", err)
}
}()
}
@@ -629,6 +634,17 @@ func (i *jsIPN) logout() {
}()
}
func (i *jsIPN) shutdown() js.Value {
return makePromise(func() (any, error) {
i.shutdownOnce.Do(func() {
i.lb.Shutdown()
i.ln.Close()
close(i.shutdownCh)
})
return nil, nil
})
}
func (i *jsIPN) ssh(host, username string, termConfig js.Value) map[string]any {
jsSSHSession := &jsSSHSession{
jsIPN: i,
@@ -1337,39 +1353,6 @@ func (i *jsIPN) suggestExitNode() js.Value {
})
}
func (i *jsIPN) setServices(jsServices js.Value) js.Value {
return makePromise(func() (any, error) {
n := jsServices.Length()
svcs := make([]tailcfg.Service, 0, n)
for idx := range n {
s := jsServices.Index(idx)
proto := tailcfg.ServiceProto(s.Get("proto").String())
port := uint16(s.Get("port").Int())
var desc string
if d := s.Get("description"); d.Type() == js.TypeString {
desc = d.String()
}
svcs = append(svcs, tailcfg.Service{Proto: proto, Port: port, Description: desc})
}
i.lb.SetExplicitServices(svcs)
return nil, nil
})
}
// userServicesFromView converts a hostinfo services slice to jsService entries,
// filtering out internal peerapi protocol entries (already reflected in peerAPIURL).
func userServicesFromView(svcs views.Slice[tailcfg.Service]) []jsService {
var out []jsService
for _, s := range svcs.All() {
switch s.Proto {
case tailcfg.PeerAPI4, tailcfg.PeerAPI6, tailcfg.PeerAPIDNS:
continue
}
out = append(out, jsService{Proto: string(s.Proto), Port: s.Port, Description: s.Description})
}
return out
}
func (i *jsIPN) localAPI(method, path, body string) js.Value {
return makePromise(func() (any, error) {
h := localapi.NewHandler(localapi.HandlerConfig{
@@ -1606,19 +1589,12 @@ type jsNetMap struct {
LockedOut bool `json:"lockedOut"`
}
type jsService struct {
Proto string `json:"proto"`
Port uint16 `json:"port"`
Description string `json:"description,omitempty"`
}
type jsNetMapNode struct {
Name string `json:"name"`
Addresses []string `json:"addresses"`
MachineKey string `json:"machineKey"`
NodeKey string `json:"nodeKey"`
PeerAPIURL string `json:"peerAPIURL,omitempty"`
Services []jsService `json:"services,omitempty"`
Name string `json:"name"`
Addresses []string `json:"addresses"`
MachineKey string `json:"machineKey"`
NodeKey string `json:"nodeKey"`
PeerAPIURL string `json:"peerAPIURL,omitempty"`
}
type jsNetMapSelfNode struct {
+1 -21
View File
@@ -294,7 +294,6 @@ type LocalBackend struct {
capTailnetLock bool // whether netMap contains the tailnet lock capability
// hostinfo is mutated in-place while mu is held.
hostinfo *tailcfg.Hostinfo // TODO(nickkhyl): move to nodeBackend
explicitServices []tailcfg.Service // services set explicitly via SetExplicitServices; always uploaded
nmExpiryTimer tstime.TimerController // for updating netMap on node expiry; can be nil; TODO(nickkhyl): move to nodeBackend
activeLogin string // last logged LoginName from netMap; TODO(nickkhyl): move to nodeBackend (or remove? it's in [ipn.LoginProfile]).
engineStatus ipn.EngineStatus
@@ -4968,23 +4967,6 @@ func (b *LocalBackend) setPortlistServices(sl []tailcfg.Service) {
b.doSetHostinfoFilterServices()
}
// SetExplicitServices sets the services this node advertises on the netmap.
// Unlike the OS port-scan path (setPortlistServices), services set here are
// always uploaded to the control server regardless of the ShouldUploadServices
// hook — suitable for environments like browser WASM where OS port scanning is
// unavailable and services are declared programmatically.
func (b *LocalBackend) SetExplicitServices(sl []tailcfg.Service) {
b.mu.Lock()
if b.hostinfo == nil {
b.hostinfo = new(tailcfg.Hostinfo)
}
b.hostinfo.Services = sl
b.explicitServices = sl
b.mu.Unlock()
b.doSetHostinfoFilterServices()
}
// doSetHostinfoFilterServices calls SetHostinfo on the controlclient,
// possibly after mangling the given hostinfo.
//
@@ -5029,9 +5011,7 @@ func (b *LocalBackend) hostInfoWithServicesLocked() *tailcfg.Hostinfo {
// Make a shallow copy of hostinfo so we can mutate
// at the Service field.
if f, ok := b.extHost.Hooks().ShouldUploadServices.GetOk(); !ok || !f() {
if len(b.explicitServices) == 0 {
hi.Services = []tailcfg.Service{}
}
hi.Services = []tailcfg.Service{}
}
// Don't mutate hi.Service's underlying array. Append to