e7415e6393
Brings Subscriber[T] in line with the same non-generic-core pattern already
applied to SubscriberFunc[T] and Publisher[T]:
- Renames subscriberFuncCore to subscriberCore and shares it between
Subscriber[T] and SubscriberFunc[T]. Both typed facades hold a
*subscriberCore plus their respective per-T delivery state
(Subscriber: chan T; SubscriberFunc: nothing, the user callback is
captured in the dispatch closure).
- The bus's outputs map and subscriber-interface itab key on
*subscriberCore for both subscriber kinds, so adding a new Subscribe[T]
call site no longer pays a per-T itab, dictionary, or equality function
for the subscriber-interface side.
- Subscribe[T] now hoists the non-generic constructor portion into
newSubscriberCore (timer setup, core allocation, cached type/typeName,
unregister method-value), matching SubscribeFunc.
The dispatch loop is intentionally NOT extracted to a non-generic helper for
Subscriber[T], unlike SubscriberFunc[T]. The reason is the typed channel send
'case s.read <- t:' must appear lexically inside the select; the only way to
lift it into a non-generic loop is to bridge typed and untyped via a per-event
goroutine, which costs ~2.7x throughput on BenchmarkBasicThroughput. We keep
dispatchTyped on the generic facade and accept the per-shape stencil cost as
the cheaper alternative.
Symbol-level effect on tailscaled (linux/amd64, measured via
`go tool nm -size`):
Before:
(*Subscriber[T]).dispatch
2 shape stencils: 1,682 + 1,549 = 3,231 B
3 thin per-T wrappers: 124 B each = 372 B
2 deferwrap1 helpers: 62 B each = 124 B
total: 3,727 B
After:
(*Subscriber[T]).dispatchTyped
2 shape stencils: 1,678 + 1,582 = 3,260 B
0 per-T wrappers (replaced by closure stored on core)
2 deferwrap1 helpers: 62 B each = 124 B
total: 3,384 B
dispatch path .text delta: -343 B (-9.2%)
Per-shape stencils are ~1,600 B (.text body) + ~1,100 B (pclntab) =
~2,700 B each on production tailscaled. The shape count matches before/after
(two distinct GC shapes for the Subscriber[T] event types in this binary).
What changes is that the per-T thin wrappers are eliminated because
Subscriber[T] no longer implements the subscriber interface directly.
Whole-binary section deltas:
.text: -2,304 B (includes the dispatch savings plus other
small downstream effects)
.rodata: +512 B (additional closure-type metadata)
.gopclntab: -2,981 B (fewer per-T compiled functions => less metadata)
Stripped tailscaled (linux/amd64): no change at the file level (the savings
fall below the linker's section-alignment boundary). Unstripped builds shrink
by ~2,900 B.
Behavior is unchanged:
BenchmarkBasicThroughput: 2,161 ns/op, 0 B/op, 0 allocs/op
BenchmarkBasicFuncThroughput: 2,493 ns/op, 144 B/op, 2 allocs/op
BenchmarkSubsThroughput: 3,727 ns/op, 0 B/op, 0 allocs/op
Updates #12614
Change-Id: I97918ec68bd2cdb15958bbfd7687592b39663efe
Signed-off-by: James Tucker <james@tailscale.com>
195 lines
5.2 KiB
Go
195 lines
5.2 KiB
Go
// Copyright (c) Tailscale Inc & contributors
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package eventbus
|
|
|
|
import (
|
|
"reflect"
|
|
|
|
"tailscale.com/syncs"
|
|
"tailscale.com/types/logger"
|
|
"tailscale.com/util/set"
|
|
)
|
|
|
|
// A Client can publish and subscribe to events on its attached
|
|
// bus. See [Publish] to publish events, and [Subscribe] to receive
|
|
// events.
|
|
//
|
|
// Subscribers that share the same client receive events one at a
|
|
// time, in the order they were published.
|
|
type Client struct {
|
|
name string
|
|
bus *Bus
|
|
publishDebug hook[PublishedEvent]
|
|
|
|
mu syncs.Mutex
|
|
pub set.Set[publisher]
|
|
sub *subscribeState // Lazily created on first subscribe
|
|
stop stopFlag // signaled on Close
|
|
}
|
|
|
|
func (c *Client) Name() string { return c.name }
|
|
|
|
func (c *Client) logger() logger.Logf { return c.bus.logger() }
|
|
|
|
// Close closes the client. It implicitly closes all publishers and
|
|
// subscribers obtained from this client.
|
|
func (c *Client) Close() {
|
|
var (
|
|
pub set.Set[publisher]
|
|
sub *subscribeState
|
|
)
|
|
|
|
c.mu.Lock()
|
|
pub, c.pub = c.pub, nil
|
|
sub, c.sub = c.sub, nil
|
|
c.mu.Unlock()
|
|
|
|
if sub != nil {
|
|
sub.close()
|
|
}
|
|
for p := range pub {
|
|
p.Close()
|
|
}
|
|
c.stop.Stop()
|
|
}
|
|
|
|
func (c *Client) isClosed() bool { return c.pub == nil && c.sub == nil }
|
|
|
|
// Done returns a channel that is closed when [Client.Close] is called.
|
|
// The channel is closed after all the publishers and subscribers governed by
|
|
// the client have been closed.
|
|
func (c *Client) Done() <-chan struct{} { return c.stop.Done() }
|
|
|
|
func (c *Client) snapshotSubscribeQueue() []DeliveredEvent {
|
|
return c.peekSubscribeState().snapshotQueue()
|
|
}
|
|
|
|
func (c *Client) peekSubscribeState() *subscribeState {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
return c.sub
|
|
}
|
|
|
|
func (c *Client) publishTypes() []reflect.Type {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
ret := make([]reflect.Type, 0, len(c.pub))
|
|
for pub := range c.pub {
|
|
ret = append(ret, pub.publishType())
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func (c *Client) subscribeTypes() []reflect.Type {
|
|
return c.peekSubscribeState().subscribeTypes()
|
|
}
|
|
|
|
func (c *Client) subscribeState() *subscribeState {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
return c.subscribeStateLocked()
|
|
}
|
|
|
|
func (c *Client) subscribeStateLocked() *subscribeState {
|
|
if c.sub == nil {
|
|
c.sub = newSubscribeState(c)
|
|
}
|
|
return c.sub
|
|
}
|
|
|
|
func (c *Client) addPublisher(pub publisher) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.isClosed() {
|
|
panic("cannot Publish on a closed client")
|
|
}
|
|
c.pub.Add(pub)
|
|
}
|
|
|
|
func (c *Client) deletePublisher(pub publisher) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.pub.Delete(pub)
|
|
}
|
|
|
|
func (c *Client) addSubscriber(t reflect.Type, s *subscribeState) {
|
|
c.bus.subscribe(t, s)
|
|
}
|
|
|
|
func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) {
|
|
c.bus.unsubscribe(t, s)
|
|
}
|
|
|
|
func (c *Client) publish() chan<- PublishedEvent {
|
|
return c.bus.write
|
|
}
|
|
|
|
func (c *Client) shouldPublish(t reflect.Type) bool {
|
|
return c.publishDebug.active() || c.bus.shouldPublish(t)
|
|
}
|
|
|
|
// Subscribe requests delivery of events of type T through the given client.
|
|
// It panics if c already has a subscriber for type T, or if c is closed.
|
|
func Subscribe[T any](c *Client) *Subscriber[T] {
|
|
// Hold the client lock throughout the subscription process so that a caller
|
|
// attempting to subscribe on a closed client will get a useful diagnostic
|
|
// instead of a random panic from inside the subscriber plumbing.
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// The caller should not race subscriptions with close, give them a useful
|
|
// diagnostic at the call site.
|
|
if c.isClosed() {
|
|
panic("cannot Subscribe on a closed client")
|
|
}
|
|
|
|
r := c.subscribeStateLocked()
|
|
s := newSubscriber[T](r, logfForCaller(c.logger()))
|
|
// Register the non-generic core with the bus rather than the typed facade,
|
|
// mirroring SubscribeFunc and Publish: this keeps the bus's outputs map
|
|
// and subscriber-interface itab out of per-T cost.
|
|
r.addSubscriber(s.core)
|
|
return s
|
|
}
|
|
|
|
// SubscribeFunc is like [Subscribe], but calls the provided func for each
|
|
// event of type T.
|
|
//
|
|
// A SubscriberFunc calls f synchronously from the client's goroutine.
|
|
// This means the callback must not block for an extended period of time,
|
|
// as this will block the subscriber and slow event processing for all
|
|
// subscriptions on c.
|
|
func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// The caller should not race subscriptions with close, give them a useful
|
|
// diagnostic at the call site.
|
|
if c.isClosed() {
|
|
panic("cannot SubscribeFunc on a closed client")
|
|
}
|
|
|
|
r := c.subscribeStateLocked()
|
|
s := newSubscriberFunc[T](r, f, logfForCaller(c.logger()))
|
|
// Register the non-generic core, not the typed facade. Doing
|
|
// so means the bus's outputs map and the subscriber interface
|
|
// itab are not parameterized by T, eliminating per-T itab and
|
|
// dictionary cost.
|
|
r.addSubscriber(s.core)
|
|
return s
|
|
}
|
|
|
|
// Publish returns a publisher for event type T using the given client.
|
|
// It panics if c is closed.
|
|
func Publish[T any](c *Client) *Publisher[T] {
|
|
p := newPublisher[T](c)
|
|
// Register the non-generic core with the client so the
|
|
// per-Client publisher set, the publisher interface itab, and
|
|
// the publisher equality function are not parameterized by T.
|
|
// This eliminates per-T itab/dictionary/eq cost for every new
|
|
// event type passed through Publish[T].
|
|
c.addPublisher(p.core)
|
|
return p
|
|
}
|