diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 0168acdd4..54a841b27 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -146,7 +146,10 @@ func Subscribe[T any](c *Client) *Subscriber[T] { r := c.subscribeStateLocked() s := newSubscriber[T](r, logfForCaller(c.logger())) - r.addSubscriber(s) + // Register the non-generic core with the bus rather than the typed facade, + // mirroring SubscribeFunc and Publish: this keeps the bus's outputs map + // and subscriber-interface itab out of per-T cost. + r.addSubscriber(s.core) return s } diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index ef380b7ac..3c3dced1f 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -184,49 +184,62 @@ func (s *subscribeState) closed() <-chan struct{} { // A Subscriber delivers one type of event from a [Client]. // Events are sent to the [Subscriber.Events] channel. type Subscriber[T any] struct { - stop stopFlag - read chan T - unregister func() - logf logger.Logf - slow *time.Timer // used to detect slow subscriber service + // core holds the non-generic subscriber-interface implementation + // (Close, subscribeType, dispatch, slow timer, unregister) shared + // with [SubscriberFunc] via [subscriberCore]. The only per-T state + // owned by the facade itself is the typed delivery channel; the + // dispatch loop, unlike SubscriberFunc, must remain per-T — see + // [Subscriber.dispatchTyped]. + core *subscriberCore + read chan T } func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] { - slow := time.NewTimer(0) - slow.Stop() // reset in dispatch - return &Subscriber[T]{ - read: make(chan T), - unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, - logf: logf, - slow: slow, + core := newSubscriberCore(r, logf, reflect.TypeFor[T]()) + s := &Subscriber[T]{ + core: core, + read: make(chan T), } -} - -func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { - ret := &Subscriber[T]{ - read: make(chan T, 100), // arbitrary, large + // Subscriber[T] keeps a per-T dispatch loop; see [Subscriber.dispatchTyped] + // for why we don't share the non-generic dispatchFunc that SubscriberFunc + // uses. + core.dispatchFn = func( + ctx context.Context, + vals *queue[DeliveredEvent], + acceptCh func() chan DeliveredEvent, + snapshot chan chan []DeliveredEvent, + ) bool { + return s.dispatchTyped(ctx, vals, acceptCh, snapshot) } - ret.unregister = attach(ret.monitor) - return ret + return s } -func (s *Subscriber[T]) subscribeType() reflect.Type { - return reflect.TypeFor[T]() -} - -func (s *Subscriber[T]) monitor(debugEvent T) { - select { - case s.read <- debugEvent: - case <-s.stop.Done(): - } -} - -func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { +// dispatchTyped is the per-T dispatch loop for Subscriber[T]. It has to remain +// generic because the typed channel send `case s.read <- t:` must appear +// lexically inside the select. The rest of the cases match the non-generic +// dispatchFunc body to keep behavior aligned between Subscriber and +// SubscriberFunc. +// +// We don't share dispatchFunc (the way SubscriberFunc does) because bridging +// the typed channel send and the non-generic select would require running the +// send on its own goroutine on every event delivery. That bridge was measured +// at ~2.7x throughput regression on BenchmarkBasicThroughput, so we keep +// dispatchTyped generic and pay the per-shape stencil cost instead (measured +// at ~1,600 B body + ~1,100 B pclntab per shape on linux/amd64 tailscaled). +// Only the typed select lives in the per-shape stencil; the surrounding state +// (slow timer, log function, type name) is reached through the non-generic +// core. +func (s *Subscriber[T]) dispatchTyped( + ctx context.Context, + vals *queue[DeliveredEvent], + acceptCh func() chan DeliveredEvent, + snapshot chan chan []DeliveredEvent, +) bool { t := vals.Peek().Event.(T) start := time.Now() - s.slow.Reset(slowSubscriberTimeout) - defer s.slow.Stop() + s.core.slow.Reset(slowSubscriberTimeout) + defer s.core.slow.Stop() for { // Keep the cases in this select in sync with subscribeState.pump @@ -242,13 +255,35 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent return false case ch := <-snapshot: ch <- vals.Snapshot() - case <-s.slow.C: - s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start)) - s.slow.Reset(slowSubscriberTimeout) + case <-s.core.slow.C: + s.core.logf("subscriber for %s is slow (%v elapsed)", s.core.typeName, time.Since(start)) + s.core.slow.Reset(slowSubscriberTimeout) } } } +func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { + s := &Subscriber[T]{ + // Monitors don't go through the bus's dispatch path (they + // are attached directly to the debug hook), so they don't + // need a fully-initialized subscriberCore — only the typed + // delivery channel and an unregister callback. We give them + // a placeholder core so Close() and Done() work uniformly. + core: &subscriberCore{}, + read: make(chan T, 100), // arbitrary, large + } + cancel := attach(s.monitor) + s.core.unregister = func(reflect.Type) { cancel() } + return s +} + +func (s *Subscriber[T]) monitor(debugEvent T) { + select { + case s.read <- debugEvent: + case <-s.core.stop.Done(): + } +} + // Events returns a channel on which the subscriber's events are // delivered. func (s *Subscriber[T]) Events() <-chan T { @@ -258,7 +293,7 @@ func (s *Subscriber[T]) Events() <-chan T { // Done returns a channel that is closed when the subscriber is // closed. func (s *Subscriber[T]) Done() <-chan struct{} { - return s.stop.Done() + return s.core.stop.Done() } // Close closes the Subscriber, indicating the caller no longer wishes @@ -268,28 +303,26 @@ func (s *Subscriber[T]) Done() <-chan struct{} { // If the Bus from which the Subscriber was created is closed, // the Subscriber is implicitly closed and does not need to be closed // separately. -func (s *Subscriber[T]) Close() { - s.stop.Stop() // unblock receivers - s.unregister() -} +func (s *Subscriber[T]) Close() { s.core.Close() } // A SubscriberFunc delivers one type of event from a [Client]. // Events are forwarded synchronously to a function provided at construction. type SubscriberFunc[T any] struct { - // Implementation note: SubscriberFunc[T] is a thin facade over a - // non-generic *subscriberFuncCore. All of the behavior — the - // subscriber-interface implementation (Close, subscribeType, dispatch), the - // slow-subscriber timer, the type assertion, and the user callback - // invocation — lives on the core and is not instantiated per T. The only - // per-T cost is the small forwarding Close method below. - core *subscriberFuncCore + // core holds the non-generic subscriber-interface implementation shared + // with [Subscriber] via [subscriberCore]. The user callback is captured + // in the dispatchFn closure on the core, so SubscriberFunc[T] itself + // carries no per-T state beyond the core pointer; per-T cost is limited + // to the small forwarding Close method below. + core *subscriberCore } -// subscriberFuncCore is the non-generic implementation of a -// SubscriberFunc. It implements the package-private subscriber -// interface so that the bus (and the subscribeState map) can store -// it without per-T itabs or dictionaries. -type subscriberFuncCore struct { +// subscriberCore is the non-generic backing for both Subscriber[T] and +// SubscriberFunc[T]. It implements the package-private subscriber interface +// so that the bus (and the subscribeState map) can store it without per-T +// itabs or dictionaries. The per-T behavior (type assertion plus either typed +// channel send or user callback invocation) is encapsulated in the dispatchFn +// closure set up by the constructor of the typed facade. +type subscriberCore struct { stop stopFlag unregister func(reflect.Type) logf logger.Logf @@ -306,13 +339,13 @@ type subscriberFuncCore struct { // of T, so it doesn't contribute to per-T stencil cost. typeName string - // dispatchFn is the per-T dispatch closure. It performs the - // type assertion vals.Peek().Event.(T) and runs the user - // callback on the unboxed value. The closure body is - // non-generic; its only per-T contribution is the type - // assertion and the call through s.read(T), which sit inside - // a single small captured closure rather than across a full - // select-loop stencil per T. + // dispatchFn is the per-T dispatch closure. It performs the type + // assertion vals.Peek().Event.(T) and runs the typed delivery (either a + // user-callback invocation for SubscriberFunc[T] or a typed channel send + // for Subscriber[T]). The closure body is non-generic apart from those + // two T-bound operations; the bulk of the dispatch work happens in the + // non-generic dispatchFunc helper (used by SubscriberFunc) or in the + // Subscriber[T].dispatchTyped per-shape stencil. dispatchFn func( ctx context.Context, vals *queue[DeliveredEvent], @@ -322,12 +355,12 @@ type subscriberFuncCore struct { } func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { - core := newSubscriberFuncCore(r, logf, reflect.TypeFor[T]()) + core := newSubscriberCore(r, logf, reflect.TypeFor[T]()) // The dispatch closure is the only piece that intrinsically // needs T: it performs the type assertion on the head queue // value and forwards the unboxed value to the user callback. // All non-generic setup (timer, core allocation, unregister - // closure) lives in newSubscriberFuncCore so it isn't + // closure) lives in newSubscriberCore so it isn't // duplicated per T. core.dispatchFn = func( ctx context.Context, @@ -347,22 +380,21 @@ func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *S return &SubscriberFunc[T]{core: core} } -// newSubscriberFuncCore performs the non-generic portion of -// subscriber construction: timer setup, core struct allocation, -// and creation of the unregister closure that captures only the -// (non-generic) reflect.Type and *subscribeState. The caller fills -// in the per-T dispatchFn afterward. +// newSubscriberCore performs the non-generic portion of subscriber +// construction: timer setup, core struct allocation, and assignment of the +// unregister method-value. The caller fills in the per-T dispatchFn +// afterward. // -// Hoisting this out of newSubscriberFunc[T] eliminates the bulk of -// the constructor body's per-T stencil cost; the only T-typed -// instructions left in the generic constructor are the +// Hoisting this out of the typed constructors (newSubscriber[T] and +// newSubscriberFunc[T]) eliminates the bulk of their per-T stencil cost; the +// only T-typed instructions left in each generic constructor are the // reflect.TypeFor[T]() call (whose body is shared via the -// internal/abi.TypeFor[T] dictionary) and the construction of the -// dispatch closure itself. -func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberFuncCore { +// internal/abi.TypeFor[T] dictionary) and the construction of the dispatch +// closure itself. +func newSubscriberCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberCore { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - core := &subscriberFuncCore{ + core := &subscriberCore{ logf: logf, slow: slow, typ: typ, @@ -380,19 +412,19 @@ func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type // does not need to be closed separately. func (s *SubscriberFunc[T]) Close() { s.core.Close() } -// Close implements the subscriber interface and the user-facing -// (*SubscriberFunc[T]).Close. -func (c *subscriberFuncCore) Close() { +// Close implements the subscriber interface and the user-facing Close on +// both Subscriber[T] and SubscriberFunc[T]. +func (c *subscriberCore) Close() { c.stop.Stop() c.unregister(c.typ) } // subscribeType implements the subscriber interface. -func (c *subscriberFuncCore) subscribeType() reflect.Type { return c.typ } +func (c *subscriberCore) subscribeType() reflect.Type { return c.typ } // dispatch implements the subscriber interface by invoking the // per-T dispatch closure that was captured at construction time. -func (c *subscriberFuncCore) dispatch( +func (c *subscriberCore) dispatch( ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, @@ -412,7 +444,7 @@ func (c *subscriberFuncCore) dispatch( // callDone is closed by runFuncCallback when the user callback returns. func dispatchFunc( ctx context.Context, - core *subscriberFuncCore, + core *subscriberCore, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent,