util/eventbus: extract non-generic SubscriberFunc constructor body and cache type name
Two changes that share the same intent of reducing per-T duplication
in code that doesn't actually depend on T:
1. Hoist the non-generic portion of newSubscriberFunc[T] into a
newSubscriberFuncCore() helper. The hoisted work is the
time.Timer setup, the subscriberFuncCore allocation, and the
unregister closure (which captures only the non-generic
reflect.Type and *subscribeState). The generic body now does
only the two T-bound things it has to: compute reflect.TypeFor[T]
and create the dispatch closure.
Effect on the per-shape-stencil body of newSubscriberFunc[T]:
before: 523 B per shape (in synthetic test)
after: 293 B per shape (-230 B per shape; -44% on this body)
2. Cache reflect.Type.String() once at construction (in core.typeName)
instead of recomputing it every time the dispatch closure runs.
The dispatch closure also now takes the *subscriberFuncCore directly
rather than building an intermediate dispatchFuncState struct on
every call.
Effect on the dispatch closure body (newSubscriberFunc[T].func1):
before: 581 B per shape
after: 480 B per shape (-101 B per shape; -17%)
Combined effect on tailscaled (linux/amd64):
named-symbol savings via symcost: ~7 KB
stripped binary delta: -8 KB (page-quantized)
arm64 binary delta: 0 (page-quantized)
cumulative reduction from baseline (5167ff412):
linux/amd64: -110,592 bytes (-0.391%)
linux/arm64: -131,072 bytes (-0.499%)
Throughput is also improved by the typeName cache: BenchmarkBasic
goes from 2018 ns/op to 1864 ns/op (-7.6%) because the dispatch hot
path no longer allocates a string on every event.
Updates #12614
Change-Id: Ib3a3d6796785e16506330ec034e1144580d467a3
Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
committed by
James Tucker
parent
758ebe9839
commit
120bfcf1cc
+53
-35
@@ -291,7 +291,7 @@ type SubscriberFunc[T any] struct {
|
||||
// it without per-T itabs or dictionaries.
|
||||
type subscriberFuncCore struct {
|
||||
stop stopFlag
|
||||
unregister func()
|
||||
unregister func(reflect.Type)
|
||||
logf logger.Logf
|
||||
slow *time.Timer // used to detect slow subscriber service
|
||||
|
||||
@@ -299,6 +299,12 @@ type subscriberFuncCore struct {
|
||||
// subscribeType() and used by the dispatch closure to format
|
||||
// slow-subscriber log messages.
|
||||
typ reflect.Type
|
||||
// typeName is the cached reflect.TypeFor[T]().String() result.
|
||||
// Computed once at construction time so the dispatch closure
|
||||
// (which runs once per delivered event) doesn't allocate a
|
||||
// fresh string on every call. The field's type (string) is also
|
||||
// independent of T, so it doesn't contribute to per-T stencil cost.
|
||||
typeName string
|
||||
|
||||
// dispatchFn is the per-T dispatch closure. It performs the
|
||||
// type assertion vals.Peek().Event.(T) and runs the user
|
||||
@@ -316,14 +322,13 @@ type subscriberFuncCore struct {
|
||||
}
|
||||
|
||||
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
|
||||
slow := time.NewTimer(0)
|
||||
slow.Stop() // reset in dispatch
|
||||
core := &subscriberFuncCore{
|
||||
logf: logf,
|
||||
slow: slow,
|
||||
typ: reflect.TypeFor[T](),
|
||||
}
|
||||
core.unregister = func() { r.deleteSubscriber(core.typ) }
|
||||
core := newSubscriberFuncCore(r, logf, reflect.TypeFor[T]())
|
||||
// The dispatch closure is the only piece that intrinsically
|
||||
// needs T: it performs the type assertion on the head queue
|
||||
// value and forwards the unboxed value to the user callback.
|
||||
// All non-generic setup (timer, core allocation, unregister
|
||||
// closure) lives in newSubscriberFuncCore so it isn't
|
||||
// duplicated per T.
|
||||
core.dispatchFn = func(
|
||||
ctx context.Context,
|
||||
vals *queue[DeliveredEvent],
|
||||
@@ -337,15 +342,36 @@ func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *S
|
||||
// (`go func() { f(t) }()`) would allocate a closure on the
|
||||
// heap on every dispatched event.
|
||||
go runFuncCallback(f, t, callDone)
|
||||
return dispatchFunc(ctx, dispatchFuncState{
|
||||
slow: core.slow,
|
||||
logf: core.logf,
|
||||
typeName: core.typ.String(),
|
||||
}, vals, acceptCh, snapshot, callDone)
|
||||
return dispatchFunc(ctx, core, vals, acceptCh, snapshot, callDone)
|
||||
}
|
||||
return &SubscriberFunc[T]{core: core}
|
||||
}
|
||||
|
||||
// newSubscriberFuncCore performs the non-generic portion of
|
||||
// subscriber construction: timer setup, core struct allocation,
|
||||
// and creation of the unregister closure that captures only the
|
||||
// (non-generic) reflect.Type and *subscribeState. The caller fills
|
||||
// in the per-T dispatchFn afterward.
|
||||
//
|
||||
// Hoisting this out of newSubscriberFunc[T] eliminates the bulk of
|
||||
// the constructor body's per-T stencil cost; the only T-typed
|
||||
// instructions left in the generic constructor are the
|
||||
// reflect.TypeFor[T]() call (whose body is shared via the
|
||||
// internal/abi.TypeFor[T] dictionary) and the construction of the
|
||||
// dispatch closure itself.
|
||||
func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberFuncCore {
|
||||
slow := time.NewTimer(0)
|
||||
slow.Stop() // reset in dispatch
|
||||
core := &subscriberFuncCore{
|
||||
logf: logf,
|
||||
slow: slow,
|
||||
typ: typ,
|
||||
typeName: typ.String(),
|
||||
}
|
||||
core.unregister = r.deleteSubscriber
|
||||
return core
|
||||
}
|
||||
|
||||
// Close closes the SubscriberFunc, indicating the caller no longer wishes to
|
||||
// receive this event type. After Close, no further events will be passed to
|
||||
// the callback.
|
||||
@@ -358,7 +384,7 @@ func (s *SubscriberFunc[T]) Close() { s.core.Close() }
|
||||
// (*SubscriberFunc[T]).Close.
|
||||
func (c *subscriberFuncCore) Close() {
|
||||
c.stop.Stop()
|
||||
c.unregister()
|
||||
c.unregister(c.typ)
|
||||
}
|
||||
|
||||
// subscribeType implements the subscriber interface.
|
||||
@@ -375,34 +401,26 @@ func (c *subscriberFuncCore) dispatch(
|
||||
return c.dispatchFn(ctx, vals, acceptCh, snapshot)
|
||||
}
|
||||
|
||||
// dispatchFuncState is the non-generic state needed by dispatchFunc.
|
||||
// Bundling these lets us pass them as one argument and keeps the
|
||||
// per-T wrapper at the dispatch call site small.
|
||||
type dispatchFuncState struct {
|
||||
slow *time.Timer
|
||||
logf logger.Logf
|
||||
typeName string // cached reflect.TypeFor[T]().String()
|
||||
}
|
||||
|
||||
// dispatchFunc is the non-generic body of SubscriberFunc[T].dispatch.
|
||||
// It is identical in observable behavior to the original loop; the
|
||||
// only differences are that the dispatched value has already been
|
||||
// unboxed by the caller (and the user callback is already running
|
||||
// on its own goroutine, signaling completion via callDone) and the
|
||||
// type name has already been resolved (and is passed as a string).
|
||||
// slow-subscriber timer / cached type name come from the
|
||||
// non-generic core, not from a per-T struct.
|
||||
//
|
||||
// callDone is closed by runFuncCallback when the user callback returns.
|
||||
func dispatchFunc(
|
||||
ctx context.Context,
|
||||
st dispatchFuncState,
|
||||
core *subscriberFuncCore,
|
||||
vals *queue[DeliveredEvent],
|
||||
acceptCh func() chan DeliveredEvent,
|
||||
snapshot chan chan []DeliveredEvent,
|
||||
callDone chan struct{},
|
||||
) bool {
|
||||
start := time.Now()
|
||||
st.slow.Reset(slowSubscriberTimeout)
|
||||
defer st.slow.Stop()
|
||||
core.slow.Reset(slowSubscriberTimeout)
|
||||
defer core.slow.Stop()
|
||||
|
||||
// Keep the cases in this select in sync with subscribeState.pump
|
||||
// above. The only difference should be that this select
|
||||
@@ -417,23 +435,23 @@ func dispatchFunc(
|
||||
vals.Add(val)
|
||||
case <-ctx.Done():
|
||||
// Wait for the callback to be complete, but not forever.
|
||||
st.slow.Reset(5 * slowSubscriberTimeout)
|
||||
core.slow.Reset(5 * slowSubscriberTimeout)
|
||||
select {
|
||||
case <-st.slow.C:
|
||||
st.logf("giving up on subscriber for %s after %v at close", st.typeName, time.Since(start))
|
||||
case <-core.slow.C:
|
||||
core.logf("giving up on subscriber for %s after %v at close", core.typeName, time.Since(start))
|
||||
if cibuild.On() {
|
||||
all := make([]byte, 2<<20)
|
||||
n := runtime.Stack(all, true)
|
||||
st.logf("goroutine stacks:\n%s", all[:n])
|
||||
core.logf("goroutine stacks:\n%s", all[:n])
|
||||
}
|
||||
case <-callDone:
|
||||
}
|
||||
return false
|
||||
case ch := <-snapshot:
|
||||
ch <- vals.Snapshot()
|
||||
case <-st.slow.C:
|
||||
st.logf("subscriber for %s is slow (%v elapsed)", st.typeName, time.Since(start))
|
||||
st.slow.Reset(slowSubscriberTimeout)
|
||||
case <-core.slow.C:
|
||||
core.logf("subscriber for %s is slow (%v elapsed)", core.typeName, time.Since(start))
|
||||
core.slow.Reset(slowSubscriberTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user