e7415e6393
Brings Subscriber[T] in line with the same non-generic-core pattern already
applied to SubscriberFunc[T] and Publisher[T]:
- Renames subscriberFuncCore to subscriberCore and shares it between
Subscriber[T] and SubscriberFunc[T]. Both typed facades hold a
*subscriberCore plus their respective per-T delivery state
(Subscriber: chan T; SubscriberFunc: nothing, the user callback is
captured in the dispatch closure).
- The bus's outputs map and subscriber-interface itab key on
*subscriberCore for both subscriber kinds, so adding a new Subscribe[T]
call site no longer pays a per-T itab, dictionary, or equality function
for the subscriber-interface side.
- Subscribe[T] now hoists the non-generic constructor portion into
newSubscriberCore (timer setup, core allocation, cached type/typeName,
unregister method-value), matching SubscribeFunc.
The dispatch loop is intentionally NOT extracted to a non-generic helper for
Subscriber[T], unlike SubscriberFunc[T]. The reason is the typed channel send
'case s.read <- t:' must appear lexically inside the select; the only way to
lift it into a non-generic loop is to bridge typed and untyped via a per-event
goroutine, which costs ~2.7x throughput on BenchmarkBasicThroughput. We keep
dispatchTyped on the generic facade and accept the per-shape stencil cost as
the cheaper alternative.
Symbol-level effect on tailscaled (linux/amd64, measured via
`go tool nm -size`):
Before:
(*Subscriber[T]).dispatch
2 shape stencils: 1,682 + 1,549 = 3,231 B
3 thin per-T wrappers: 124 B each = 372 B
2 deferwrap1 helpers: 62 B each = 124 B
total: 3,727 B
After:
(*Subscriber[T]).dispatchTyped
2 shape stencils: 1,678 + 1,582 = 3,260 B
0 per-T wrappers (replaced by closure stored on core)
2 deferwrap1 helpers: 62 B each = 124 B
total: 3,384 B
dispatch path .text delta: -343 B (-9.2%)
Per-shape stencils are ~1,600 B (.text body) + ~1,100 B (pclntab) =
~2,700 B each on production tailscaled. The shape count matches before/after
(two distinct GC shapes for the Subscriber[T] event types in this binary).
What changes is that the per-T thin wrappers are eliminated because
Subscriber[T] no longer implements the subscriber interface directly.
Whole-binary section deltas:
.text: -2,304 B (includes the dispatch savings plus other
small downstream effects)
.rodata: +512 B (additional closure-type metadata)
.gopclntab: -2,981 B (fewer per-T compiled functions => less metadata)
Stripped tailscaled (linux/amd64): no change at the file level (the savings
fall below the linker's section-alignment boundary). Unstripped builds shrink
by ~2,900 B.
Behavior is unchanged:
BenchmarkBasicThroughput: 2,161 ns/op, 0 B/op, 0 allocs/op
BenchmarkBasicFuncThroughput: 2,493 ns/op, 144 B/op, 2 allocs/op
BenchmarkSubsThroughput: 3,727 ns/op, 0 B/op, 0 allocs/op
Updates #12614
Change-Id: I97918ec68bd2cdb15958bbfd7687592b39663efe
Signed-off-by: James Tucker <james@tailscale.com>
502 lines
16 KiB
Go
502 lines
16 KiB
Go
// Copyright (c) Tailscale Inc & contributors
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package eventbus
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"runtime"
|
|
"time"
|
|
|
|
"tailscale.com/syncs"
|
|
"tailscale.com/types/logger"
|
|
"tailscale.com/util/cibuild"
|
|
)
|
|
|
|
type DeliveredEvent struct {
|
|
Event any
|
|
From *Client
|
|
To *Client
|
|
}
|
|
|
|
// subscriber is a uniformly typed wrapper around Subscriber[T], so
|
|
// that debugging facilities can look at active subscribers.
|
|
type subscriber interface {
|
|
subscribeType() reflect.Type
|
|
// dispatch is a function that dispatches the head value in vals to
|
|
// a subscriber, while also handling stop and incoming queue write
|
|
// events.
|
|
//
|
|
// dispatch exists because of the strongly typed Subscriber[T]
|
|
// wrapper around subscriptions: within the bus events are boxed in an
|
|
// 'any', and need to be unpacked to their full type before delivery
|
|
// to the subscriber. This involves writing to a strongly-typed
|
|
// channel, so subscribeState cannot handle that dispatch by itself -
|
|
// but if that strongly typed send blocks, we also need to keep
|
|
// processing other potential sources of wakeups, which is how we end
|
|
// up at this awkward type signature and sharing of internal state
|
|
// through dispatch.
|
|
dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool
|
|
Close()
|
|
}
|
|
|
|
// subscribeState handles dispatching of events received from a Bus.
|
|
type subscribeState struct {
|
|
client *Client
|
|
|
|
dispatcher *worker
|
|
write chan DeliveredEvent
|
|
snapshot chan chan []DeliveredEvent
|
|
debug hook[DeliveredEvent]
|
|
|
|
outputsMu syncs.Mutex
|
|
outputs map[reflect.Type]subscriber
|
|
}
|
|
|
|
func newSubscribeState(c *Client) *subscribeState {
|
|
ret := &subscribeState{
|
|
client: c,
|
|
write: make(chan DeliveredEvent),
|
|
snapshot: make(chan chan []DeliveredEvent),
|
|
outputs: map[reflect.Type]subscriber{},
|
|
}
|
|
ret.dispatcher = runWorker(ret.pump)
|
|
return ret
|
|
}
|
|
|
|
func (s *subscribeState) pump(ctx context.Context) {
|
|
var vals queue[DeliveredEvent]
|
|
acceptCh := func() chan DeliveredEvent {
|
|
if vals.Full() {
|
|
return nil
|
|
}
|
|
return s.write
|
|
}
|
|
for {
|
|
if !vals.Empty() {
|
|
val := vals.Peek()
|
|
sub := s.subscriberFor(val.Event)
|
|
if sub == nil {
|
|
// Raced with unsubscribe.
|
|
vals.Drop()
|
|
continue
|
|
}
|
|
if !sub.dispatch(ctx, &vals, acceptCh, s.snapshot) {
|
|
return
|
|
}
|
|
|
|
if s.debug.active() {
|
|
s.debug.run(DeliveredEvent{
|
|
Event: val.Event,
|
|
From: val.From,
|
|
To: s.client,
|
|
})
|
|
}
|
|
} else {
|
|
// Keep the cases in this select in sync with
|
|
// Subscriber.dispatch and SubscriberFunc.dispatch below.
|
|
// The only difference should be that this select doesn't deliver
|
|
// queued values to anyone, and unconditionally accepts new values.
|
|
select {
|
|
case val := <-s.write:
|
|
vals.Add(val)
|
|
case <-ctx.Done():
|
|
return
|
|
case ch := <-s.snapshot:
|
|
ch <- vals.Snapshot()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *subscribeState) snapshotQueue() []DeliveredEvent {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
|
|
resp := make(chan []DeliveredEvent)
|
|
select {
|
|
case s.snapshot <- resp:
|
|
return <-resp
|
|
case <-s.dispatcher.Done():
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (s *subscribeState) subscribeTypes() []reflect.Type {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
|
|
s.outputsMu.Lock()
|
|
defer s.outputsMu.Unlock()
|
|
ret := make([]reflect.Type, 0, len(s.outputs))
|
|
for t := range s.outputs {
|
|
ret = append(ret, t)
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func (s *subscribeState) addSubscriber(sub subscriber) {
|
|
s.outputsMu.Lock()
|
|
defer s.outputsMu.Unlock()
|
|
t := sub.subscribeType()
|
|
if s.outputs[t] != nil {
|
|
panic(fmt.Errorf("double subscription for event %s", t))
|
|
}
|
|
s.outputs[t] = sub
|
|
s.client.addSubscriber(t, s)
|
|
}
|
|
|
|
func (s *subscribeState) deleteSubscriber(t reflect.Type) {
|
|
s.outputsMu.Lock()
|
|
defer s.outputsMu.Unlock()
|
|
delete(s.outputs, t)
|
|
s.client.deleteSubscriber(t, s)
|
|
}
|
|
|
|
func (s *subscribeState) subscriberFor(val any) subscriber {
|
|
s.outputsMu.Lock()
|
|
defer s.outputsMu.Unlock()
|
|
return s.outputs[reflect.TypeOf(val)]
|
|
}
|
|
|
|
// Close closes the subscribeState. It implicitly closes all Subscribers
|
|
// linked to this state, and any pending events are discarded.
|
|
func (s *subscribeState) close() {
|
|
s.dispatcher.StopAndWait()
|
|
|
|
var subs map[reflect.Type]subscriber
|
|
s.outputsMu.Lock()
|
|
subs, s.outputs = s.outputs, nil
|
|
s.outputsMu.Unlock()
|
|
for _, sub := range subs {
|
|
sub.Close()
|
|
}
|
|
}
|
|
|
|
func (s *subscribeState) closed() <-chan struct{} {
|
|
return s.dispatcher.Done()
|
|
}
|
|
|
|
// A Subscriber delivers one type of event from a [Client].
|
|
// Events are sent to the [Subscriber.Events] channel.
|
|
type Subscriber[T any] struct {
|
|
// core holds the non-generic subscriber-interface implementation
|
|
// (Close, subscribeType, dispatch, slow timer, unregister) shared
|
|
// with [SubscriberFunc] via [subscriberCore]. The only per-T state
|
|
// owned by the facade itself is the typed delivery channel; the
|
|
// dispatch loop, unlike SubscriberFunc, must remain per-T — see
|
|
// [Subscriber.dispatchTyped].
|
|
core *subscriberCore
|
|
read chan T
|
|
}
|
|
|
|
func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
|
|
core := newSubscriberCore(r, logf, reflect.TypeFor[T]())
|
|
s := &Subscriber[T]{
|
|
core: core,
|
|
read: make(chan T),
|
|
}
|
|
// Subscriber[T] keeps a per-T dispatch loop; see [Subscriber.dispatchTyped]
|
|
// for why we don't share the non-generic dispatchFunc that SubscriberFunc
|
|
// uses.
|
|
core.dispatchFn = func(
|
|
ctx context.Context,
|
|
vals *queue[DeliveredEvent],
|
|
acceptCh func() chan DeliveredEvent,
|
|
snapshot chan chan []DeliveredEvent,
|
|
) bool {
|
|
return s.dispatchTyped(ctx, vals, acceptCh, snapshot)
|
|
}
|
|
return s
|
|
}
|
|
|
|
// dispatchTyped is the per-T dispatch loop for Subscriber[T]. It has to remain
|
|
// generic because the typed channel send `case s.read <- t:` must appear
|
|
// lexically inside the select. The rest of the cases match the non-generic
|
|
// dispatchFunc body to keep behavior aligned between Subscriber and
|
|
// SubscriberFunc.
|
|
//
|
|
// We don't share dispatchFunc (the way SubscriberFunc does) because bridging
|
|
// the typed channel send and the non-generic select would require running the
|
|
// send on its own goroutine on every event delivery. That bridge was measured
|
|
// at ~2.7x throughput regression on BenchmarkBasicThroughput, so we keep
|
|
// dispatchTyped generic and pay the per-shape stencil cost instead (measured
|
|
// at ~1,600 B body + ~1,100 B pclntab per shape on linux/amd64 tailscaled).
|
|
// Only the typed select lives in the per-shape stencil; the surrounding state
|
|
// (slow timer, log function, type name) is reached through the non-generic
|
|
// core.
|
|
func (s *Subscriber[T]) dispatchTyped(
|
|
ctx context.Context,
|
|
vals *queue[DeliveredEvent],
|
|
acceptCh func() chan DeliveredEvent,
|
|
snapshot chan chan []DeliveredEvent,
|
|
) bool {
|
|
t := vals.Peek().Event.(T)
|
|
|
|
start := time.Now()
|
|
s.core.slow.Reset(slowSubscriberTimeout)
|
|
defer s.core.slow.Stop()
|
|
|
|
for {
|
|
// Keep the cases in this select in sync with subscribeState.pump
|
|
// above. The only difference should be that this select
|
|
// delivers a value on s.read.
|
|
select {
|
|
case s.read <- t:
|
|
vals.Drop()
|
|
return true
|
|
case val := <-acceptCh():
|
|
vals.Add(val)
|
|
case <-ctx.Done():
|
|
return false
|
|
case ch := <-snapshot:
|
|
ch <- vals.Snapshot()
|
|
case <-s.core.slow.C:
|
|
s.core.logf("subscriber for %s is slow (%v elapsed)", s.core.typeName, time.Since(start))
|
|
s.core.slow.Reset(slowSubscriberTimeout)
|
|
}
|
|
}
|
|
}
|
|
|
|
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
|
|
s := &Subscriber[T]{
|
|
// Monitors don't go through the bus's dispatch path (they
|
|
// are attached directly to the debug hook), so they don't
|
|
// need a fully-initialized subscriberCore — only the typed
|
|
// delivery channel and an unregister callback. We give them
|
|
// a placeholder core so Close() and Done() work uniformly.
|
|
core: &subscriberCore{},
|
|
read: make(chan T, 100), // arbitrary, large
|
|
}
|
|
cancel := attach(s.monitor)
|
|
s.core.unregister = func(reflect.Type) { cancel() }
|
|
return s
|
|
}
|
|
|
|
func (s *Subscriber[T]) monitor(debugEvent T) {
|
|
select {
|
|
case s.read <- debugEvent:
|
|
case <-s.core.stop.Done():
|
|
}
|
|
}
|
|
|
|
// Events returns a channel on which the subscriber's events are
|
|
// delivered.
|
|
func (s *Subscriber[T]) Events() <-chan T {
|
|
return s.read
|
|
}
|
|
|
|
// Done returns a channel that is closed when the subscriber is
|
|
// closed.
|
|
func (s *Subscriber[T]) Done() <-chan struct{} {
|
|
return s.core.stop.Done()
|
|
}
|
|
|
|
// Close closes the Subscriber, indicating the caller no longer wishes
|
|
// to receive this event type. After Close, receives on
|
|
// [Subscriber.Events] block for ever.
|
|
//
|
|
// If the Bus from which the Subscriber was created is closed,
|
|
// the Subscriber is implicitly closed and does not need to be closed
|
|
// separately.
|
|
func (s *Subscriber[T]) Close() { s.core.Close() }
|
|
|
|
// A SubscriberFunc delivers one type of event from a [Client].
|
|
// Events are forwarded synchronously to a function provided at construction.
|
|
type SubscriberFunc[T any] struct {
|
|
// core holds the non-generic subscriber-interface implementation shared
|
|
// with [Subscriber] via [subscriberCore]. The user callback is captured
|
|
// in the dispatchFn closure on the core, so SubscriberFunc[T] itself
|
|
// carries no per-T state beyond the core pointer; per-T cost is limited
|
|
// to the small forwarding Close method below.
|
|
core *subscriberCore
|
|
}
|
|
|
|
// subscriberCore is the non-generic backing for both Subscriber[T] and
|
|
// SubscriberFunc[T]. It implements the package-private subscriber interface
|
|
// so that the bus (and the subscribeState map) can store it without per-T
|
|
// itabs or dictionaries. The per-T behavior (type assertion plus either typed
|
|
// channel send or user callback invocation) is encapsulated in the dispatchFn
|
|
// closure set up by the constructor of the typed facade.
|
|
type subscriberCore struct {
|
|
stop stopFlag
|
|
unregister func(reflect.Type)
|
|
logf logger.Logf
|
|
slow *time.Timer // used to detect slow subscriber service
|
|
|
|
// typ is the cached reflect.Type of T. Returned by
|
|
// subscribeType() and used by the dispatch closure to format
|
|
// slow-subscriber log messages.
|
|
typ reflect.Type
|
|
// typeName is the cached reflect.TypeFor[T]().String() result.
|
|
// Computed once at construction time so the dispatch closure
|
|
// (which runs once per delivered event) doesn't allocate a
|
|
// fresh string on every call. The string is also independent
|
|
// of T, so it doesn't contribute to per-T stencil cost.
|
|
typeName string
|
|
|
|
// dispatchFn is the per-T dispatch closure. It performs the type
|
|
// assertion vals.Peek().Event.(T) and runs the typed delivery (either a
|
|
// user-callback invocation for SubscriberFunc[T] or a typed channel send
|
|
// for Subscriber[T]). The closure body is non-generic apart from those
|
|
// two T-bound operations; the bulk of the dispatch work happens in the
|
|
// non-generic dispatchFunc helper (used by SubscriberFunc) or in the
|
|
// Subscriber[T].dispatchTyped per-shape stencil.
|
|
dispatchFn func(
|
|
ctx context.Context,
|
|
vals *queue[DeliveredEvent],
|
|
acceptCh func() chan DeliveredEvent,
|
|
snapshot chan chan []DeliveredEvent,
|
|
) bool
|
|
}
|
|
|
|
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
|
|
core := newSubscriberCore(r, logf, reflect.TypeFor[T]())
|
|
// The dispatch closure is the only piece that intrinsically
|
|
// needs T: it performs the type assertion on the head queue
|
|
// value and forwards the unboxed value to the user callback.
|
|
// All non-generic setup (timer, core allocation, unregister
|
|
// closure) lives in newSubscriberCore so it isn't
|
|
// duplicated per T.
|
|
core.dispatchFn = func(
|
|
ctx context.Context,
|
|
vals *queue[DeliveredEvent],
|
|
acceptCh func() chan DeliveredEvent,
|
|
snapshot chan chan []DeliveredEvent,
|
|
) bool {
|
|
t := vals.Peek().Event.(T)
|
|
callDone := make(chan struct{})
|
|
// `go runFuncCallback(f, t, callDone)` binds its arguments
|
|
// directly to the new goroutine's frame; using a closure
|
|
// (`go func() { f(t) }()`) would allocate a closure on the
|
|
// heap on every dispatched event.
|
|
go runFuncCallback(f, t, callDone)
|
|
return dispatchFunc(ctx, core, vals, acceptCh, snapshot, callDone)
|
|
}
|
|
return &SubscriberFunc[T]{core: core}
|
|
}
|
|
|
|
// newSubscriberCore performs the non-generic portion of subscriber
|
|
// construction: timer setup, core struct allocation, and assignment of the
|
|
// unregister method-value. The caller fills in the per-T dispatchFn
|
|
// afterward.
|
|
//
|
|
// Hoisting this out of the typed constructors (newSubscriber[T] and
|
|
// newSubscriberFunc[T]) eliminates the bulk of their per-T stencil cost; the
|
|
// only T-typed instructions left in each generic constructor are the
|
|
// reflect.TypeFor[T]() call (whose body is shared via the
|
|
// internal/abi.TypeFor[T] dictionary) and the construction of the dispatch
|
|
// closure itself.
|
|
func newSubscriberCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberCore {
|
|
slow := time.NewTimer(0)
|
|
slow.Stop() // reset in dispatch
|
|
core := &subscriberCore{
|
|
logf: logf,
|
|
slow: slow,
|
|
typ: typ,
|
|
typeName: typ.String(),
|
|
}
|
|
core.unregister = r.deleteSubscriber
|
|
return core
|
|
}
|
|
|
|
// Close closes the SubscriberFunc, indicating the caller no longer wishes to
|
|
// receive this event type. After Close, no further events will be passed to
|
|
// the callback.
|
|
//
|
|
// If the [Bus] from which s was created is closed, s is implicitly closed and
|
|
// does not need to be closed separately.
|
|
func (s *SubscriberFunc[T]) Close() { s.core.Close() }
|
|
|
|
// Close implements the subscriber interface and the user-facing Close on
|
|
// both Subscriber[T] and SubscriberFunc[T].
|
|
func (c *subscriberCore) Close() {
|
|
c.stop.Stop()
|
|
c.unregister(c.typ)
|
|
}
|
|
|
|
// subscribeType implements the subscriber interface.
|
|
func (c *subscriberCore) subscribeType() reflect.Type { return c.typ }
|
|
|
|
// dispatch implements the subscriber interface by invoking the
|
|
// per-T dispatch closure that was captured at construction time.
|
|
func (c *subscriberCore) dispatch(
|
|
ctx context.Context,
|
|
vals *queue[DeliveredEvent],
|
|
acceptCh func() chan DeliveredEvent,
|
|
snapshot chan chan []DeliveredEvent,
|
|
) bool {
|
|
return c.dispatchFn(ctx, vals, acceptCh, snapshot)
|
|
}
|
|
|
|
// dispatchFunc is the non-generic body of SubscriberFunc[T].dispatch.
|
|
// It is identical in observable behavior to the original loop; the
|
|
// only differences are that the dispatched value has already been
|
|
// unboxed by the caller (and the user callback is already running
|
|
// on its own goroutine, signaling completion via callDone) and the
|
|
// slow-subscriber timer / cached type name come from the
|
|
// non-generic core, not from a per-T struct.
|
|
//
|
|
// callDone is closed by runFuncCallback when the user callback returns.
|
|
func dispatchFunc(
|
|
ctx context.Context,
|
|
core *subscriberCore,
|
|
vals *queue[DeliveredEvent],
|
|
acceptCh func() chan DeliveredEvent,
|
|
snapshot chan chan []DeliveredEvent,
|
|
callDone chan struct{},
|
|
) bool {
|
|
start := time.Now()
|
|
core.slow.Reset(slowSubscriberTimeout)
|
|
defer core.slow.Stop()
|
|
|
|
// Keep the cases in this select in sync with subscribeState.pump
|
|
// above. The only difference should be that this select
|
|
// delivers a value by calling the user callback (via the
|
|
// goroutine spawned by the typed wrapper).
|
|
for {
|
|
select {
|
|
case <-callDone:
|
|
vals.Drop()
|
|
return true
|
|
case val := <-acceptCh():
|
|
vals.Add(val)
|
|
case <-ctx.Done():
|
|
// Wait for the callback to be complete, but not forever.
|
|
core.slow.Reset(5 * slowSubscriberTimeout)
|
|
select {
|
|
case <-core.slow.C:
|
|
core.logf("giving up on subscriber for %s after %v at close", core.typeName, time.Since(start))
|
|
if cibuild.On() {
|
|
all := make([]byte, 2<<20)
|
|
n := runtime.Stack(all, true)
|
|
core.logf("goroutine stacks:\n%s", all[:n])
|
|
}
|
|
case <-callDone:
|
|
}
|
|
return false
|
|
case ch := <-snapshot:
|
|
ch <- vals.Snapshot()
|
|
case <-core.slow.C:
|
|
core.logf("subscriber for %s is slow (%v elapsed)", core.typeName, time.Since(start))
|
|
core.slow.Reset(slowSubscriberTimeout)
|
|
}
|
|
}
|
|
}
|
|
|
|
// runFuncCallback calls f(t) and closes done once f has returned. It is
// the per-T worker started as a goroutine for every dispatched event.
//
// It is a named generic function rather than a closure on purpose:
// `go runFuncCallback(f, t, done)` binds the arguments directly to the
// new goroutine's frame, so no closure is allocated per event. The body
// is just a defer and one indirect call, which keeps the per-shape
// stencil small.
func runFuncCallback[T any](f func(T), t T, done chan struct{}) {
	// Deferred so that done is closed on every exit path of f.
	defer close(done)
	f(t)
}
|