0def0f19bd
The (*SubscriberFunc[T]).dispatch method body — a ~40-line select
loop with slow-subscriber timer, snapshot handling, ctx-cancel
draining, and a CI stack-dump branch — was previously fully
duplicated by the Go compiler for every distinct GC shape of T.
None of that body actually depends on T except for the type
assertion and the user callback invocation.
This change moves the loop body into a non-generic dispatchFunc()
helper, leaving (*SubscriberFunc[T]).dispatch as a tiny wrapper
that:
  - performs the vals.Peek().Event.(T) type assertion
  - spawns the callback goroutine via `go runFuncCallback(s.read,
    t, callDone)` — a regular generic function call, not a closure,
    so that `go` binds the args to the goroutine's frame instead of
    allocating a closure on the heap. This preserves the
    zero-extra-allocation behavior of the original
    (*SubscriberFunc[T]).runCallback method.
  - resolves T's name via reflect.TypeFor[T]().String() (cached on
    the stack rather than recomputed on each %T formatting)
  - calls dispatchFunc with the callDone channel
The %T formatting in the original logf calls is replaced with %s
on the resolved name string, removing per-T fmt instantiations.
A new BenchmarkBasicFuncThroughput is added alongside the existing
BenchmarkBasicThroughput so per-event allocation behavior on the
SubscribeFunc dispatch path is covered by the benchmark suite.
Measured impact (util/eventbus/sizetest):
SubscriberFunc per-flow attribution:
linux/amd64: 912.5 B/flow -> 840.8 B/flow (-71.7 B/flow)
linux/arm64: 917.5 B/flow -> 849.9 B/flow (-67.6 B/flow)
The total per-flow size delta on amd64 dropped from 3,096.6 B to
3,039.2 B (-57 B/flow). The arm64 total stayed at 3,145.7 B
because the linker's page-aligned section sizing absorbed the
improvement on this binary; the symcost-attributed per-receiver
number is the real signal.
Behavior is unchanged: BenchmarkBasicThroughput stays at 0
allocs/op and BenchmarkBasicFuncThroughput holds at the same 2
allocs/op, 144 B/op as the prior eventbus implementation. All
eventbus tests pass.
Updates #12614
Change-Id: I85f933f50f58cd25bbfe5cc46bdda7aab22f0bf7
Signed-off-by: James Tucker <james@tailscale.com>
147 lines
2.9 KiB
Go
147 lines
2.9 KiB
Go
// Copyright (c) Tailscale Inc & contributors
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package eventbus_test
|
|
|
|
import (
|
|
"math/rand/v2"
|
|
"testing"
|
|
|
|
"tailscale.com/util/eventbus"
|
|
)
|
|
|
|
func BenchmarkBasicThroughput(b *testing.B) {
|
|
bus := eventbus.New()
|
|
pcli := bus.Client(b.Name() + "-pub")
|
|
scli := bus.Client(b.Name() + "-sub")
|
|
|
|
type emptyEvent [0]byte
|
|
|
|
// One publisher and a corresponding subscriber shoveling events as fast as
|
|
// they can through the plumbing.
|
|
pub := eventbus.Publish[emptyEvent](pcli)
|
|
sub := eventbus.Subscribe[emptyEvent](scli)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-sub.Events():
|
|
continue
|
|
case <-sub.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for b.Loop() {
|
|
pub.Publish(emptyEvent{})
|
|
}
|
|
bus.Close()
|
|
}
|
|
|
|
// BenchmarkBasicFuncThroughput is the SubscribeFunc analogue of
|
|
// BenchmarkBasicThroughput: one publisher and one SubscribeFunc
|
|
// callback, shoveling events as fast as they can through the
|
|
// plumbing. Useful for tracking per-event allocation behavior on the
|
|
// SubscribeFunc dispatch path.
|
|
func BenchmarkBasicFuncThroughput(b *testing.B) {
|
|
bus := eventbus.New()
|
|
pcli := bus.Client(b.Name() + "-pub")
|
|
scli := bus.Client(b.Name() + "-sub")
|
|
|
|
type emptyEvent [0]byte
|
|
|
|
pub := eventbus.Publish[emptyEvent](pcli)
|
|
eventbus.SubscribeFunc(scli, func(emptyEvent) {})
|
|
|
|
for b.Loop() {
|
|
pub.Publish(emptyEvent{})
|
|
}
|
|
bus.Close()
|
|
}
|
|
|
|
func BenchmarkSubsThroughput(b *testing.B) {
|
|
bus := eventbus.New()
|
|
pcli := bus.Client(b.Name() + "-pub")
|
|
scli1 := bus.Client(b.Name() + "-sub1")
|
|
scli2 := bus.Client(b.Name() + "-sub2")
|
|
|
|
type emptyEvent [0]byte
|
|
|
|
// One publisher and two subscribers shoveling events as fast as they can
|
|
// through the plumbing.
|
|
pub := eventbus.Publish[emptyEvent](pcli)
|
|
sub1 := eventbus.Subscribe[emptyEvent](scli1)
|
|
sub2 := eventbus.Subscribe[emptyEvent](scli2)
|
|
|
|
for _, sub := range []*eventbus.Subscriber[emptyEvent]{sub1, sub2} {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-sub.Events():
|
|
continue
|
|
case <-sub.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
for b.Loop() {
|
|
pub.Publish(emptyEvent{})
|
|
}
|
|
bus.Close()
|
|
}
|
|
|
|
func BenchmarkMultiThroughput(b *testing.B) {
|
|
bus := eventbus.New()
|
|
cli := bus.Client(b.Name())
|
|
|
|
type eventA struct{}
|
|
type eventB struct{}
|
|
|
|
// Two disjoint event streams routed through the global order.
|
|
apub := eventbus.Publish[eventA](cli)
|
|
asub := eventbus.Subscribe[eventA](cli)
|
|
bpub := eventbus.Publish[eventB](cli)
|
|
bsub := eventbus.Subscribe[eventB](cli)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-asub.Events():
|
|
continue
|
|
case <-asub.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-bsub.Events():
|
|
continue
|
|
case <-bsub.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
var rng uint64
|
|
var bits int
|
|
for b.Loop() {
|
|
if bits == 0 {
|
|
rng = rand.Uint64()
|
|
bits = 64
|
|
}
|
|
if rng&1 == 0 {
|
|
apub.Publish(eventA{})
|
|
} else {
|
|
bpub.Publish(eventB{})
|
|
}
|
|
rng >>= 1
|
|
bits--
|
|
}
|
|
bus.Close()
|
|
}
|