Files
tailscale/util/eventbus/client.go
T
James Tucker 4eec4423b4 util/eventbus: move Publisher publisher-interface impl to a non-generic core
Mirrors the same refactor previously applied to SubscriberFunc:

  - Publisher[T]: a thin user-facing facade. Holds a pointer to a
    non-generic publisherCore and exposes Publish/Close/ShouldPublish.
  - publisherCore: a non-generic struct that owns the *Client back-
    pointer, stop flag, and cached reflect.Type. It implements the
    package-private publisher interface (publishType, Close).
    The bus's per-Client publisher set is set.Set[publisher] keyed
    on this single non-generic type.

The publisher interface only exists to support diagnostic
introspection (Debugger.PublishTypes returning the list of types a
client publishes). Previously, satisfying that diagnostic-only
interface forced *Publisher[T] to be the implementor and cost a
per-T itab, generic dictionary, and equality function on every
event type ever passed through Publish[T]. Moving the
implementation to a non-generic core lets the diagnostic surface
work unchanged while charging zero per-T cost for the
diagnostic-driven generic interface.

Publisher[T].Publish is also slimmed: the channel/select/stopFlag
loop is now a non-generic publish() helper that takes the value as
'any'. The per-T body is reduced to forwarding the boxed value to
the helper.

Measured impact (util/eventbus/sizetest):

  total per-flow binary cost:
    linux/amd64:  2252.8 B/flow -> 1900.5 B/flow  (-352.3 B / -15.6%)
    linux/arm64:  2228.2 B/flow -> 1835.0 B/flow  (-393.2 B / -17.6%)

  Publisher per-receiver attribution:
    linux/amd64:   635.2 B/flow ->  369.6 B/flow  (-265.6 B / -41.8%)
    linux/arm64:   751.7 B/flow ->  373.2 B/flow  (-378.5 B / -50.4%)

Cumulative reduction from the original baseline (5167ff412):
    linux/amd64:  3096.6 B/flow -> 1900.5 B/flow  (-1196.1 B / -38.6%)
    linux/arm64:  3145.7 B/flow -> 1835.0 B/flow  (-1310.7 B / -41.7%)

Dropped per-T symbols (200-flow eventbus binary):

  - .dict.Publisher[T]                   was 14,400 B (72 B/T)
  - type:.eq.Publisher[T]                was 11,832 B (58 B/T)
  - go:itab.*Publisher[T],publisher      was  8,000 B (40 B/T)
  - (*Publisher[T]).Close shape stencils collapsed to 1

Behavior is unchanged: BenchmarkBasicThroughput is within noise
(2018 -> 2038 ns/op at -benchtime=2s) and all eventbus tests pass.

Updates #12614

Change-Id: I61979c2bf95d2a711c2321e6e0b4b7d15980e9f5
Signed-off-by: James Tucker <james@tailscale.com>
2026-05-11 14:39:42 -07:00

192 lines
5.0 KiB
Go

// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
package eventbus
import (
"reflect"
"tailscale.com/syncs"
"tailscale.com/types/logger"
"tailscale.com/util/set"
)
// A Client can publish and subscribe to events on its attached
// bus. See [Publish] to publish events, and [Subscribe] to receive
// events.
//
// Subscribers that share the same client receive events one at a
// time, in the order they were published.
type Client struct {
	name         string               // returned by Name
	bus          *Bus                 // the bus this client is attached to; supplies the logger, subscriptions and write channel
	publishDebug hook[PublishedEvent] // while active, shouldPublish reports true without consulting the bus
	mu           syncs.Mutex          // held around the accesses to pub and sub made in this file
	pub          set.Set[publisher]   // publishers registered via addPublisher; set to nil by Close
	sub          *subscribeState      // Lazily created on first subscribe; set to nil by Close
	stop         stopFlag             // signaled on Close; its Done channel is what Client.Done returns
}
// Name returns the client's name.
func (c *Client) Name() string {
	return c.name
}
// logger returns the log function of the bus that c is attached to.
func (c *Client) logger() logger.Logf {
	return c.bus.logger()
}
// Close closes the client. It implicitly closes all publishers and
// subscribers obtained from this client.
//
// The publisher set and the subscribe state are detached from c while
// holding c.mu and are closed only after the lock has been released: first
// the subscribe state (if one exists), then every publisher, and finally
// c's stop flag is signaled.
func (c *Client) Close() {
	// Take ownership of the current state and leave c with nil fields,
	// which is what isClosed checks for.
	c.mu.Lock()
	publishers, subscriptions := c.pub, c.sub
	c.pub, c.sub = nil, nil
	c.mu.Unlock()

	// NOTE(review): the teardown below runs without c.mu, presumably so
	// that close paths which call back into the client (deletePublisher
	// takes c.mu) cannot deadlock — confirm against the publisher code.
	if subscriptions != nil {
		subscriptions.close()
	}
	for p := range publishers {
		p.Close()
	}

	// Signal the stop flag last, after everything above has been closed.
	c.stop.Stop()
}
// isClosed reports whether both the publisher set and the subscribe state
// of c are nil, which is the state Close leaves the client in.
//
// It reads c.pub and c.sub without locking; the callers in this file
// invoke it while holding c.mu.
func (c *Client) isClosed() bool {
	if c.pub != nil {
		return false
	}
	return c.sub == nil
}
// Done returns a channel that is closed when [Client.Close] is called.
// Close signals it as its final step, so by the time the channel is closed
// all publishers and subscribers governed by the client have already been
// closed.
func (c *Client) Done() <-chan struct{} {
	return c.stop.Done()
}
// snapshotSubscribeQueue returns the queue snapshot reported by c's current
// subscribe state.
//
// NOTE(review): peekSubscribeState returns nil when no subscribe state
// exists, and snapshotQueue is still invoked on that nil pointer —
// presumably it tolerates a nil receiver; confirm.
func (c *Client) snapshotSubscribeQueue() []DeliveredEvent {
	state := c.peekSubscribeState()
	return state.snapshotQueue()
}
// peekSubscribeState returns c's current subscribe state, reading it under
// c.mu. Unlike subscribeState it never creates one, so the result is nil
// when no subscribe state exists.
func (c *Client) peekSubscribeState() *subscribeState {
	c.mu.Lock()
	state := c.sub
	c.mu.Unlock()
	return state
}
// publishTypes returns the event type reported by each publisher currently
// registered on c. It holds c.mu while walking the publisher set. The
// result is a non-nil slice (empty when there are no publishers) whose
// order follows iteration over the set and should not be relied upon.
func (c *Client) publishTypes() []reflect.Type {
	c.mu.Lock()
	defer c.mu.Unlock()

	types := make([]reflect.Type, 0, len(c.pub))
	for p := range c.pub {
		types = append(types, p.publishType())
	}
	return types
}
// subscribeTypes returns the event types reported by c's current subscribe
// state.
//
// NOTE(review): peekSubscribeState returns nil when no subscribe state
// exists, and subscribeTypes is still invoked on that nil pointer —
// presumably it tolerates a nil receiver; confirm.
func (c *Client) subscribeTypes() []reflect.Type {
	state := c.peekSubscribeState()
	return state.subscribeTypes()
}
// subscribeState returns c's subscribe state, creating it if it does not
// exist yet. It acquires c.mu itself; code that already holds the lock
// uses subscribeStateLocked instead.
func (c *Client) subscribeState() *subscribeState {
	c.mu.Lock()
	defer c.mu.Unlock()

	state := c.subscribeStateLocked()
	return state
}
// subscribeStateLocked returns c's subscribe state, creating and storing a
// new one via newSubscribeState if c has none. It does no locking of its
// own; the callers in this file hold c.mu.
func (c *Client) subscribeStateLocked() *subscribeState {
	if existing := c.sub; existing != nil {
		return existing
	}
	c.sub = newSubscribeState(c)
	return c.sub
}
// addPublisher registers pub in c's publisher set while holding c.mu.
// It panics if c is closed.
func (c *Client) addPublisher(pub publisher) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Close leaves c.pub nil, so refuse here with a clear message rather
	// than attempting to add to the cleared set.
	if closed := c.isClosed(); closed {
		panic("cannot Publish on a closed client")
	}
	c.pub.Add(pub)
}
// deletePublisher removes pub from c's publisher set while holding c.mu.
// Unlike addPublisher it performs no closed-client check.
func (c *Client) deletePublisher(pub publisher) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.pub.Delete(pub)
}
// addSubscriber forwards to the bus, asking it to subscribe s for events of
// type t. It does not touch c's own state or lock.
func (c *Client) addSubscriber(t reflect.Type, s *subscribeState) {
	bus := c.bus
	bus.subscribe(t, s)
}
// deleteSubscriber forwards to the bus, asking it to unsubscribe s from
// events of type t. It does not touch c's own state or lock.
func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) {
	bus := c.bus
	bus.unsubscribe(t, s)
}
// publish returns the bus's write channel as a send-only channel of
// published events.
func (c *Client) publish() chan<- PublishedEvent {
	var out chan<- PublishedEvent = c.bus.write
	return out
}
// shouldPublish reports whether an event of type t should be published.
// An active publish debug hook on c forces true without consulting the
// bus; otherwise the bus's own answer for t is returned.
func (c *Client) shouldPublish(t reflect.Type) bool {
	if c.publishDebug.active() {
		return true
	}
	return c.bus.shouldPublish(t)
}
// Subscribe requests delivery of events of type T through the given client.
// It panics if c already has a subscriber for type T, or if c is closed.
func Subscribe[T any](c *Client) *Subscriber[T] {
	// The client lock is held for the whole registration. That way a
	// subscription attempted on a closed client is rejected right here with
	// a clear message, rather than failing somewhere inside the subscriber
	// plumbing.
	c.mu.Lock()
	defer c.mu.Unlock()

	// Subscribing must not race with Close; report misuse at the call site.
	if c.isClosed() {
		panic("cannot Subscribe on a closed client")
	}

	state := c.subscribeStateLocked()
	// NOTE(review): logfForCaller is deliberately still called directly from
	// Subscribe; its name suggests it inspects the call stack — confirm
	// before moving this call into a helper.
	logf := logfForCaller(c.logger())
	sub := newSubscriber[T](state, logf)
	state.addSubscriber(sub)
	return sub
}
// SubscribeFunc is like [Subscribe], but calls the provided func for each
// event of type T. It panics if c is closed.
//
// A SubscriberFunc calls f synchronously from the client's goroutine.
// This means the callback must not block for an extended period of time,
// as this will block the subscriber and slow event processing for all
// subscriptions on c.
func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Subscribing must not race with Close; report misuse at the call site.
	if c.isClosed() {
		panic("cannot SubscribeFunc on a closed client")
	}

	state := c.subscribeStateLocked()
	// NOTE(review): logfForCaller is deliberately still called directly from
	// SubscribeFunc; its name suggests it inspects the call stack — confirm
	// before moving this call into a helper.
	logf := logfForCaller(c.logger())
	sub := newSubscriberFunc[T](state, f, logf)

	// What gets registered is the non-generic core rather than the typed
	// facade, so the bus's outputs map and the subscriber interface itab
	// are not parameterized by T. That avoids per-T itab and dictionary
	// cost.
	state.addSubscriber(sub.core)
	return sub
}
// Publish returns a publisher for event type T using the given client.
// It panics if c is closed.
func Publish[T any](c *Client) *Publisher[T] {
	pub := newPublisher[T](c)

	// The client tracks the publisher's non-generic core, not the typed
	// facade. This keeps the per-Client publisher set, the publisher
	// interface itab, and the publisher equality function free of any
	// parameterization by T, so a new event type passed through Publish[T]
	// adds no per-T itab/dictionary/eq cost.
	c.addPublisher(pub.core)
	return pub
}