util/eventbus: give a nicer error when attempting to use a closed client (#17208)

It is a programming error to Publish or Subscribe on a closed Client, but now
the way you discover that is by getting a panic from down in the machinery of
the bus after the client state has been cleaned up.

To provide a more helpful error, let's panic explicitly when that happens and
say what went wrong ("the client is closed"), by preventing subscriptions from
interleaving with closure of the client. With this change, either an attachment
fails outright (because the client is already closed) or completes and then
shuts down in good order in the normal course.

This does not change the semantics of the client, publishers, or subscribers,
it's just making the failure more eager so we can attach explanatory text.

Updates #15160

Change-Id: Ia492f4c1dea7535aec2cdcc2e5ea5410ed5218d2
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger
2025-09-22 07:07:57 -07:00
committed by GitHub
parent 6e128498a7
commit e59fbaab64
2 changed files with 64 additions and 6 deletions
+38 -1
View File
@@ -257,8 +257,8 @@ func TestMonitor(t *testing.T) {
cli := bus.Client("test client")
// The monitored goroutine runs until the client or test subscription ends.
sub := eventbus.Subscribe[string](cli)
m := cli.Monitor(func(c *eventbus.Client) {
sub := eventbus.Subscribe[string](cli)
select {
case <-c.Done():
t.Log("client closed")
@@ -294,6 +294,43 @@ func TestMonitor(t *testing.T) {
t.Run("Wait", testMon(t, func(c *eventbus.Client, m eventbus.Monitor) { c.Close(); m.Wait() }))
}
func TestRegression(t *testing.T) {
bus := eventbus.New()
t.Cleanup(bus.Close)
t.Run("SubscribeClosed", func(t *testing.T) {
c := bus.Client("test sub client")
c.Close()
var v any
func() {
defer func() { v = recover() }()
eventbus.Subscribe[string](c)
}()
if v == nil {
t.Fatal("Expected a panic from Subscribe on a closed client")
} else {
t.Logf("Got expected panic: %v", v)
}
})
t.Run("PublishClosed", func(t *testing.T) {
c := bus.Client("test pub client")
c.Close()
var v any
func() {
defer func() { v = recover() }()
eventbus.Publish[string](c)
}()
if v == nil {
t.Fatal("expected a panic from Publish on a closed client")
} else {
t.Logf("Got expected panic: %v", v)
}
})
}
type queueChecker struct {
t *testing.T
want []any