diff --git a/cmd/tailscale/cli/file.go b/cmd/tailscale/cli/file.go index e7406bee3..a5c39b13d 100644 --- a/cmd/tailscale/cli/file.go +++ b/cmd/tailscale/cli/file.go @@ -32,6 +32,7 @@ import ( "tailscale.com/client/tailscale/apitype" "tailscale.com/cmd/tailscale/cli/ffcomplete" "tailscale.com/envknob" + "tailscale.com/ipn" "tailscale.com/ipn/ipnstate" "tailscale.com/net/tsaddr" "tailscale.com/tailcfg" @@ -78,14 +79,16 @@ var fileCpCmd = &ffcli.Command{ fs.StringVar(&cpArgs.name, "name", "", "alternate filename to use, especially useful when is \"-\" (stdin)") fs.BoolVar(&cpArgs.verbose, "verbose", false, "verbose output") fs.BoolVar(&cpArgs.targets, "targets", false, "list possible file cp targets") + fs.DurationVar(&cpArgs.updateInterval, "update-interval", 250*time.Millisecond, "how often to repaint the progress line; zero or negative disables progress display entirely") return fs })(), } var cpArgs struct { - name string - verbose bool - targets bool + name string + verbose bool + targets bool + updateInterval time.Duration } func runCp(ctx context.Context, args []string) error { @@ -119,9 +122,6 @@ func runCp(ctx context.Context, args []string) error { if err != nil { return fmt.Errorf("can't send to %s: %v", target, err) } - if isOffline { - fmt.Fprintf(Stderr, "# warning: %s is offline\n", target) - } if len(files) > 1 { if cpArgs.name != "" { @@ -132,7 +132,51 @@ func runCp(ctx context.Context, args []string) error { } } - for _, fileArg := range files { + // outFiles tracks per-name push state, populated by a goroutine subscribed + // to the IPN bus. tailscaled's OutgoingFile.Sent is the bytes-pulled-toward- + // peerAPI signal; it stays at 0 until the peerAPI request body is actually + // being read, which is what we want both for the progress display and for + // disarming the offline warning. The CLI's local-side bytes counter would + // say "100% sent" the moment net/http buffers a small body into the local + // unix-socket conn to tailscaled, well before the peer has heard a thing. + type pushState struct { + sent atomic.Int64 + warnTimer *time.Timer // disarmed on first byte sent to peerAPI; nil after + } + var ( + outMu sync.Mutex + outFiles = map[string]*pushState{} // keyed by file name + ) + + busCtx, cancelBus := context.WithCancel(ctx) + defer cancelBus() + go watchOutgoingFiles(busCtx, stableID, func(name string, sent int64) { + outMu.Lock() + ps := outFiles[name] + outMu.Unlock() + if ps == nil { + return + } + // Only ever advance ps.sent forward. Bus updates can arrive late + // (after the success path below has already written contentLength + // to ps.sent for an instant final-100% paint), so we'd otherwise + // regress the count and the progress printer would compute a + // negative delta on its next tick. + for { + old := ps.sent.Load() + if sent <= old { + return + } + if ps.sent.CompareAndSwap(old, sent) { + if old == 0 && ps.warnTimer != nil { + ps.warnTimer.Stop() + } + return + } + } + }) + + for i, fileArg := range files { var fileContents *countingReader var name = cpArgs.name var contentLength int64 = -1 @@ -175,16 +219,57 @@ func runCp(ctx context.Context, args []string) error { log.Printf("sending %q to %v/%v/%v ...", name, target, ip, stableID) } + // Register this file with the watcher and, for the first file only, + // arm a timer that warns the user if no bytes have flowed to peerAPI + // after a few seconds. The watcher disarms it on first byte; PushFile + // returning also disarms it (cleanup, below). We don't gate on the + // netmap's Online bit (which can lag reality), but we do use it to + // pick between two warning messages. + ps := &pushState{} + if i == 0 { + ps.warnTimer = time.AfterFunc(3*time.Second, func() { + // vtRestartLine clears whatever (possibly progress) was on + // the current line, then we print the warning + \n so the + // next progress redraw lands on a fresh line below. + const vtRestartLine = "\r\x1b[K" + if isOffline { + fmt.Fprintf(Stderr, "%s# warning: %s is reportedly offline; trying anyway\n", vtRestartLine, target) + } else { + fmt.Fprintf(Stderr, "%s# warning: %s is not replying; trying anyway\n", vtRestartLine, target) + } + }) + } + outMu.Lock() + outFiles[name] = ps + outMu.Unlock() + var group sync.WaitGroup ctxProgress, cancelProgress := context.WithCancel(ctx) defer cancelProgress() - if isatty.IsTerminal(os.Stderr.Fd()) { - group.Go(func() { progressPrinter(ctxProgress, name, fileContents.n.Load, contentLength) }) + if cpArgs.updateInterval > 0 && isatty.IsTerminal(os.Stderr.Fd()) { + group.Go(func() { + progressPrinter(ctxProgress, name, ps.sent.Load, contentLength, cpArgs.updateInterval) + }) } err := localClient.PushFile(ctx, stableID, contentLength, name, fileContents) + if err == nil { + // PushFile can finish faster than the IPN bus delivers a final + // OutgoingFile update, leaving the progress display stuck at 0%. + // Synthesize a "fully done" count before stopping the printer so + // its final paint shows 100%. For stdin (contentLength == -1) we + // don't know the size, so fall back to the local read count. + if contentLength >= 0 { + ps.sent.Store(contentLength) + } else { + ps.sent.Store(fileContents.n.Load()) + } + } cancelProgress() group.Wait() // wait for progress printer to stop before reporting the error + if ps.warnTimer != nil { + ps.warnTimer.Stop() + } if err != nil { return err } @@ -195,15 +280,71 @@ func runCp(ctx context.Context, args []string) error { return nil } -func progressPrinter(ctx context.Context, name string, contentCount func() int64, contentLength int64) { +// watchOutgoingFiles subscribes to the IPN bus and invokes onUpdate once +// per OutgoingFile event for files going to peer. It runs until ctx is +// done (which runCp does on return) and is best-effort: if the bus +// subscription fails for any reason, onUpdate simply isn't called and the +// caller's progress display stays at 0 — exactly the right degradation, +// since the warning timer will then fire on its normal 3-second deadline. +func watchOutgoingFiles(ctx context.Context, peer tailcfg.StableNodeID, onUpdate func(name string, sent int64)) { + // NotifyPeerChanges asks the broadcaster to deliver incremental peer + // updates as small PeerChanges blobs in place of the full NetMap, which + // we don't read anyway. (See ipn/ipnlocal/local.go's notify-elision.) + w, err := localClient.WatchIPNBus(ctx, ipn.NotifyInitialOutgoingFiles|ipn.NotifyPeerChanges) + if err != nil { + return + } + defer w.Close() + for { + n, err := w.Next() + if err != nil { + return + } + for _, of := range n.OutgoingFiles { + if of.PeerID != peer { + continue + } + // tailscaled keeps Finished entries in its OutgoingFiles map + // across PushFile calls (see feature/taildrop/ext.go), so a + // re-send of the same filename will see both the old completed + // (Sent == DeclaredSize) entry and the new in-progress one. + // Without this filter the watcher's monotonic CAS would latch + // onto the old entry's max value and the new transfer would + // appear stuck at 100% from the first bus tick. + if of.Finished { + continue + } + onUpdate(of.Name, of.Sent) + } + } +} + +// progressPrinter repaints a single-line transfer progress display every +// interval. interval must be > 0; runCp's caller gates on the +// --update-interval flag and skips invoking us when it's <= 0. +// +// It returns when ctx is done OR when it detects the transfer is stuck — +// "stuck" being: contentCount has equalled contentLength with a near-zero +// rate for >2 seconds. The stuck case prints a final newline so subsequent +// output (e.g. an error from PushFile) lands on a fresh line below the +// frozen progress line, instead of being painted over by it. +func progressPrinter(ctx context.Context, name string, contentCount func() int64, contentLength int64, interval time.Duration) { var rateValueFast, rateValueSlow tsrate.Value - rateValueFast.HalfLife = 1 * time.Second // fast response for rate measurement - rateValueSlow.HalfLife = 10 * time.Second // slow response for ETA measurement + // tailscaled emits OutgoingFile.Sent updates at ~1 Hz, so most printer + // ticks see no delta. With too short a half-life the displayed rate + // roughly halves between updates and doubles back when one arrives, + // looking jumpy. 5s keeps the swing under ~15% while still settling + // within a few seconds of a real change. + rateValueFast.HalfLife = 5 * time.Second // smoothed rate for display + rateValueSlow.HalfLife = 10 * time.Second // even slower, for ETA measurement var prevContentCount int64 print := func() { currContentCount := contentCount() - rateValueFast.Add(float64(currContentCount - prevContentCount)) - rateValueSlow.Add(float64(currContentCount - prevContentCount)) + // Clamp so a regression (which shouldn't happen, but tsrate.Value.Add + // panics on a negative count) can't take down the CLI. + delta := max(currContentCount-prevContentCount, 0) + rateValueFast.Add(float64(delta)) + rateValueSlow.Add(float64(delta)) prevContentCount = currContentCount const vtRestartLine = "\r\x1b[K" @@ -215,16 +356,23 @@ func progressPrinter(ctx context.Context, name string, contentCount func() int64 if contentLength >= 0 { currContentCount = min(currContentCount, contentLength) // cap at 100% ratioRemain := float64(currContentCount) / float64(contentLength) - bytesRemain := float64(contentLength - currContentCount) - secsRemain := bytesRemain / rateValueSlow.Rate() - secs := int(min(max(0, secsRemain), 99*60*60+59+60+59)) + etaStr := "ETA -" + if rate := rateValueSlow.Rate(); rate > 0 { + bytesRemain := float64(contentLength - currContentCount) + secsRemain := bytesRemain / rate + secs := int(min(max(0, secsRemain), 99*60*60+59+60+59)) + etaStr = fmt.Sprintf("ETA %02d:%02d:%02d", secs/60/60, (secs/60)%60, secs%60) + } fmt.Fprintf(os.Stderr, " %s %s", leftPad(fmt.Sprintf("%0.2f%%", 100.0*ratioRemain), len("100.00%")), - fmt.Sprintf("ETA %02d:%02d:%02d", secs/60/60, (secs/60)%60, secs%60)) + etaStr) } } - tc := time.NewTicker(250 * time.Millisecond) + const stuckAfter = 2 * time.Second + var fullStartedAt time.Time // when we first observed currCount==contentLength with ~zero rate + + tc := time.NewTicker(interval) defer tc.Stop() print() for { @@ -235,6 +383,24 @@ func progressPrinter(ctx context.Context, name string, contentCount func() int64 return case <-tc.C: print() + if contentLength < 0 { + continue + } + currCount := contentCount() + rate := rateValueFast.Rate() + if currCount >= contentLength && rate < 1 { + if fullStartedAt.IsZero() { + fullStartedAt = time.Now() + } else if time.Since(fullStartedAt) >= stuckAfter { + // Transfer is stuck at 100% with no movement. Stop + // repainting so we don't keep clobbering anything the + // rest of runCp prints (warnings, errors). + fmt.Fprintln(os.Stderr) + return + } + } else { + fullStartedAt = time.Time{} + } } } } @@ -328,7 +494,10 @@ peerLoop: return "", isOffline, errors.New("cannot send files: missing required Taildrop capability") case ipnstate.TaildropTargetOffline: - return "", isOffline, errors.New("cannot send files: peer is offline") + // Don't gate on the server-reported Online bit (which lags reality + // and isn't always accurate). runCp probes reachability itself with + // TSMP pings. + return foundPeer.ID, isOffline, nil case ipnstate.TaildropTargetNoPeerInfo: return "", isOffline, errors.New("cannot send files: invalid or unrecognized peer")