util/eventbus: move SubscriberFunc subscriber-interface impl to a non-generic core
Splits SubscriberFunc[T] into:
- SubscriberFunc[T]: a thin user-facing facade that holds only a
pointer to a non-generic core. It exposes Close() to user code,
which forwards to the core.
- subscriberFuncCore: a non-generic struct that owns all the
subscriber state (stop flag, unregister, logf, slow timer,
cached reflect.Type) and implements the bus's package-private
subscriber interface. Its dispatch() invokes a closure
captured at construction time that performs the
vals.Peek().Event.(T) type assertion and runs the user
callback on the unboxed value.
The bus's outputs map and subscriber-interface itab are
parameterized only by *subscriberFuncCore, not by T, eliminating
both the per-T itab and the per-T generic dictionary that
previously scaled with the number of subscribed event types.
Measured impact (util/eventbus/sizetest):
total per-flow binary cost:
linux/amd64: 3039.2 B/flow -> 2252.8 B/flow (-786.4 B / -25.9%)
linux/arm64: 3145.7 B/flow -> 2228.2 B/flow (-917.5 B / -29.2%)
SubscriberFunc per-receiver attribution:
linux/amd64: 840.8 B/flow -> 300.8 B/flow (-540.0 B / -64.2%)
linux/arm64: 849.9 B/flow -> 303.8 B/flow (-546.1 B / -64.3%)
Dropped per-T symbols (200-flow eventbus binary):
- (*SubscriberFunc[T]).dispatch was 26,639 B total (130 B/T)
- (*SubscriberFunc[T]).subscribeType was 3,600 B total ( 18 B/T)
- .dict.SubscriberFunc[T] was 14,400 B total ( 72 B/T)
- go:itab.*SubscriberFunc[T],... was 9,600 B total ( 48 B/T)
Of the original 913 B/flow attributed to SubscriberFunc, 540 B/flow
is now gone, dropping the receiver to 300 B/flow.
Behavior is unchanged: BenchmarkBasicThroughput is within noise
(1955 -> 1941 ns/op on the test box) and all eventbus tests pass.
Updates #12614
Change-Id: I646b3b05fd8d95f9afead59bfd0f69cd18b7a709
Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
committed by
James Tucker
parent
ead5ce65a3
commit
d72cde1a6b
@@ -169,7 +169,11 @@ func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
|
||||
|
||||
r := c.subscribeStateLocked()
|
||||
s := newSubscriberFunc[T](r, f, logfForCaller(c.logger()))
|
||||
r.addSubscriber(s)
|
||||
// Register the non-generic core, not the typed facade. Doing
|
||||
// so means the bus's outputs map and the subscriber interface
|
||||
// itab are not parameterized by T, eliminating per-T itab and
|
||||
// dictionary cost.
|
||||
r.addSubscriber(s.core)
|
||||
return s
|
||||
}
|
||||
|
||||
|
||||
+77
-36
@@ -276,22 +276,74 @@ func (s *Subscriber[T]) Close() {
|
||||
// A SubscriberFunc delivers one type of event from a [Client].
|
||||
// Events are forwarded synchronously to a function provided at construction.
|
||||
type SubscriberFunc[T any] struct {
|
||||
// Implementation note: SubscriberFunc[T] is a thin facade over a
|
||||
// non-generic *subscriberFuncCore. All of the behavior — the
|
||||
// subscriber-interface implementation (Close, subscribeType, dispatch), the
|
||||
// slow-subscriber timer, the type assertion, and the user callback
|
||||
// invocation — lives on the core and is not instantiated per T. The only
|
||||
// per-T cost is the small forwarding Close method below.
|
||||
core *subscriberFuncCore
|
||||
}
|
||||
|
||||
// subscriberFuncCore is the non-generic implementation of a
|
||||
// SubscriberFunc. It implements the package-private subscriber
|
||||
// interface so that the bus (and the subscribeState map) can store
|
||||
// it without per-T itabs or dictionaries.
|
||||
type subscriberFuncCore struct {
|
||||
stop stopFlag
|
||||
read func(T)
|
||||
unregister func()
|
||||
logf logger.Logf
|
||||
slow *time.Timer // used to detect slow subscriber service
|
||||
|
||||
// typ is the cached reflect.Type of T. Returned by
|
||||
// subscribeType() and used by the dispatch closure to format
|
||||
// slow-subscriber log messages.
|
||||
typ reflect.Type
|
||||
|
||||
// dispatchFn is the per-T dispatch closure. It performs the
|
||||
// type assertion vals.Peek().Event.(T) and runs the user
|
||||
// callback on the unboxed value. The closure body is
|
||||
// non-generic; its only per-T contribution is the type
|
||||
// assertion and the call through s.read(T), which sit inside
|
||||
// a single small captured closure rather than across a full
|
||||
// select-loop stencil per T.
|
||||
dispatchFn func(
|
||||
ctx context.Context,
|
||||
vals *queue[DeliveredEvent],
|
||||
acceptCh func() chan DeliveredEvent,
|
||||
snapshot chan chan []DeliveredEvent,
|
||||
) bool
|
||||
}
|
||||
|
||||
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
|
||||
slow := time.NewTimer(0)
|
||||
slow.Stop() // reset in dispatch
|
||||
return &SubscriberFunc[T]{
|
||||
read: f,
|
||||
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
|
||||
logf: logf,
|
||||
slow: slow,
|
||||
core := &subscriberFuncCore{
|
||||
logf: logf,
|
||||
slow: slow,
|
||||
typ: reflect.TypeFor[T](),
|
||||
}
|
||||
core.unregister = func() { r.deleteSubscriber(core.typ) }
|
||||
core.dispatchFn = func(
|
||||
ctx context.Context,
|
||||
vals *queue[DeliveredEvent],
|
||||
acceptCh func() chan DeliveredEvent,
|
||||
snapshot chan chan []DeliveredEvent,
|
||||
) bool {
|
||||
t := vals.Peek().Event.(T)
|
||||
callDone := make(chan struct{})
|
||||
// `go runFuncCallback(f, t, callDone)` binds its arguments
|
||||
// directly to the new goroutine's frame; using a closure
|
||||
// (`go func() { f(t) }()`) would allocate a closure on the
|
||||
// heap on every dispatched event.
|
||||
go runFuncCallback(f, t, callDone)
|
||||
return dispatchFunc(ctx, dispatchFuncState{
|
||||
slow: core.slow,
|
||||
logf: core.logf,
|
||||
typeName: core.typ.String(),
|
||||
}, vals, acceptCh, snapshot, callDone)
|
||||
}
|
||||
return &SubscriberFunc[T]{core: core}
|
||||
}
|
||||
|
||||
// Close closes the SubscriberFunc, indicating the caller no longer wishes to
|
||||
@@ -300,38 +352,27 @@ func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *S
|
||||
//
|
||||
// If the [Bus] from which s was created is closed, s is implicitly closed and
|
||||
// does not need to be closed separately.
|
||||
func (s *SubscriberFunc[T]) Close() { s.stop.Stop(); s.unregister() }
|
||||
func (s *SubscriberFunc[T]) Close() { s.core.Close() }
|
||||
|
||||
// subscribeType implements part of the subscriber interface.
|
||||
func (s *SubscriberFunc[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() }
|
||||
// Close implements the subscriber interface and the user-facing
|
||||
// (*SubscriberFunc[T]).Close.
|
||||
func (c *subscriberFuncCore) Close() {
|
||||
c.stop.Stop()
|
||||
c.unregister()
|
||||
}
|
||||
|
||||
// dispatch implements part of the subscriber interface.
|
||||
//
|
||||
// We deliberately keep this method body small and delegate the
|
||||
// dispatch loop to dispatchFunc, a non-generic helper. Each
|
||||
// instantiation of SubscriberFunc[T] otherwise produces a fresh
|
||||
// stencil of the full ~40-line select loop (including the slow-
|
||||
// subscriber timer, snapshot handling, and CI stack-dump branch),
|
||||
// which is responsible for hundreds of bytes of binary size per
|
||||
// distinct T. By isolating the per-T work to the type assertion
|
||||
// and the callback closure, only a small fixed-size wrapper is
|
||||
// emitted per T.
|
||||
func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
|
||||
t := vals.Peek().Event.(T)
|
||||
callDone := make(chan struct{})
|
||||
// Launch the user callback on a goroutine via a generic worker
|
||||
// rather than via a closure (`go func() { s.read(t) }()` would
|
||||
// allocate the closure object on the heap on every dispatched
|
||||
// event). `go runFuncCallback(s.read, t, callDone)` binds its
|
||||
// arguments directly to the goroutine's frame, leaving allocation
|
||||
// behavior identical to the original (*SubscriberFunc[T]).runCallback
|
||||
// method.
|
||||
go runFuncCallback(s.read, t, callDone)
|
||||
return dispatchFunc(ctx, dispatchFuncState{
|
||||
slow: s.slow,
|
||||
logf: s.logf,
|
||||
typeName: reflect.TypeFor[T]().String(),
|
||||
}, vals, acceptCh, snapshot, callDone)
|
||||
// subscribeType implements the subscriber interface.
|
||||
func (c *subscriberFuncCore) subscribeType() reflect.Type { return c.typ }
|
||||
|
||||
// dispatch implements the subscriber interface by invoking the
|
||||
// per-T dispatch closure that was captured at construction time.
|
||||
func (c *subscriberFuncCore) dispatch(
|
||||
ctx context.Context,
|
||||
vals *queue[DeliveredEvent],
|
||||
acceptCh func() chan DeliveredEvent,
|
||||
snapshot chan chan []DeliveredEvent,
|
||||
) bool {
|
||||
return c.dispatchFn(ctx, vals, acceptCh, snapshot)
|
||||
}
|
||||
|
||||
// dispatchFuncState is the non-generic state needed by dispatchFunc.
|
||||
|
||||
Reference in New Issue
Block a user