Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dd9c9f6844 |
+22
-127
@@ -9,6 +9,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -80,11 +81,9 @@ func (i *jsIPN) listFileTargets() js.Value {
|
||||
})
|
||||
}
|
||||
|
||||
// sendFile sends stream as filename to the peer identified by stableNodeID,
|
||||
// sendFile sends data as filename to the peer identified by stableNodeID,
|
||||
// reporting progress via notifyOutgoingFiles callbacks roughly once per second.
|
||||
// declaredSize is the total byte count (-1 if unknown); it is used for progress
|
||||
// reporting and sets Content-Length on the PUT request (chunked TE when -1).
|
||||
func (i *jsIPN) sendFile(stableNodeID, filename string, stream js.Value, declaredSize int) js.Value {
|
||||
func (i *jsIPN) sendFile(stableNodeID, filename string, data js.Value) js.Value {
|
||||
return makePromise(func() (any, error) {
|
||||
ext, err := i.taildropExt()
|
||||
if err != nil {
|
||||
@@ -108,15 +107,14 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, stream js.Value, declare
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("bogus peer URL: %w", err)
|
||||
}
|
||||
|
||||
reader := stream.Call("getReader")
|
||||
body := &jsStreamReader{reader: reader}
|
||||
b := make([]byte, data.Get("byteLength").Int())
|
||||
js.CopyBytesToGo(b, data)
|
||||
|
||||
outgoing := &ipn.OutgoingFile{
|
||||
ID: rands.HexString(30),
|
||||
PeerID: tailcfg.StableNodeID(stableNodeID),
|
||||
Name: filename,
|
||||
DeclaredSize: int64(declaredSize),
|
||||
DeclaredSize: int64(len(b)),
|
||||
Started: time.Now(),
|
||||
}
|
||||
updates := map[string]*ipn.OutgoingFile{outgoing.ID: outgoing}
|
||||
@@ -129,17 +127,17 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, stream js.Value, declare
|
||||
ext.UpdateOutgoingFiles(updates)
|
||||
}()
|
||||
|
||||
progressBody := progresstracking.NewReader(body, time.Second, func(n int, _ error) {
|
||||
body := progresstracking.NewReader(bytes.NewReader(b), time.Second, func(n int, _ error) {
|
||||
outgoing.Sent = int64(n)
|
||||
ext.UpdateOutgoingFiles(updates)
|
||||
})
|
||||
|
||||
req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), progressBody)
|
||||
req, err := http.NewRequest("PUT", dstURL.String()+"/v0/put/"+url.PathEscape(filename), body)
|
||||
if err != nil {
|
||||
sendErr = err
|
||||
return nil, err
|
||||
}
|
||||
req.ContentLength = int64(declaredSize)
|
||||
req.ContentLength = int64(len(b))
|
||||
client := &http.Client{Transport: i.lb.Dialer().PeerAPITransport()}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
@@ -149,13 +147,7 @@ func (i *jsIPN) sendFile(stableNodeID, filename string, stream js.Value, declare
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
b := make([]byte, len(respBody))
|
||||
copy(b, respBody)
|
||||
// trim trailing whitespace
|
||||
for len(b) > 0 && (b[len(b)-1] == '\n' || b[len(b)-1] == '\r' || b[len(b)-1] == ' ') {
|
||||
b = b[:len(b)-1]
|
||||
}
|
||||
sendErr = fmt.Errorf("send file: %s: %s", resp.Status, b)
|
||||
sendErr = fmt.Errorf("send file: %s: %s", resp.Status, bytes.TrimSpace(respBody))
|
||||
return nil, sendErr
|
||||
}
|
||||
return nil, nil
|
||||
@@ -190,8 +182,7 @@ func (i *jsIPN) waitingFiles() js.Value {
|
||||
})
|
||||
}
|
||||
|
||||
// openWaitingFile returns the contents of a received file as a ReadableStream.
|
||||
// The stream emits Uint8Array chunks and closes when the file is fully read.
|
||||
// openWaitingFile returns the contents of a received file as a Uint8Array.
|
||||
func (i *jsIPN) openWaitingFile(name string) js.Value {
|
||||
return makePromise(func() (any, error) {
|
||||
ext, err := i.taildropExt()
|
||||
@@ -202,7 +193,14 @@ func (i *jsIPN) openWaitingFile(name string) js.Value {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return jsReadableStream(rc), nil
|
||||
defer rc.Close()
|
||||
data, err := io.ReadAll(rc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buf := js.Global().Get("Uint8Array").New(len(data))
|
||||
js.CopyBytesToJS(buf, data)
|
||||
return buf, nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -236,109 +234,6 @@ func wireTaildropFileOps(lb *ipnlocal.LocalBackend, jsObj js.Value) {
|
||||
ext.SetStagedFileOps(&jsFileOps{v: jsObj})
|
||||
}
|
||||
|
||||
// jsStreamReader implements io.ReadCloser by pulling chunks from a JS
|
||||
// ReadableStreamDefaultReader. Each Read call awaits one reader.read() Promise,
|
||||
// using the channel+FuncOf pattern so Go blocks until JS delivers the chunk.
|
||||
type jsStreamReader struct {
|
||||
reader js.Value
|
||||
buf []byte
|
||||
done bool
|
||||
}
|
||||
|
||||
func (r *jsStreamReader) Read(p []byte) (int, error) {
|
||||
if r.done {
|
||||
return 0, io.EOF
|
||||
}
|
||||
if len(r.buf) > 0 {
|
||||
n := copy(p, r.buf)
|
||||
r.buf = r.buf[n:]
|
||||
return n, nil
|
||||
}
|
||||
type chunkResult struct {
|
||||
data []byte
|
||||
done bool
|
||||
}
|
||||
ch := make(chan chunkResult, 1)
|
||||
thenFn := js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
result := args[0]
|
||||
if result.Get("done").Bool() {
|
||||
ch <- chunkResult{done: true}
|
||||
} else {
|
||||
value := result.Get("value")
|
||||
b := make([]byte, value.Get("byteLength").Int())
|
||||
js.CopyBytesToGo(b, value)
|
||||
ch <- chunkResult{data: b}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
defer thenFn.Release()
|
||||
r.reader.Call("read").Call("then", thenFn)
|
||||
result := <-ch
|
||||
if result.done {
|
||||
r.done = true
|
||||
return 0, io.EOF
|
||||
}
|
||||
n := copy(p, result.data)
|
||||
r.buf = result.data[n:]
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (r *jsStreamReader) Close() error {
|
||||
r.reader.Call("cancel")
|
||||
return nil
|
||||
}
|
||||
|
||||
// jsReadableStream wraps rc in a pull-based JS ReadableStream. Each pull call
|
||||
// reads up to 64 KiB from rc and enqueues a Uint8Array chunk; the stream
|
||||
// closes on EOF or signals an error on any other read failure.
|
||||
func jsReadableStream(rc io.ReadCloser) js.Value {
|
||||
var pullFn, cancelFn js.Func
|
||||
|
||||
cancelFn = js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
rc.Close()
|
||||
pullFn.Release()
|
||||
cancelFn.Release()
|
||||
return nil
|
||||
})
|
||||
|
||||
pullFn = js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
controller := args[0]
|
||||
var execFn js.Func
|
||||
execFn = js.FuncOf(func(this js.Value, rr []js.Value) any {
|
||||
resolve := rr[0]
|
||||
go func() {
|
||||
defer execFn.Release()
|
||||
buf := make([]byte, 65536)
|
||||
n, err := rc.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := js.Global().Get("Uint8Array").New(n)
|
||||
js.CopyBytesToJS(chunk, buf[:n])
|
||||
controller.Call("enqueue", chunk)
|
||||
}
|
||||
if err == io.EOF {
|
||||
rc.Close()
|
||||
pullFn.Release()
|
||||
cancelFn.Release()
|
||||
controller.Call("close")
|
||||
} else if err != nil {
|
||||
rc.Close()
|
||||
pullFn.Release()
|
||||
cancelFn.Release()
|
||||
controller.Call("error", err.Error())
|
||||
}
|
||||
resolve.Invoke()
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
return js.Global().Get("Promise").New(execFn)
|
||||
})
|
||||
|
||||
return js.Global().Get("ReadableStream").New(map[string]any{
|
||||
"pull": pullFn,
|
||||
"cancel": cancelFn,
|
||||
})
|
||||
}
|
||||
|
||||
// jsFileOps implements [taildrop.FileOps] by delegating to JS callbacks.
|
||||
// JS methods use one of two callback conventions:
|
||||
//
|
||||
@@ -490,9 +385,9 @@ func (j jsFileOps) OpenReader(name string) (io.ReadCloser, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// val is a ReadableStream; wrap its reader for streaming delivery to Go.
|
||||
reader := val.Call("getReader")
|
||||
return &jsStreamReader{reader: reader}, nil
|
||||
b := make([]byte, val.Get("byteLength").Int())
|
||||
js.CopyBytesToGo(b, val)
|
||||
return io.NopCloser(bytes.NewReader(b)), nil
|
||||
}
|
||||
|
||||
// jsFileInfo is a minimal [fs.FileInfo] backed by a name and a size.
|
||||
|
||||
@@ -66,20 +66,19 @@ import (
|
||||
var ControlURL = ipn.DefaultControlURL
|
||||
|
||||
func main() {
|
||||
shutdownCh := make(chan struct{})
|
||||
js.Global().Set("newIPN", js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
if len(args) != 1 {
|
||||
log.Fatal("Usage: newIPN(config)")
|
||||
return nil
|
||||
}
|
||||
return newIPN(args[0], shutdownCh)
|
||||
return newIPN(args[0])
|
||||
}))
|
||||
// Block until shutdown() is called on the IPN, then let main return so the
|
||||
// Go runtime (and all its goroutines) can be collected by the JS engine.
|
||||
<-shutdownCh
|
||||
// Keep Go runtime alive, otherwise it will be shut down before newIPN gets
|
||||
// called.
|
||||
<-make(chan bool)
|
||||
}
|
||||
|
||||
func newIPN(jsConfig js.Value, shutdownCh chan struct{}) map[string]any {
|
||||
func newIPN(jsConfig js.Value) map[string]any {
|
||||
netns.SetEnabled(false)
|
||||
|
||||
var store ipn.StateStore
|
||||
@@ -180,7 +179,6 @@ func newIPN(jsConfig js.Value, shutdownCh chan struct{}) map[string]any {
|
||||
hostname: hostname,
|
||||
logID: logid,
|
||||
funnelPorts: make(map[uint16]*funnelListenerEntry),
|
||||
shutdownCh: shutdownCh,
|
||||
}
|
||||
lb.SetTCPHandlerForFunnelFlow(jsIPN.handleFunnelTCP)
|
||||
|
||||
@@ -283,11 +281,11 @@ func newIPN(jsConfig js.Value, shutdownCh chan struct{}) map[string]any {
|
||||
return jsIPN.listFileTargets()
|
||||
}),
|
||||
"sendFile": js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
if len(args) != 4 {
|
||||
log.Printf("Usage: sendFile(stableNodeID, filename, stream, declaredSize)")
|
||||
if len(args) != 3 {
|
||||
log.Printf("Usage: sendFile(stableNodeID, filename, data)")
|
||||
return nil
|
||||
}
|
||||
return jsIPN.sendFile(args[0].String(), args[1].String(), args[2], args[3].Int())
|
||||
return jsIPN.sendFile(args[0].String(), args[1].String(), args[2])
|
||||
}),
|
||||
"waitingFiles": js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
return jsIPN.waitingFiles()
|
||||
@@ -363,8 +361,12 @@ func newIPN(jsConfig js.Value, shutdownCh chan struct{}) map[string]any {
|
||||
"suggestExitNode": js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
return jsIPN.suggestExitNode()
|
||||
}),
|
||||
"shutdown": js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
return jsIPN.shutdown()
|
||||
"setServices": js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
if len(args) != 1 {
|
||||
log.Printf("Usage: setServices(services)")
|
||||
return nil
|
||||
}
|
||||
return jsIPN.setServices(args[0])
|
||||
}),
|
||||
"localAPI": js.FuncOf(func(this js.Value, args []js.Value) any {
|
||||
if len(args) < 2 {
|
||||
@@ -392,12 +394,6 @@ type jsIPN struct {
|
||||
|
||||
funnelMu sync.Mutex
|
||||
funnelPorts map[uint16]*funnelListenerEntry
|
||||
|
||||
// ln is the safesocket listener created by run(); stored here so shutdown
|
||||
// can close it and unblock srv.Run.
|
||||
ln net.Listener
|
||||
shutdownCh chan struct{} // closed by shutdown() to unblock main()
|
||||
shutdownOnce sync.Once
|
||||
}
|
||||
|
||||
// funnelListenerEntry is the per-port state for routing Funnel connections to a listenTLS listener.
|
||||
@@ -478,6 +474,7 @@ func (i *jsIPN) run(jsCallbacks js.Value) {
|
||||
NodeKey: nm.NodeKey.String(),
|
||||
MachineKey: nm.MachineKey.String(),
|
||||
PeerAPIURL: selfPeerAPIURL,
|
||||
Services: userServicesFromView(nm.SelfNode.Hostinfo().Services()),
|
||||
},
|
||||
MachineStatus: jsMachineStatus[nm.GetMachineStatus()],
|
||||
},
|
||||
@@ -527,6 +524,7 @@ func (i *jsIPN) run(jsCallbacks js.Value) {
|
||||
MachineKey: p.Machine().String(),
|
||||
NodeKey: p.Key().String(),
|
||||
PeerAPIURL: peerURL,
|
||||
Services: userServicesFromView(p.Hostinfo().Services()),
|
||||
},
|
||||
Online: p.Online().Clone(),
|
||||
TailscaleSSHEnabled: p.Hostinfo().TailscaleSSHEnabled(),
|
||||
@@ -605,17 +603,14 @@ func (i *jsIPN) run(jsCallbacks js.Value) {
|
||||
}
|
||||
}()
|
||||
|
||||
ln, err := safesocket.Listen("")
|
||||
if err != nil {
|
||||
log.Fatalf("safesocket.Listen: %v", err)
|
||||
}
|
||||
i.ln = ln
|
||||
|
||||
go func() {
|
||||
err := i.srv.Run(context.Background(), ln)
|
||||
if err != nil && !errors.Is(err, net.ErrClosed) {
|
||||
log.Fatalf("ipnserver.Run exited: %v", err)
|
||||
ln, err := safesocket.Listen("")
|
||||
if err != nil {
|
||||
log.Fatalf("safesocket.Listen: %v", err)
|
||||
}
|
||||
|
||||
err = i.srv.Run(context.Background(), ln)
|
||||
log.Fatalf("ipnserver.Run exited: %v", err)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -634,17 +629,6 @@ func (i *jsIPN) logout() {
|
||||
}()
|
||||
}
|
||||
|
||||
func (i *jsIPN) shutdown() js.Value {
|
||||
return makePromise(func() (any, error) {
|
||||
i.shutdownOnce.Do(func() {
|
||||
i.lb.Shutdown()
|
||||
i.ln.Close()
|
||||
close(i.shutdownCh)
|
||||
})
|
||||
return nil, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (i *jsIPN) ssh(host, username string, termConfig js.Value) map[string]any {
|
||||
jsSSHSession := &jsSSHSession{
|
||||
jsIPN: i,
|
||||
@@ -1353,6 +1337,39 @@ func (i *jsIPN) suggestExitNode() js.Value {
|
||||
})
|
||||
}
|
||||
|
||||
func (i *jsIPN) setServices(jsServices js.Value) js.Value {
|
||||
return makePromise(func() (any, error) {
|
||||
n := jsServices.Length()
|
||||
svcs := make([]tailcfg.Service, 0, n)
|
||||
for idx := range n {
|
||||
s := jsServices.Index(idx)
|
||||
proto := tailcfg.ServiceProto(s.Get("proto").String())
|
||||
port := uint16(s.Get("port").Int())
|
||||
var desc string
|
||||
if d := s.Get("description"); d.Type() == js.TypeString {
|
||||
desc = d.String()
|
||||
}
|
||||
svcs = append(svcs, tailcfg.Service{Proto: proto, Port: port, Description: desc})
|
||||
}
|
||||
i.lb.SetExplicitServices(svcs)
|
||||
return nil, nil
|
||||
})
|
||||
}
|
||||
|
||||
// userServicesFromView converts a hostinfo services slice to jsService entries,
|
||||
// filtering out internal peerapi protocol entries (already reflected in peerAPIURL).
|
||||
func userServicesFromView(svcs views.Slice[tailcfg.Service]) []jsService {
|
||||
var out []jsService
|
||||
for _, s := range svcs.All() {
|
||||
switch s.Proto {
|
||||
case tailcfg.PeerAPI4, tailcfg.PeerAPI6, tailcfg.PeerAPIDNS:
|
||||
continue
|
||||
}
|
||||
out = append(out, jsService{Proto: string(s.Proto), Port: s.Port, Description: s.Description})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (i *jsIPN) localAPI(method, path, body string) js.Value {
|
||||
return makePromise(func() (any, error) {
|
||||
h := localapi.NewHandler(localapi.HandlerConfig{
|
||||
@@ -1589,12 +1606,19 @@ type jsNetMap struct {
|
||||
LockedOut bool `json:"lockedOut"`
|
||||
}
|
||||
|
||||
type jsService struct {
|
||||
Proto string `json:"proto"`
|
||||
Port uint16 `json:"port"`
|
||||
Description string `json:"description,omitempty"`
|
||||
}
|
||||
|
||||
type jsNetMapNode struct {
|
||||
Name string `json:"name"`
|
||||
Addresses []string `json:"addresses"`
|
||||
MachineKey string `json:"machineKey"`
|
||||
NodeKey string `json:"nodeKey"`
|
||||
PeerAPIURL string `json:"peerAPIURL,omitempty"`
|
||||
Name string `json:"name"`
|
||||
Addresses []string `json:"addresses"`
|
||||
MachineKey string `json:"machineKey"`
|
||||
NodeKey string `json:"nodeKey"`
|
||||
PeerAPIURL string `json:"peerAPIURL,omitempty"`
|
||||
Services []jsService `json:"services,omitempty"`
|
||||
}
|
||||
|
||||
type jsNetMapSelfNode struct {
|
||||
|
||||
+21
-1
@@ -294,6 +294,7 @@ type LocalBackend struct {
|
||||
capTailnetLock bool // whether netMap contains the tailnet lock capability
|
||||
// hostinfo is mutated in-place while mu is held.
|
||||
hostinfo *tailcfg.Hostinfo // TODO(nickkhyl): move to nodeBackend
|
||||
explicitServices []tailcfg.Service // services set explicitly via SetExplicitServices; always uploaded
|
||||
nmExpiryTimer tstime.TimerController // for updating netMap on node expiry; can be nil; TODO(nickkhyl): move to nodeBackend
|
||||
activeLogin string // last logged LoginName from netMap; TODO(nickkhyl): move to nodeBackend (or remove? it's in [ipn.LoginProfile]).
|
||||
engineStatus ipn.EngineStatus
|
||||
@@ -4967,6 +4968,23 @@ func (b *LocalBackend) setPortlistServices(sl []tailcfg.Service) {
|
||||
b.doSetHostinfoFilterServices()
|
||||
}
|
||||
|
||||
// SetExplicitServices sets the services this node advertises on the netmap.
|
||||
// Unlike the OS port-scan path (setPortlistServices), services set here are
|
||||
// always uploaded to the control server regardless of the ShouldUploadServices
|
||||
// hook — suitable for environments like browser WASM where OS port scanning is
|
||||
// unavailable and services are declared programmatically.
|
||||
func (b *LocalBackend) SetExplicitServices(sl []tailcfg.Service) {
|
||||
b.mu.Lock()
|
||||
if b.hostinfo == nil {
|
||||
b.hostinfo = new(tailcfg.Hostinfo)
|
||||
}
|
||||
b.hostinfo.Services = sl
|
||||
b.explicitServices = sl
|
||||
b.mu.Unlock()
|
||||
|
||||
b.doSetHostinfoFilterServices()
|
||||
}
|
||||
|
||||
// doSetHostinfoFilterServices calls SetHostinfo on the controlclient,
|
||||
// possibly after mangling the given hostinfo.
|
||||
//
|
||||
@@ -5011,7 +5029,9 @@ func (b *LocalBackend) hostInfoWithServicesLocked() *tailcfg.Hostinfo {
|
||||
// Make a shallow copy of hostinfo so we can mutate
|
||||
// at the Service field.
|
||||
if f, ok := b.extHost.Hooks().ShouldUploadServices.GetOk(); !ok || !f() {
|
||||
hi.Services = []tailcfg.Service{}
|
||||
if len(b.explicitServices) == 0 {
|
||||
hi.Services = []tailcfg.Service{}
|
||||
}
|
||||
}
|
||||
|
||||
// Don't mutate hi.Service's underlying array. Append to
|
||||
|
||||
Reference in New Issue
Block a user