Limit spamming GUIs with boring updates to once in 3 seconds, unless the notification is relatively interesting and the GUI should update immediately. This is basically @barnstar's #14119 but with the logic moved to be per-watch-session (since the bit is per session), rather than globally. And this distinguishes notable Notify messages (such as state changes) and makes them send immediately. Updates tailscale/corp#24553 Change-Id: I79cac52cce85280ce351e65e76ea11e107b00b49 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>main
parent
c2a7f17f2b
commit
93db503565
@ -0,0 +1,161 @@ |
|||||||
|
// Copyright (c) Tailscale Inc & AUTHORS
|
||||||
|
// SPDX-License-Identifier: BSD-3-Clause
|
||||||
|
|
||||||
|
package ipnlocal |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"time" |
||||||
|
|
||||||
|
"tailscale.com/ipn" |
||||||
|
"tailscale.com/tstime" |
||||||
|
) |
||||||
|
|
||||||
|
type rateLimitingBusSender struct { |
||||||
|
fn func(*ipn.Notify) (keepGoing bool) |
||||||
|
lastFlush time.Time // last call to fn, or zero value if none
|
||||||
|
interval time.Duration // 0 to flush immediately; non-zero to rate limit sends
|
||||||
|
clock tstime.DefaultClock // non-nil for testing
|
||||||
|
didSendTestHook func() // non-nil for testing
|
||||||
|
|
||||||
|
// pending, if non-nil, is the pending notification that we
|
||||||
|
// haven't sent yet. We own this memory to mutate.
|
||||||
|
pending *ipn.Notify |
||||||
|
|
||||||
|
// flushTimer is non-nil if the timer is armed.
|
||||||
|
flushTimer tstime.TimerController // effectively a *time.Timer
|
||||||
|
flushTimerC <-chan time.Time // ... said ~Timer's C chan
|
||||||
|
} |
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) close() { |
||||||
|
if s.flushTimer != nil { |
||||||
|
s.flushTimer.Stop() |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) flushChan() <-chan time.Time { |
||||||
|
return s.flushTimerC |
||||||
|
} |
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) flush() (keepGoing bool) { |
||||||
|
if n := s.pending; n != nil { |
||||||
|
s.pending = nil |
||||||
|
return s.flushNotify(n) |
||||||
|
} |
||||||
|
return true |
||||||
|
} |
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) flushNotify(n *ipn.Notify) (keepGoing bool) { |
||||||
|
s.lastFlush = s.clock.Now() |
||||||
|
return s.fn(n) |
||||||
|
} |
||||||
|
|
||||||
|
// send conditionally sends n to the underlying fn, possibly rate
|
||||||
|
// limiting it, depending on whether s.interval is set, and whether
|
||||||
|
// n is a notable notification that the client (typically a GUI) would
|
||||||
|
// want to act on (render) immediately.
|
||||||
|
//
|
||||||
|
// It returns whether the caller should keep looping.
|
||||||
|
//
|
||||||
|
// The passed-in memory 'n' is owned by the caller and should
|
||||||
|
// not be mutated.
|
||||||
|
func (s *rateLimitingBusSender) send(n *ipn.Notify) (keepGoing bool) { |
||||||
|
if s.interval <= 0 { |
||||||
|
// No rate limiting case.
|
||||||
|
return s.fn(n) |
||||||
|
} |
||||||
|
if isNotableNotify(n) { |
||||||
|
// Notable notifications are always sent immediately.
|
||||||
|
// But first send any boring one that was pending.
|
||||||
|
// TODO(bradfitz): there might be a boring one pending
|
||||||
|
// with a NetMap or Engine field that is redundant
|
||||||
|
// with the new one (n) with NetMap or Engine populated.
|
||||||
|
// We should clear the pending one's NetMap/Engine in
|
||||||
|
// that case. Or really, merge the two, but mergeBoringNotifies
|
||||||
|
// only handles the case of both sides being boring.
|
||||||
|
// So for now, flush both.
|
||||||
|
if !s.flush() { |
||||||
|
return false |
||||||
|
} |
||||||
|
return s.flushNotify(n) |
||||||
|
} |
||||||
|
s.pending = mergeBoringNotifies(s.pending, n) |
||||||
|
d := s.clock.Now().Sub(s.lastFlush) |
||||||
|
if d > s.interval { |
||||||
|
return s.flush() |
||||||
|
} |
||||||
|
nextFlushIn := s.interval - d |
||||||
|
if s.flushTimer == nil { |
||||||
|
s.flushTimer, s.flushTimerC = s.clock.NewTimer(nextFlushIn) |
||||||
|
} else { |
||||||
|
s.flushTimer.Reset(nextFlushIn) |
||||||
|
} |
||||||
|
return true |
||||||
|
} |
||||||
|
|
||||||
|
func (s *rateLimitingBusSender) Run(ctx context.Context, ch <-chan *ipn.Notify) { |
||||||
|
for { |
||||||
|
select { |
||||||
|
case <-ctx.Done(): |
||||||
|
return |
||||||
|
case n, ok := <-ch: |
||||||
|
if !ok { |
||||||
|
return |
||||||
|
} |
||||||
|
if !s.send(n) { |
||||||
|
return |
||||||
|
} |
||||||
|
if f := s.didSendTestHook; f != nil { |
||||||
|
f() |
||||||
|
} |
||||||
|
case <-s.flushChan(): |
||||||
|
if !s.flush() { |
||||||
|
return |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// mergeBoringNotify merges new notify 'src' into possibly-nil 'dst',
|
||||||
|
// either mutating 'dst' or allocating a new one if 'dst' is nil,
|
||||||
|
// returning the merged result.
|
||||||
|
//
|
||||||
|
// dst and src must both be "boring" (i.e. not notable per isNotifiableNotify).
|
||||||
|
func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify { |
||||||
|
if dst == nil { |
||||||
|
dst = &ipn.Notify{Version: src.Version} |
||||||
|
} |
||||||
|
if src.NetMap != nil { |
||||||
|
dst.NetMap = src.NetMap |
||||||
|
} |
||||||
|
if src.Engine != nil { |
||||||
|
dst.Engine = src.Engine |
||||||
|
} |
||||||
|
return dst |
||||||
|
} |
||||||
|
|
||||||
|
// isNotableNotify reports whether n is a "notable" notification that
|
||||||
|
// should be sent on the IPN bus immediately (e.g. to GUIs) without
|
||||||
|
// rate limiting it for a few seconds.
|
||||||
|
//
|
||||||
|
// It effectively reports whether n contains any field set that's
|
||||||
|
// not NetMap or Engine.
|
||||||
|
func isNotableNotify(n *ipn.Notify) bool { |
||||||
|
if n == nil { |
||||||
|
return false |
||||||
|
} |
||||||
|
return n.State != nil || |
||||||
|
n.SessionID != "" || |
||||||
|
n.BackendLogID != nil || |
||||||
|
n.BrowseToURL != nil || |
||||||
|
n.LocalTCPPort != nil || |
||||||
|
n.ClientVersion != nil || |
||||||
|
n.Prefs != nil || |
||||||
|
n.ErrMessage != nil || |
||||||
|
n.LoginFinished != nil || |
||||||
|
!n.DriveShares.IsNil() || |
||||||
|
n.Health != nil || |
||||||
|
len(n.IncomingFiles) > 0 || |
||||||
|
len(n.OutgoingFiles) > 0 || |
||||||
|
n.FilesWaiting != nil |
||||||
|
} |
||||||
@ -0,0 +1,220 @@ |
|||||||
|
// Copyright (c) Tailscale Inc & AUTHORS
|
||||||
|
// SPDX-License-Identifier: BSD-3-Clause
|
||||||
|
|
||||||
|
package ipnlocal |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"reflect" |
||||||
|
"slices" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
|
||||||
|
"tailscale.com/drive" |
||||||
|
"tailscale.com/ipn" |
||||||
|
"tailscale.com/tstest" |
||||||
|
"tailscale.com/tstime" |
||||||
|
"tailscale.com/types/logger" |
||||||
|
"tailscale.com/types/netmap" |
||||||
|
"tailscale.com/types/views" |
||||||
|
) |
||||||
|
|
||||||
|
func TestIsNotableNotify(t *testing.T) { |
||||||
|
tests := []struct { |
||||||
|
name string |
||||||
|
notify *ipn.Notify |
||||||
|
want bool |
||||||
|
}{ |
||||||
|
{"nil", nil, false}, |
||||||
|
{"empty", &ipn.Notify{}, false}, |
||||||
|
{"version", &ipn.Notify{Version: "foo"}, false}, |
||||||
|
{"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false}, |
||||||
|
{"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false}, |
||||||
|
} |
||||||
|
|
||||||
|
// Then for all other fields, assume they're notable.
|
||||||
|
// We use reflect to catch fields that might be added in the future without
|
||||||
|
// remembering to update the [isNotableNotify] function.
|
||||||
|
rt := reflect.TypeFor[ipn.Notify]() |
||||||
|
for i := range rt.NumField() { |
||||||
|
n := &ipn.Notify{} |
||||||
|
sf := rt.Field(i) |
||||||
|
switch sf.Name { |
||||||
|
case "_", "NetMap", "Engine", "Version": |
||||||
|
// Already covered above or not applicable.
|
||||||
|
continue |
||||||
|
case "DriveShares": |
||||||
|
n.DriveShares = views.SliceOfViews[*drive.Share, drive.ShareView](make([]*drive.Share, 1)) |
||||||
|
default: |
||||||
|
rf := reflect.ValueOf(n).Elem().Field(i) |
||||||
|
switch rf.Kind() { |
||||||
|
case reflect.Pointer: |
||||||
|
rf.Set(reflect.New(rf.Type().Elem())) |
||||||
|
case reflect.String: |
||||||
|
rf.SetString("foo") |
||||||
|
case reflect.Slice: |
||||||
|
rf.Set(reflect.MakeSlice(rf.Type(), 1, 1)) |
||||||
|
default: |
||||||
|
t.Errorf("unhandled field kind %v for %q", rf.Kind(), sf.Name) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
tests = append(tests, struct { |
||||||
|
name string |
||||||
|
notify *ipn.Notify |
||||||
|
want bool |
||||||
|
}{ |
||||||
|
name: "field-" + rt.Field(i).Name, |
||||||
|
notify: n, |
||||||
|
want: true, |
||||||
|
}) |
||||||
|
} |
||||||
|
|
||||||
|
for _, tt := range tests { |
||||||
|
if got := isNotableNotify(tt.notify); got != tt.want { |
||||||
|
t.Errorf("%v: got %v; want %v", tt.name, got, tt.want) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
type rateLimitingBusSenderTester struct { |
||||||
|
tb testing.TB |
||||||
|
got []*ipn.Notify |
||||||
|
clock *tstest.Clock |
||||||
|
s *rateLimitingBusSender |
||||||
|
} |
||||||
|
|
||||||
|
func (st *rateLimitingBusSenderTester) init() { |
||||||
|
if st.s != nil { |
||||||
|
return |
||||||
|
} |
||||||
|
st.clock = tstest.NewClock(tstest.ClockOpts{ |
||||||
|
Start: time.Unix(1731777537, 0), // time I wrote this test :)
|
||||||
|
}) |
||||||
|
st.s = &rateLimitingBusSender{ |
||||||
|
clock: tstime.DefaultClock{Clock: st.clock}, |
||||||
|
fn: func(n *ipn.Notify) bool { |
||||||
|
st.got = append(st.got, n) |
||||||
|
return true |
||||||
|
}, |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func (st *rateLimitingBusSenderTester) send(n *ipn.Notify) { |
||||||
|
st.tb.Helper() |
||||||
|
st.init() |
||||||
|
if !st.s.send(n) { |
||||||
|
st.tb.Fatal("unexpected send failed") |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func (st *rateLimitingBusSenderTester) advance(d time.Duration) { |
||||||
|
st.tb.Helper() |
||||||
|
st.clock.Advance(d) |
||||||
|
select { |
||||||
|
case <-st.s.flushChan(): |
||||||
|
if !st.s.flush() { |
||||||
|
st.tb.Fatal("unexpected flush failed") |
||||||
|
} |
||||||
|
default: |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestRateLimitingBusSender(t *testing.T) { |
||||||
|
nm1 := &ipn.Notify{NetMap: new(netmap.NetworkMap)} |
||||||
|
nm2 := &ipn.Notify{NetMap: new(netmap.NetworkMap)} |
||||||
|
eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)} |
||||||
|
eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)} |
||||||
|
|
||||||
|
t.Run("unbuffered", func(t *testing.T) { |
||||||
|
st := &rateLimitingBusSenderTester{tb: t} |
||||||
|
st.send(nm1) |
||||||
|
st.send(nm2) |
||||||
|
st.send(eng1) |
||||||
|
st.send(eng2) |
||||||
|
if !slices.Equal(st.got, []*ipn.Notify{nm1, nm2, eng1, eng2}) { |
||||||
|
t.Errorf("got %d items; want 4 specific ones, unmodified", len(st.got)) |
||||||
|
} |
||||||
|
}) |
||||||
|
|
||||||
|
t.Run("buffered", func(t *testing.T) { |
||||||
|
st := &rateLimitingBusSenderTester{tb: t} |
||||||
|
st.init() |
||||||
|
st.s.interval = 1 * time.Second |
||||||
|
st.send(&ipn.Notify{Version: "initial"}) |
||||||
|
if len(st.got) != 1 { |
||||||
|
t.Fatalf("got %d items; expected 1 (first to flush immediately)", len(st.got)) |
||||||
|
} |
||||||
|
st.send(nm1) |
||||||
|
st.send(nm2) |
||||||
|
st.send(eng1) |
||||||
|
st.send(eng2) |
||||||
|
if len(st.got) != 1 { |
||||||
|
if len(st.got) != 1 { |
||||||
|
t.Fatalf("got %d items; expected still just that first 1", len(st.got)) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// But moving the clock should flush the rest, collasced into one new one.
|
||||||
|
st.advance(5 * time.Second) |
||||||
|
if len(st.got) != 2 { |
||||||
|
t.Fatalf("got %d items; want 2", len(st.got)) |
||||||
|
} |
||||||
|
gotn := st.got[1] |
||||||
|
if gotn.NetMap != nm2.NetMap { |
||||||
|
t.Errorf("got wrong NetMap; got %p", gotn.NetMap) |
||||||
|
} |
||||||
|
if gotn.Engine != eng2.Engine { |
||||||
|
t.Errorf("got wrong Engine; got %p", gotn.Engine) |
||||||
|
} |
||||||
|
if t.Failed() { |
||||||
|
t.Logf("failed Notify was: %v", logger.AsJSON(gotn)) |
||||||
|
} |
||||||
|
}) |
||||||
|
|
||||||
|
// Test the Run method
|
||||||
|
t.Run("run", func(t *testing.T) { |
||||||
|
st := &rateLimitingBusSenderTester{tb: t} |
||||||
|
st.init() |
||||||
|
st.s.interval = 1 * time.Second |
||||||
|
st.s.lastFlush = st.clock.Now() // pretend we just flushed
|
||||||
|
|
||||||
|
flushc := make(chan *ipn.Notify, 1) |
||||||
|
st.s.fn = func(n *ipn.Notify) bool { |
||||||
|
flushc <- n |
||||||
|
return true |
||||||
|
} |
||||||
|
didSend := make(chan bool, 2) |
||||||
|
st.s.didSendTestHook = func() { didSend <- true } |
||||||
|
waitSend := func() { |
||||||
|
select { |
||||||
|
case <-didSend: |
||||||
|
case <-time.After(5 * time.Second): |
||||||
|
t.Error("timeout waiting for call to send") |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background()) |
||||||
|
defer cancel() |
||||||
|
|
||||||
|
incoming := make(chan *ipn.Notify, 2) |
||||||
|
go func() { |
||||||
|
incoming <- nm1 |
||||||
|
waitSend() |
||||||
|
incoming <- nm2 |
||||||
|
waitSend() |
||||||
|
st.advance(5 * time.Second) |
||||||
|
select { |
||||||
|
case n := <-flushc: |
||||||
|
if n.NetMap != nm2.NetMap { |
||||||
|
t.Errorf("got wrong NetMap; got %p", n.NetMap) |
||||||
|
} |
||||||
|
case <-time.After(10 * time.Second): |
||||||
|
t.Error("timeout") |
||||||
|
} |
||||||
|
cancel() |
||||||
|
}() |
||||||
|
|
||||||
|
st.s.Run(ctx, incoming) |
||||||
|
}) |
||||||
|
} |
||||||
Loading…
Reference in new issue