WIP: rebase for 2026-05-18 #7
+189
-20
@@ -32,6 +32,7 @@ import (
|
||||
"tailscale.com/client/tailscale/apitype"
|
||||
"tailscale.com/cmd/tailscale/cli/ffcomplete"
|
||||
"tailscale.com/envknob"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/ipnstate"
|
||||
"tailscale.com/net/tsaddr"
|
||||
"tailscale.com/tailcfg"
|
||||
@@ -78,14 +79,16 @@ var fileCpCmd = &ffcli.Command{
|
||||
fs.StringVar(&cpArgs.name, "name", "", "alternate filename to use, especially useful when <file> is \"-\" (stdin)")
|
||||
fs.BoolVar(&cpArgs.verbose, "verbose", false, "verbose output")
|
||||
fs.BoolVar(&cpArgs.targets, "targets", false, "list possible file cp targets")
|
||||
fs.DurationVar(&cpArgs.updateInterval, "update-interval", 250*time.Millisecond, "how often to repaint the progress line; zero or negative disables progress display entirely")
|
||||
return fs
|
||||
})(),
|
||||
}
|
||||
|
||||
var cpArgs struct {
|
||||
name string
|
||||
verbose bool
|
||||
targets bool
|
||||
name string
|
||||
verbose bool
|
||||
targets bool
|
||||
updateInterval time.Duration
|
||||
}
|
||||
|
||||
func runCp(ctx context.Context, args []string) error {
|
||||
@@ -119,9 +122,6 @@ func runCp(ctx context.Context, args []string) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't send to %s: %v", target, err)
|
||||
}
|
||||
if isOffline {
|
||||
fmt.Fprintf(Stderr, "# warning: %s is offline\n", target)
|
||||
}
|
||||
|
||||
if len(files) > 1 {
|
||||
if cpArgs.name != "" {
|
||||
@@ -132,7 +132,51 @@ func runCp(ctx context.Context, args []string) error {
|
||||
}
|
||||
}
|
||||
|
||||
for _, fileArg := range files {
|
||||
// outFiles tracks per-name push state, populated by a goroutine subscribed
|
||||
// to the IPN bus. tailscaled's OutgoingFile.Sent is the bytes-pulled-toward-
|
||||
// peerAPI signal; it stays at 0 until the peerAPI request body is actually
|
||||
// being read, which is what we want both for the progress display and for
|
||||
// disarming the offline warning. The CLI's local-side bytes counter would
|
||||
// say "100% sent" the moment net/http buffers a small body into the local
|
||||
// unix-socket conn to tailscaled, well before the peer has heard a thing.
|
||||
type pushState struct {
|
||||
sent atomic.Int64
|
||||
warnTimer *time.Timer // disarmed on first byte sent to peerAPI; nil after
|
||||
}
|
||||
var (
|
||||
outMu sync.Mutex
|
||||
outFiles = map[string]*pushState{} // keyed by file name
|
||||
)
|
||||
|
||||
busCtx, cancelBus := context.WithCancel(ctx)
|
||||
defer cancelBus()
|
||||
go watchOutgoingFiles(busCtx, stableID, func(name string, sent int64) {
|
||||
outMu.Lock()
|
||||
ps := outFiles[name]
|
||||
outMu.Unlock()
|
||||
if ps == nil {
|
||||
return
|
||||
}
|
||||
// Only ever advance ps.sent forward. Bus updates can arrive late
|
||||
// (after the success path below has already written contentLength
|
||||
// to ps.sent for an instant final-100% paint), so we'd otherwise
|
||||
// regress the count and the progress printer would compute a
|
||||
// negative delta on its next tick.
|
||||
for {
|
||||
old := ps.sent.Load()
|
||||
if sent <= old {
|
||||
return
|
||||
}
|
||||
if ps.sent.CompareAndSwap(old, sent) {
|
||||
if old == 0 && ps.warnTimer != nil {
|
||||
ps.warnTimer.Stop()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
for i, fileArg := range files {
|
||||
var fileContents *countingReader
|
||||
var name = cpArgs.name
|
||||
var contentLength int64 = -1
|
||||
@@ -175,16 +219,57 @@ func runCp(ctx context.Context, args []string) error {
|
||||
log.Printf("sending %q to %v/%v/%v ...", name, target, ip, stableID)
|
||||
}
|
||||
|
||||
// Register this file with the watcher and, for the first file only,
|
||||
// arm a timer that warns the user if no bytes have flowed to peerAPI
|
||||
// after a few seconds. The watcher disarms it on first byte; PushFile
|
||||
// returning also disarms it (cleanup, below). We don't gate on the
|
||||
// netmap's Online bit (which can lag reality), but we do use it to
|
||||
// pick between two warning messages.
|
||||
ps := &pushState{}
|
||||
if i == 0 {
|
||||
ps.warnTimer = time.AfterFunc(3*time.Second, func() {
|
||||
// vtRestartLine clears whatever (possibly progress) was on
|
||||
// the current line, then we print the warning + \n so the
|
||||
// next progress redraw lands on a fresh line below.
|
||||
const vtRestartLine = "\r\x1b[K"
|
||||
if isOffline {
|
||||
fmt.Fprintf(Stderr, "%s# warning: %s is reportedly offline; trying anyway\n", vtRestartLine, target)
|
||||
} else {
|
||||
fmt.Fprintf(Stderr, "%s# warning: %s is not replying; trying anyway\n", vtRestartLine, target)
|
||||
}
|
||||
})
|
||||
}
|
||||
outMu.Lock()
|
||||
outFiles[name] = ps
|
||||
outMu.Unlock()
|
||||
|
||||
var group sync.WaitGroup
|
||||
ctxProgress, cancelProgress := context.WithCancel(ctx)
|
||||
defer cancelProgress()
|
||||
if isatty.IsTerminal(os.Stderr.Fd()) {
|
||||
group.Go(func() { progressPrinter(ctxProgress, name, fileContents.n.Load, contentLength) })
|
||||
if cpArgs.updateInterval > 0 && isatty.IsTerminal(os.Stderr.Fd()) {
|
||||
group.Go(func() {
|
||||
progressPrinter(ctxProgress, name, ps.sent.Load, contentLength, cpArgs.updateInterval)
|
||||
})
|
||||
}
|
||||
|
||||
err := localClient.PushFile(ctx, stableID, contentLength, name, fileContents)
|
||||
if err == nil {
|
||||
// PushFile can finish faster than the IPN bus delivers a final
|
||||
// OutgoingFile update, leaving the progress display stuck at 0%.
|
||||
// Synthesize a "fully done" count before stopping the printer so
|
||||
// its final paint shows 100%. For stdin (contentLength == -1) we
|
||||
// don't know the size, so fall back to the local read count.
|
||||
if contentLength >= 0 {
|
||||
ps.sent.Store(contentLength)
|
||||
} else {
|
||||
ps.sent.Store(fileContents.n.Load())
|
||||
}
|
||||
}
|
||||
cancelProgress()
|
||||
group.Wait() // wait for progress printer to stop before reporting the error
|
||||
if ps.warnTimer != nil {
|
||||
ps.warnTimer.Stop()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -195,15 +280,71 @@ func runCp(ctx context.Context, args []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func progressPrinter(ctx context.Context, name string, contentCount func() int64, contentLength int64) {
|
||||
// watchOutgoingFiles subscribes to the IPN bus and invokes onUpdate once
|
||||
// per OutgoingFile event for files going to peer. It runs until ctx is
|
||||
// done (which runCp does on return) and is best-effort: if the bus
|
||||
// subscription fails for any reason, onUpdate simply isn't called and the
|
||||
// caller's progress display stays at 0 — exactly the right degradation,
|
||||
// since the warning timer will then fire on its normal 3-second deadline.
|
||||
func watchOutgoingFiles(ctx context.Context, peer tailcfg.StableNodeID, onUpdate func(name string, sent int64)) {
|
||||
// NotifyPeerChanges asks the broadcaster to deliver incremental peer
|
||||
// updates as small PeerChanges blobs in place of the full NetMap, which
|
||||
// we don't read anyway. (See ipn/ipnlocal/local.go's notify-elision.)
|
||||
w, err := localClient.WatchIPNBus(ctx, ipn.NotifyInitialOutgoingFiles|ipn.NotifyPeerChanges)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer w.Close()
|
||||
for {
|
||||
n, err := w.Next()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, of := range n.OutgoingFiles {
|
||||
if of.PeerID != peer {
|
||||
continue
|
||||
}
|
||||
// tailscaled keeps Finished entries in its OutgoingFiles map
|
||||
// across PushFile calls (see feature/taildrop/ext.go), so a
|
||||
// re-send of the same filename will see both the old completed
|
||||
// (Sent == DeclaredSize) entry and the new in-progress one.
|
||||
// Without this filter the watcher's monotonic CAS would latch
|
||||
// onto the old entry's max value and the new transfer would
|
||||
// appear stuck at 100% from the first bus tick.
|
||||
if of.Finished {
|
||||
continue
|
||||
}
|
||||
onUpdate(of.Name, of.Sent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// progressPrinter repaints a single-line transfer progress display every
|
||||
// interval. interval must be > 0; runCp's caller gates on the
|
||||
// --update-interval flag and skips invoking us when it's <= 0.
|
||||
//
|
||||
// It returns when ctx is done OR when it detects the transfer is stuck —
|
||||
// "stuck" being: contentCount has equalled contentLength with a near-zero
|
||||
// rate for >2 seconds. The stuck case prints a final newline so subsequent
|
||||
// output (e.g. an error from PushFile) lands on a fresh line below the
|
||||
// frozen progress line, instead of being painted over by it.
|
||||
func progressPrinter(ctx context.Context, name string, contentCount func() int64, contentLength int64, interval time.Duration) {
|
||||
var rateValueFast, rateValueSlow tsrate.Value
|
||||
rateValueFast.HalfLife = 1 * time.Second // fast response for rate measurement
|
||||
rateValueSlow.HalfLife = 10 * time.Second // slow response for ETA measurement
|
||||
// tailscaled emits OutgoingFile.Sent updates at ~1 Hz, so most printer
|
||||
// ticks see no delta. With too short a half-life the displayed rate
|
||||
// roughly halves between updates and doubles back when one arrives,
|
||||
// looking jumpy. 5s keeps the swing under ~15% while still settling
|
||||
// within a few seconds of a real change.
|
||||
rateValueFast.HalfLife = 5 * time.Second // smoothed rate for display
|
||||
rateValueSlow.HalfLife = 10 * time.Second // even slower, for ETA measurement
|
||||
var prevContentCount int64
|
||||
print := func() {
|
||||
currContentCount := contentCount()
|
||||
rateValueFast.Add(float64(currContentCount - prevContentCount))
|
||||
rateValueSlow.Add(float64(currContentCount - prevContentCount))
|
||||
// Clamp so a regression (which shouldn't happen, but tsrate.Value.Add
|
||||
// panics on a negative count) can't take down the CLI.
|
||||
delta := max(currContentCount-prevContentCount, 0)
|
||||
rateValueFast.Add(float64(delta))
|
||||
rateValueSlow.Add(float64(delta))
|
||||
prevContentCount = currContentCount
|
||||
|
||||
const vtRestartLine = "\r\x1b[K"
|
||||
@@ -215,16 +356,23 @@ func progressPrinter(ctx context.Context, name string, contentCount func() int64
|
||||
if contentLength >= 0 {
|
||||
currContentCount = min(currContentCount, contentLength) // cap at 100%
|
||||
ratioRemain := float64(currContentCount) / float64(contentLength)
|
||||
bytesRemain := float64(contentLength - currContentCount)
|
||||
secsRemain := bytesRemain / rateValueSlow.Rate()
|
||||
secs := int(min(max(0, secsRemain), 99*60*60+59+60+59))
|
||||
etaStr := "ETA -"
|
||||
if rate := rateValueSlow.Rate(); rate > 0 {
|
||||
bytesRemain := float64(contentLength - currContentCount)
|
||||
secsRemain := bytesRemain / rate
|
||||
secs := int(min(max(0, secsRemain), 99*60*60+59+60+59))
|
||||
etaStr = fmt.Sprintf("ETA %02d:%02d:%02d", secs/60/60, (secs/60)%60, secs%60)
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, " %s %s",
|
||||
leftPad(fmt.Sprintf("%0.2f%%", 100.0*ratioRemain), len("100.00%")),
|
||||
fmt.Sprintf("ETA %02d:%02d:%02d", secs/60/60, (secs/60)%60, secs%60))
|
||||
etaStr)
|
||||
}
|
||||
}
|
||||
|
||||
tc := time.NewTicker(250 * time.Millisecond)
|
||||
const stuckAfter = 2 * time.Second
|
||||
var fullStartedAt time.Time // when we first observed currCount==contentLength with ~zero rate
|
||||
|
||||
tc := time.NewTicker(interval)
|
||||
defer tc.Stop()
|
||||
print()
|
||||
for {
|
||||
@@ -235,6 +383,24 @@ func progressPrinter(ctx context.Context, name string, contentCount func() int64
|
||||
return
|
||||
case <-tc.C:
|
||||
print()
|
||||
if contentLength < 0 {
|
||||
continue
|
||||
}
|
||||
currCount := contentCount()
|
||||
rate := rateValueFast.Rate()
|
||||
if currCount >= contentLength && rate < 1 {
|
||||
if fullStartedAt.IsZero() {
|
||||
fullStartedAt = time.Now()
|
||||
} else if time.Since(fullStartedAt) >= stuckAfter {
|
||||
// Transfer is stuck at 100% with no movement. Stop
|
||||
// repainting so we don't keep clobbering anything the
|
||||
// rest of runCp prints (warnings, errors).
|
||||
fmt.Fprintln(os.Stderr)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
fullStartedAt = time.Time{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -328,7 +494,10 @@ peerLoop:
|
||||
return "", isOffline, errors.New("cannot send files: missing required Taildrop capability")
|
||||
|
||||
case ipnstate.TaildropTargetOffline:
|
||||
return "", isOffline, errors.New("cannot send files: peer is offline")
|
||||
// Don't gate on the server-reported Online bit (which lags reality
|
||||
// and isn't always accurate). runCp probes reachability itself with
|
||||
// TSMP pings.
|
||||
return foundPeer.ID, isOffline, nil
|
||||
|
||||
case ipnstate.TaildropTargetNoPeerInfo:
|
||||
return "", isOffline, errors.New("cannot send files: invalid or unrecognized peer")
|
||||
|
||||
Reference in New Issue
Block a user