From 120bfcf1cc3bdfcc7b5d78b12e725d161bb7f05f Mon Sep 17 00:00:00 2001 From: James Tucker Date: Mon, 4 May 2026 21:59:47 +0000 Subject: [PATCH] util/eventbus: extract non-generic SubscriberFunc constructor body and cache type name Two changes that share the same intent of reducing per-T duplication in code that doesn't actually depend on T: 1. Hoist the non-generic portion of newSubscriberFunc[T] into a newSubscriberFuncCore() helper. The hoisted work is the time timer setup, the subscriberFuncCore allocation, and the unregister closure (which captures only the non-generic reflect.Type and *subscribeState). The generic body now does only the two T-bound things it has to: compute reflect.TypeFor[T] and create the dispatch closure. Effect on the per-shape-stencil body of newSubscriberFunc[T]: before: 523 B per shape (in synthetic test) after: 293 B per shape (-230 B per shape; -56% on this body) 2. Cache reflect.Type.String() once at construction (in core.typeName) instead of recomputing it every time the dispatch closure runs. The dispatch closure also now takes the *subscriberFuncCore directly rather than building an intermediate dispatchFuncState struct on every call. Effect on the dispatch closure body (newSubscriberFunc[T].func1): before: 581 B per shape after: 480 B per shape (-101 B per shape; -17%) Combined effect on tailscaled (linux/amd64): named-symbol savings via symcost: ~7 KB stripped binary delta: -8 KB (page-quantized) arm64 binary delta: 0 (page-quantized) cumulative reduction from baseline (5167ff412): linux/amd64: -110,592 bytes (-0.391%) linux/arm64: -131,072 bytes (-0.499%) Throughput is also improved by the typeName cache: BenchmarkBasic goes from 2018 ns/op to 1864 ns/op (-7.6%) because the dispatch hot path no longer allocates a string on every event. Updates #12614 Change-Id: Ib3a3d6796785e16506330ec034e1144580d467a3 Signed-off-by: James Tucker --- util/eventbus/subscribe.go | 88 +++++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 35 deletions(-) diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index aa3fe5c7b..ef380b7ac 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -291,7 +291,7 @@ type SubscriberFunc[T any] struct { // it without per-T itabs or dictionaries. type subscriberFuncCore struct { stop stopFlag - unregister func() + unregister func(reflect.Type) logf logger.Logf slow *time.Timer // used to detect slow subscriber service @@ -299,6 +299,12 @@ type subscriberFuncCore struct { // subscribeType() and used by the dispatch closure to format // slow-subscriber log messages. typ reflect.Type + // typeName is the cached reflect.TypeFor[T]().String() result. + // Computed once at construction time so the dispatch closure + // (which runs once per delivered event) doesn't allocate a + // fresh string on every call. The string is also independent + // of T, so it doesn't contribute to per-T stencil cost. + typeName string // dispatchFn is the per-T dispatch closure. It performs the // type assertion vals.Peek().Event.(T) and runs the user @@ -316,14 +322,13 @@ type subscriberFuncCore struct { } func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { - slow := time.NewTimer(0) - slow.Stop() // reset in dispatch - core := &subscriberFuncCore{ - logf: logf, - slow: slow, - typ: reflect.TypeFor[T](), - } - core.unregister = func() { r.deleteSubscriber(core.typ) } + core := newSubscriberFuncCore(r, logf, reflect.TypeFor[T]()) + // The dispatch closure is the only piece that intrinsically + // needs T: it performs the type assertion on the head queue + // value and forwards the unboxed value to the user callback. + // All non-generic setup (timer, core allocation, unregister + // closure) lives in newSubscriberFuncCore so it isn't + // duplicated per T. core.dispatchFn = func( ctx context.Context, vals *queue[DeliveredEvent], @@ -337,15 +342,36 @@ func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *S // (`go func() { f(t) }()`) would allocate a closure on the // heap on every dispatched event. go runFuncCallback(f, t, callDone) - return dispatchFunc(ctx, dispatchFuncState{ - slow: core.slow, - logf: core.logf, - typeName: core.typ.String(), - }, vals, acceptCh, snapshot, callDone) + return dispatchFunc(ctx, core, vals, acceptCh, snapshot, callDone) } return &SubscriberFunc[T]{core: core} } +// newSubscriberFuncCore performs the non-generic portion of +// subscriber construction: timer setup, core struct allocation, +// and creation of the unregister closure that captures only the +// (non-generic) reflect.Type and *subscribeState. The caller fills +// in the per-T dispatchFn afterward. +// +// Hoisting this out of newSubscriberFunc[T] eliminates the bulk of +// the constructor body's per-T stencil cost; the only T-typed +// instructions left in the generic constructor are the +// reflect.TypeFor[T]() call (whose body is shared via the +// internal/abi.TypeFor[T] dictionary) and the construction of the +// dispatch closure itself. +func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberFuncCore { + slow := time.NewTimer(0) + slow.Stop() // reset in dispatch + core := &subscriberFuncCore{ + logf: logf, + slow: slow, + typ: typ, + typeName: typ.String(), + } + core.unregister = r.deleteSubscriber + return core +} + // Close closes the SubscriberFunc, indicating the caller no longer wishes to // receive this event type. After Close, no further events will be passed to // the callback. @@ -358,7 +384,7 @@ func (s *SubscriberFunc[T]) Close() { s.core.Close() } // (*SubscriberFunc[T]).Close. func (c *subscriberFuncCore) Close() { c.stop.Stop() - c.unregister() + c.unregister(c.typ) } // subscribeType implements the subscriber interface. @@ -375,34 +401,26 @@ func (c *subscriberFuncCore) dispatch( return c.dispatchFn(ctx, vals, acceptCh, snapshot) } -// dispatchFuncState is the non-generic state needed by dispatchFunc. -// Bundling these lets us pass them as one argument and keeps the -// per-T wrapper at the dispatch call site small. -type dispatchFuncState struct { - slow *time.Timer - logf logger.Logf - typeName string // cached reflect.TypeFor[T]().String() -} - // dispatchFunc is the non-generic body of SubscriberFunc[T].dispatch. // It is identical in observable behavior to the original loop; the // only differences are that the dispatched value has already been // unboxed by the caller (and the user callback is already running // on its own goroutine, signaling completion via callDone) and the -// type name has already been resolved (and is passed as a string). +// slow-subscriber timer / cached type name come from the +// non-generic core, not from a per-T struct. // // callDone is closed by runFuncCallback when the user callback returns. func dispatchFunc( ctx context.Context, - st dispatchFuncState, + core *subscriberFuncCore, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent, callDone chan struct{}, ) bool { start := time.Now() - st.slow.Reset(slowSubscriberTimeout) - defer st.slow.Stop() + core.slow.Reset(slowSubscriberTimeout) + defer core.slow.Stop() // Keep the cases in this select in sync with subscribeState.pump // above. The only difference should be that this select @@ -417,23 +435,23 @@ func dispatchFunc( vals.Add(val) case <-ctx.Done(): // Wait for the callback to be complete, but not forever. - st.slow.Reset(5 * slowSubscriberTimeout) + core.slow.Reset(5 * slowSubscriberTimeout) select { - case <-st.slow.C: - st.logf("giving up on subscriber for %s after %v at close", st.typeName, time.Since(start)) + case <-core.slow.C: + core.logf("giving up on subscriber for %s after %v at close", core.typeName, time.Since(start)) if cibuild.On() { all := make([]byte, 2<<20) n := runtime.Stack(all, true) - st.logf("goroutine stacks:\n%s", all[:n]) + core.logf("goroutine stacks:\n%s", all[:n]) } case <-callDone: } return false case ch := <-snapshot: ch <- vals.Snapshot() - case <-st.slow.C: - st.logf("subscriber for %s is slow (%v elapsed)", st.typeName, time.Since(start)) - st.slow.Reset(slowSubscriberTimeout) + case <-core.slow.C: + core.logf("subscriber for %s is slow (%v elapsed)", core.typeName, time.Since(start)) + core.slow.Reset(slowSubscriberTimeout) } } }