diff --git a/util/eventbus/client.go b/util/eventbus/client.go index f405146ce..e2806ffd8 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -169,7 +169,11 @@ func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] { r := c.subscribeStateLocked() s := newSubscriberFunc[T](r, f, logfForCaller(c.logger())) - r.addSubscriber(s) + // Register the non-generic core, not the typed facade. Doing + // so means the bus's outputs map and the subscriber interface + // itab are not parameterized by T, eliminating per-T itab and + // dictionary cost. + r.addSubscriber(s.core) return s } diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index d53bce8d4..aa3fe5c7b 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -276,22 +276,74 @@ func (s *Subscriber[T]) Close() { // A SubscriberFunc delivers one type of event from a [Client]. // Events are forwarded synchronously to a function provided at construction. type SubscriberFunc[T any] struct { + // Implementation note: SubscriberFunc[T] is a thin facade over a + // non-generic *subscriberFuncCore. All of the behavior — the + // subscriber-interface implementation (Close, subscribeType, dispatch), the + // slow-subscriber timer, the type assertion, and the user callback + // invocation — lives on the core and is not instantiated per T. The only + // per-T cost is the small forwarding Close method below. + core *subscriberFuncCore +} + +// subscriberFuncCore is the non-generic implementation of a +// SubscriberFunc. It implements the package-private subscriber +// interface so that the bus (and the subscribeState map) can store +// it without per-T itabs or dictionaries. +type subscriberFuncCore struct { stop stopFlag - read func(T) unregister func() logf logger.Logf slow *time.Timer // used to detect slow subscriber service + + // typ is the cached reflect.Type of T. Returned by + // subscribeType() and used by the dispatch closure to format + // slow-subscriber log messages. + typ reflect.Type + + // dispatchFn is the per-T dispatch closure. It performs the + // type assertion vals.Peek().Event.(T) and runs the user + // callback on the unboxed value. The closure body is + // non-generic; its only per-T contribution is the type + // assertion and the call through s.read(T), which sit inside + // a single small captured closure rather than across a full + // select-loop stencil per T. + dispatchFn func( + ctx context.Context, + vals *queue[DeliveredEvent], + acceptCh func() chan DeliveredEvent, + snapshot chan chan []DeliveredEvent, + ) bool } func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - return &SubscriberFunc[T]{ - read: f, - unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, - logf: logf, - slow: slow, + core := &subscriberFuncCore{ + logf: logf, + slow: slow, + typ: reflect.TypeFor[T](), } + core.unregister = func() { r.deleteSubscriber(core.typ) } + core.dispatchFn = func( + ctx context.Context, + vals *queue[DeliveredEvent], + acceptCh func() chan DeliveredEvent, + snapshot chan chan []DeliveredEvent, + ) bool { + t := vals.Peek().Event.(T) + callDone := make(chan struct{}) + // `go runFuncCallback(f, t, callDone)` binds its arguments + // directly to the new goroutine's frame; using a closure + // (`go func() { f(t) }()`) would allocate a closure on the + // heap on every dispatched event. + go runFuncCallback(f, t, callDone) + return dispatchFunc(ctx, dispatchFuncState{ + slow: core.slow, + logf: core.logf, + typeName: core.typ.String(), + }, vals, acceptCh, snapshot, callDone) + } + return &SubscriberFunc[T]{core: core} } // Close closes the SubscriberFunc, indicating the caller no longer wishes to @@ -300,38 +352,27 @@ func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *S // // If the [Bus] from which s was created is closed, s is implicitly closed and // does not need to be closed separately. -func (s *SubscriberFunc[T]) Close() { s.stop.Stop(); s.unregister() } +func (s *SubscriberFunc[T]) Close() { s.core.Close() } -// subscribeType implements part of the subscriber interface. -func (s *SubscriberFunc[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() } +// Close implements the subscriber interface and the user-facing +// (*SubscriberFunc[T]).Close. +func (c *subscriberFuncCore) Close() { + c.stop.Stop() + c.unregister() +} -// dispatch implements part of the subscriber interface. -// -// We deliberately keep this method body small and delegate the -// dispatch loop to dispatchFunc, a non-generic helper. Each -// instantiation of SubscriberFunc[T] otherwise produces a fresh -// stencil of the full ~40-line select loop (including the slow- -// subscriber timer, snapshot handling, and CI stack-dump branch), -// which is responsible for hundreds of bytes of binary size per -// distinct T. By isolating the per-T work to the type assertion -// and the callback closure, only a small fixed-size wrapper is -// emitted per T. -func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { - t := vals.Peek().Event.(T) - callDone := make(chan struct{}) - // Launch the user callback on a goroutine via a generic worker - // rather than via a closure (`go func() { s.read(t) }()` would - // allocate the closure object on the heap on every dispatched - // event). `go runFuncCallback(s.read, t, callDone)` binds its - // arguments directly to the goroutine's frame, leaving allocation - // behavior identical to the original (*SubscriberFunc[T]).runCallback - // method. - go runFuncCallback(s.read, t, callDone) - return dispatchFunc(ctx, dispatchFuncState{ - slow: s.slow, - logf: s.logf, - typeName: reflect.TypeFor[T]().String(), - }, vals, acceptCh, snapshot, callDone) +// subscribeType implements the subscriber interface. +func (c *subscriberFuncCore) subscribeType() reflect.Type { return c.typ } + +// dispatch implements the subscriber interface by invoking the +// per-T dispatch closure that was captured at construction time. +func (c *subscriberFuncCore) dispatch( + ctx context.Context, + vals *queue[DeliveredEvent], + acceptCh func() chan DeliveredEvent, + snapshot chan chan []DeliveredEvent, +) bool { + return c.dispatchFn(ctx, vals, acceptCh, snapshot) } // dispatchFuncState is the non-generic state needed by dispatchFunc.