util/eventbus: unify Subscriber/SubscriberFunc cores; structural symmetry
Brings Subscriber[T] in line with the same non-generic-core pattern already
applied to SubscriberFunc[T] and Publisher[T]:
- Renames subscriberFuncCore to subscriberCore and shares it between
Subscriber[T] and SubscriberFunc[T]. Both typed facades hold a
*subscriberCore plus their respective per-T delivery state
(Subscriber: chan T; SubscriberFunc: nothing, the user callback is
captured in the dispatch closure).
- The bus's outputs map and the subscriber-interface itab now use
*subscriberCore for both subscriber kinds, so adding a new Subscribe[T]
call site no longer pays for a per-T itab, dictionary, or equality function
on the subscriber-interface side.
- Subscribe[T] now hoists the non-generic constructor portion into
newSubscriberCore (timer setup, core allocation, cached type/typeName,
unregister method-value), matching SubscribeFunc.
The dispatch loop is intentionally NOT extracted to a non-generic helper for
Subscriber[T], unlike SubscriberFunc[T]. The reason is that the typed channel
send 'case s.read <- t:' must appear lexically inside the select; the only way
to lift it into a non-generic loop is to bridge typed and untyped via a
per-event goroutine, which was measured at a ~2.7x throughput regression on
BenchmarkBasicThroughput. We keep dispatchTyped on the generic facade and
accept the per-shape stencil cost as the cheaper alternative.
Symbol-level effect on tailscaled (linux/amd64, measured via
`go tool nm -size`):
Before:
(*Subscriber[T]).dispatch
2 shape stencils: 1,682 + 1,549 = 3,231 B
3 thin per-T wrappers: 124 B each = 372 B
2 deferwrap1 helpers: 62 B each = 124 B
total: 3,727 B
After:
(*Subscriber[T]).dispatchTyped
2 shape stencils: 1,678 + 1,582 = 3,260 B
0 per-T wrappers (replaced by closure stored on core)
2 deferwrap1 helpers: 62 B each = 124 B
total: 3,384 B
dispatch path .text delta: -343 B (-9.2%)
Per-shape stencils are ~1,600 B (.text body) + ~1,100 B (pclntab) =
~2,700 B each on production tailscaled. The shape count matches before/after
(two distinct GC shapes for the Subscriber[T] event types in this binary).
What changes is that the per-T thin wrappers are eliminated because
Subscriber[T] no longer implements the subscriber interface directly.
Whole-binary section deltas:
.text: -2,304 B (includes the dispatch savings plus other
small downstream effects)
.rodata: +512 B (additional closure-type metadata)
.gopclntab: -2,981 B (fewer per-T compiled functions => less metadata)
Stripped tailscaled (linux/amd64): no change in file size (the savings
fall below the linker's section-alignment boundary). Unstripped builds shrink
by ~2,900 B.
Behavior is unchanged; benchmark results:
BenchmarkBasicThroughput: 2,161 ns/op, 0 B/op, 0 allocs/op
BenchmarkBasicFuncThroughput: 2,493 ns/op, 144 B/op, 2 allocs/op
BenchmarkSubsThroughput: 3,727 ns/op, 0 B/op, 0 allocs/op
Updates #12614
Change-Id: I97918ec68bd2cdb15958bbfd7687592b39663efe
Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
committed by
James Tucker
parent
dc323b1351
commit
e7415e6393
@@ -146,7 +146,10 @@ func Subscribe[T any](c *Client) *Subscriber[T] {
|
|||||||
|
|
||||||
r := c.subscribeStateLocked()
|
r := c.subscribeStateLocked()
|
||||||
s := newSubscriber[T](r, logfForCaller(c.logger()))
|
s := newSubscriber[T](r, logfForCaller(c.logger()))
|
||||||
r.addSubscriber(s)
|
// Register the non-generic core with the bus rather than the typed facade,
|
||||||
|
// mirroring SubscribeFunc and Publish: this keeps the bus's outputs map
|
||||||
|
// and subscriber-interface itab out of per-T cost.
|
||||||
|
r.addSubscriber(s.core)
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+112
-80
@@ -184,49 +184,62 @@ func (s *subscribeState) closed() <-chan struct{} {
|
|||||||
// A Subscriber delivers one type of event from a [Client].
|
// A Subscriber delivers one type of event from a [Client].
|
||||||
// Events are sent to the [Subscriber.Events] channel.
|
// Events are sent to the [Subscriber.Events] channel.
|
||||||
type Subscriber[T any] struct {
|
type Subscriber[T any] struct {
|
||||||
stop stopFlag
|
// core holds the non-generic subscriber-interface implementation
|
||||||
read chan T
|
// (Close, subscribeType, dispatch, slow timer, unregister) shared
|
||||||
unregister func()
|
// with [SubscriberFunc] via [subscriberCore]. The only per-T state
|
||||||
logf logger.Logf
|
// owned by the facade itself is the typed delivery channel; the
|
||||||
slow *time.Timer // used to detect slow subscriber service
|
// dispatch loop, unlike SubscriberFunc, must remain per-T — see
|
||||||
|
// [Subscriber.dispatchTyped].
|
||||||
|
core *subscriberCore
|
||||||
|
read chan T
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
|
func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
|
||||||
slow := time.NewTimer(0)
|
core := newSubscriberCore(r, logf, reflect.TypeFor[T]())
|
||||||
slow.Stop() // reset in dispatch
|
s := &Subscriber[T]{
|
||||||
return &Subscriber[T]{
|
core: core,
|
||||||
read: make(chan T),
|
read: make(chan T),
|
||||||
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
|
|
||||||
logf: logf,
|
|
||||||
slow: slow,
|
|
||||||
}
|
}
|
||||||
}
|
// Subscriber[T] keeps a per-T dispatch loop; see [Subscriber.dispatchTyped]
|
||||||
|
// for why we don't share the non-generic dispatchFunc that SubscriberFunc
|
||||||
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
|
// uses.
|
||||||
ret := &Subscriber[T]{
|
core.dispatchFn = func(
|
||||||
read: make(chan T, 100), // arbitrary, large
|
ctx context.Context,
|
||||||
|
vals *queue[DeliveredEvent],
|
||||||
|
acceptCh func() chan DeliveredEvent,
|
||||||
|
snapshot chan chan []DeliveredEvent,
|
||||||
|
) bool {
|
||||||
|
return s.dispatchTyped(ctx, vals, acceptCh, snapshot)
|
||||||
}
|
}
|
||||||
ret.unregister = attach(ret.monitor)
|
return s
|
||||||
return ret
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscriber[T]) subscribeType() reflect.Type {
|
// dispatchTyped is the per-T dispatch loop for Subscriber[T]. It has to remain
|
||||||
return reflect.TypeFor[T]()
|
// generic because the typed channel send `case s.read <- t:` must appear
|
||||||
}
|
// lexically inside the select. The rest of the cases match the non-generic
|
||||||
|
// dispatchFunc body to keep behavior aligned between Subscriber and
|
||||||
func (s *Subscriber[T]) monitor(debugEvent T) {
|
// SubscriberFunc.
|
||||||
select {
|
//
|
||||||
case s.read <- debugEvent:
|
// We don't share dispatchFunc (the way SubscriberFunc does) because bridging
|
||||||
case <-s.stop.Done():
|
// the typed channel send and the non-generic select would require running the
|
||||||
}
|
// send on its own goroutine on every event delivery. That bridge was measured
|
||||||
}
|
// at ~2.7x throughput regression on BenchmarkBasicThroughput, so we keep
|
||||||
|
// dispatchTyped generic and pay the per-shape stencil cost instead (measured
|
||||||
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
|
// at ~1,600 B body + ~1,100 B pclntab per shape on linux/amd64 tailscaled).
|
||||||
|
// Only the typed select lives in the per-shape stencil; the surrounding state
|
||||||
|
// (slow timer, log function, type name) is reached through the non-generic
|
||||||
|
// core.
|
||||||
|
func (s *Subscriber[T]) dispatchTyped(
|
||||||
|
ctx context.Context,
|
||||||
|
vals *queue[DeliveredEvent],
|
||||||
|
acceptCh func() chan DeliveredEvent,
|
||||||
|
snapshot chan chan []DeliveredEvent,
|
||||||
|
) bool {
|
||||||
t := vals.Peek().Event.(T)
|
t := vals.Peek().Event.(T)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
s.slow.Reset(slowSubscriberTimeout)
|
s.core.slow.Reset(slowSubscriberTimeout)
|
||||||
defer s.slow.Stop()
|
defer s.core.slow.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Keep the cases in this select in sync with subscribeState.pump
|
// Keep the cases in this select in sync with subscribeState.pump
|
||||||
@@ -242,13 +255,35 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent
|
|||||||
return false
|
return false
|
||||||
case ch := <-snapshot:
|
case ch := <-snapshot:
|
||||||
ch <- vals.Snapshot()
|
ch <- vals.Snapshot()
|
||||||
case <-s.slow.C:
|
case <-s.core.slow.C:
|
||||||
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
|
s.core.logf("subscriber for %s is slow (%v elapsed)", s.core.typeName, time.Since(start))
|
||||||
s.slow.Reset(slowSubscriberTimeout)
|
s.core.slow.Reset(slowSubscriberTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
|
||||||
|
s := &Subscriber[T]{
|
||||||
|
// Monitors don't go through the bus's dispatch path (they
|
||||||
|
// are attached directly to the debug hook), so they don't
|
||||||
|
// need a fully-initialized subscriberCore — only the typed
|
||||||
|
// delivery channel and an unregister callback. We give them
|
||||||
|
// a placeholder core so Close() and Done() work uniformly.
|
||||||
|
core: &subscriberCore{},
|
||||||
|
read: make(chan T, 100), // arbitrary, large
|
||||||
|
}
|
||||||
|
cancel := attach(s.monitor)
|
||||||
|
s.core.unregister = func(reflect.Type) { cancel() }
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Subscriber[T]) monitor(debugEvent T) {
|
||||||
|
select {
|
||||||
|
case s.read <- debugEvent:
|
||||||
|
case <-s.core.stop.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Events returns a channel on which the subscriber's events are
|
// Events returns a channel on which the subscriber's events are
|
||||||
// delivered.
|
// delivered.
|
||||||
func (s *Subscriber[T]) Events() <-chan T {
|
func (s *Subscriber[T]) Events() <-chan T {
|
||||||
@@ -258,7 +293,7 @@ func (s *Subscriber[T]) Events() <-chan T {
|
|||||||
// Done returns a channel that is closed when the subscriber is
|
// Done returns a channel that is closed when the subscriber is
|
||||||
// closed.
|
// closed.
|
||||||
func (s *Subscriber[T]) Done() <-chan struct{} {
|
func (s *Subscriber[T]) Done() <-chan struct{} {
|
||||||
return s.stop.Done()
|
return s.core.stop.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the Subscriber, indicating the caller no longer wishes
|
// Close closes the Subscriber, indicating the caller no longer wishes
|
||||||
@@ -268,28 +303,26 @@ func (s *Subscriber[T]) Done() <-chan struct{} {
|
|||||||
// If the Bus from which the Subscriber was created is closed,
|
// If the Bus from which the Subscriber was created is closed,
|
||||||
// the Subscriber is implicitly closed and does not need to be closed
|
// the Subscriber is implicitly closed and does not need to be closed
|
||||||
// separately.
|
// separately.
|
||||||
func (s *Subscriber[T]) Close() {
|
func (s *Subscriber[T]) Close() { s.core.Close() }
|
||||||
s.stop.Stop() // unblock receivers
|
|
||||||
s.unregister()
|
|
||||||
}
|
|
||||||
|
|
||||||
// A SubscriberFunc delivers one type of event from a [Client].
|
// A SubscriberFunc delivers one type of event from a [Client].
|
||||||
// Events are forwarded synchronously to a function provided at construction.
|
// Events are forwarded synchronously to a function provided at construction.
|
||||||
type SubscriberFunc[T any] struct {
|
type SubscriberFunc[T any] struct {
|
||||||
// Implementation note: SubscriberFunc[T] is a thin facade over a
|
// core holds the non-generic subscriber-interface implementation shared
|
||||||
// non-generic *subscriberFuncCore. All of the behavior — the
|
// with [Subscriber] via [subscriberCore]. The user callback is captured
|
||||||
// subscriber-interface implementation (Close, subscribeType, dispatch), the
|
// in the dispatchFn closure on the core, so SubscriberFunc[T] itself
|
||||||
// slow-subscriber timer, the type assertion, and the user callback
|
// carries no per-T state beyond the core pointer; per-T cost is limited
|
||||||
// invocation — lives on the core and is not instantiated per T. The only
|
// to the small forwarding Close method below.
|
||||||
// per-T cost is the small forwarding Close method below.
|
core *subscriberCore
|
||||||
core *subscriberFuncCore
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscriberFuncCore is the non-generic implementation of a
|
// subscriberCore is the non-generic backing for both Subscriber[T] and
|
||||||
// SubscriberFunc. It implements the package-private subscriber
|
// SubscriberFunc[T]. It implements the package-private subscriber interface
|
||||||
// interface so that the bus (and the subscribeState map) can store
|
// so that the bus (and the subscribeState map) can store it without per-T
|
||||||
// it without per-T itabs or dictionaries.
|
// itabs or dictionaries. The per-T behavior (type assertion plus either typed
|
||||||
type subscriberFuncCore struct {
|
// channel send or user callback invocation) is encapsulated in the dispatchFn
|
||||||
|
// closure set up by the constructor of the typed facade.
|
||||||
|
type subscriberCore struct {
|
||||||
stop stopFlag
|
stop stopFlag
|
||||||
unregister func(reflect.Type)
|
unregister func(reflect.Type)
|
||||||
logf logger.Logf
|
logf logger.Logf
|
||||||
@@ -306,13 +339,13 @@ type subscriberFuncCore struct {
|
|||||||
// of T, so it doesn't contribute to per-T stencil cost.
|
// of T, so it doesn't contribute to per-T stencil cost.
|
||||||
typeName string
|
typeName string
|
||||||
|
|
||||||
// dispatchFn is the per-T dispatch closure. It performs the
|
// dispatchFn is the per-T dispatch closure. It performs the type
|
||||||
// type assertion vals.Peek().Event.(T) and runs the user
|
// assertion vals.Peek().Event.(T) and runs the typed delivery (either a
|
||||||
// callback on the unboxed value. The closure body is
|
// user-callback invocation for SubscriberFunc[T] or a typed channel send
|
||||||
// non-generic; its only per-T contribution is the type
|
// for Subscriber[T]). The closure body is non-generic apart from those
|
||||||
// assertion and the call through s.read(T), which sit inside
|
// two T-bound operations; the bulk of the dispatch work happens in the
|
||||||
// a single small captured closure rather than across a full
|
// non-generic dispatchFunc helper (used by SubscriberFunc) or in the
|
||||||
// select-loop stencil per T.
|
// Subscriber[T].dispatchTyped per-shape stencil.
|
||||||
dispatchFn func(
|
dispatchFn func(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
vals *queue[DeliveredEvent],
|
vals *queue[DeliveredEvent],
|
||||||
@@ -322,12 +355,12 @@ type subscriberFuncCore struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
|
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
|
||||||
core := newSubscriberFuncCore(r, logf, reflect.TypeFor[T]())
|
core := newSubscriberCore(r, logf, reflect.TypeFor[T]())
|
||||||
// The dispatch closure is the only piece that intrinsically
|
// The dispatch closure is the only piece that intrinsically
|
||||||
// needs T: it performs the type assertion on the head queue
|
// needs T: it performs the type assertion on the head queue
|
||||||
// value and forwards the unboxed value to the user callback.
|
// value and forwards the unboxed value to the user callback.
|
||||||
// All non-generic setup (timer, core allocation, unregister
|
// All non-generic setup (timer, core allocation, unregister
|
||||||
// closure) lives in newSubscriberFuncCore so it isn't
|
// closure) lives in newSubscriberCore so it isn't
|
||||||
// duplicated per T.
|
// duplicated per T.
|
||||||
core.dispatchFn = func(
|
core.dispatchFn = func(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@@ -347,22 +380,21 @@ func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *S
|
|||||||
return &SubscriberFunc[T]{core: core}
|
return &SubscriberFunc[T]{core: core}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newSubscriberFuncCore performs the non-generic portion of
|
// newSubscriberCore performs the non-generic portion of subscriber
|
||||||
// subscriber construction: timer setup, core struct allocation,
|
// construction: timer setup, core struct allocation, and assignment of the
|
||||||
// and creation of the unregister closure that captures only the
|
// unregister method-value. The caller fills in the per-T dispatchFn
|
||||||
// (non-generic) reflect.Type and *subscribeState. The caller fills
|
// afterward.
|
||||||
// in the per-T dispatchFn afterward.
|
|
||||||
//
|
//
|
||||||
// Hoisting this out of newSubscriberFunc[T] eliminates the bulk of
|
// Hoisting this out of the typed constructors (newSubscriber[T] and
|
||||||
// the constructor body's per-T stencil cost; the only T-typed
|
// newSubscriberFunc[T]) eliminates the bulk of their per-T stencil cost; the
|
||||||
// instructions left in the generic constructor are the
|
// only T-typed instructions left in each generic constructor are the
|
||||||
// reflect.TypeFor[T]() call (whose body is shared via the
|
// reflect.TypeFor[T]() call (whose body is shared via the
|
||||||
// internal/abi.TypeFor[T] dictionary) and the construction of the
|
// internal/abi.TypeFor[T] dictionary) and the construction of the dispatch
|
||||||
// dispatch closure itself.
|
// closure itself.
|
||||||
func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberFuncCore {
|
func newSubscriberCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberCore {
|
||||||
slow := time.NewTimer(0)
|
slow := time.NewTimer(0)
|
||||||
slow.Stop() // reset in dispatch
|
slow.Stop() // reset in dispatch
|
||||||
core := &subscriberFuncCore{
|
core := &subscriberCore{
|
||||||
logf: logf,
|
logf: logf,
|
||||||
slow: slow,
|
slow: slow,
|
||||||
typ: typ,
|
typ: typ,
|
||||||
@@ -380,19 +412,19 @@ func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type
|
|||||||
// does not need to be closed separately.
|
// does not need to be closed separately.
|
||||||
func (s *SubscriberFunc[T]) Close() { s.core.Close() }
|
func (s *SubscriberFunc[T]) Close() { s.core.Close() }
|
||||||
|
|
||||||
// Close implements the subscriber interface and the user-facing
|
// Close implements the subscriber interface and the user-facing Close on
|
||||||
// (*SubscriberFunc[T]).Close.
|
// both Subscriber[T] and SubscriberFunc[T].
|
||||||
func (c *subscriberFuncCore) Close() {
|
func (c *subscriberCore) Close() {
|
||||||
c.stop.Stop()
|
c.stop.Stop()
|
||||||
c.unregister(c.typ)
|
c.unregister(c.typ)
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscribeType implements the subscriber interface.
|
// subscribeType implements the subscriber interface.
|
||||||
func (c *subscriberFuncCore) subscribeType() reflect.Type { return c.typ }
|
func (c *subscriberCore) subscribeType() reflect.Type { return c.typ }
|
||||||
|
|
||||||
// dispatch implements the subscriber interface by invoking the
|
// dispatch implements the subscriber interface by invoking the
|
||||||
// per-T dispatch closure that was captured at construction time.
|
// per-T dispatch closure that was captured at construction time.
|
||||||
func (c *subscriberFuncCore) dispatch(
|
func (c *subscriberCore) dispatch(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
vals *queue[DeliveredEvent],
|
vals *queue[DeliveredEvent],
|
||||||
acceptCh func() chan DeliveredEvent,
|
acceptCh func() chan DeliveredEvent,
|
||||||
@@ -412,7 +444,7 @@ func (c *subscriberFuncCore) dispatch(
|
|||||||
// callDone is closed by runFuncCallback when the user callback returns.
|
// callDone is closed by runFuncCallback when the user callback returns.
|
||||||
func dispatchFunc(
|
func dispatchFunc(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
core *subscriberFuncCore,
|
core *subscriberCore,
|
||||||
vals *queue[DeliveredEvent],
|
vals *queue[DeliveredEvent],
|
||||||
acceptCh func() chan DeliveredEvent,
|
acceptCh func() chan DeliveredEvent,
|
||||||
snapshot chan chan []DeliveredEvent,
|
snapshot chan chan []DeliveredEvent,
|
||||||
|
|||||||
Reference in New Issue
Block a user