@ -10,17 +10,12 @@ import (
"sync"
)
type d eliveredEvent struct {
type D eliveredEvent struct {
Event any
From * Client
To * Client
}
type queuedEvent struct {
Event any
From * Client
}
// subscriber is a uniformly typed wrapper around Subscriber[T], so
// that debugging facilities can look at active subscribers.
type subscriber interface {
@ -38,7 +33,7 @@ type subscriber interface {
// processing other potential sources of wakeups, which is how we end
// up at this awkward type signature and sharing of internal state
// through dispatch.
dispatch ( ctx context . Context , vals * queue [ queu edEvent] , acceptCh func ( ) chan queu edEvent) bool
dispatch ( ctx context . Context , vals * queue [ Deliver edEvent] , acceptCh func ( ) chan DeliveredEvent , snapshot chan chan [ ] Deliver edEvent) bool
Close ( )
}
@ -47,9 +42,9 @@ type subscribeState struct {
client * Client
dispatcher * worker
write chan queu edEvent
snapshot chan chan [ ] queu edEvent
debug hook [ d eliveredEvent]
write chan Deliver edEvent
snapshot chan chan [ ] Deliver edEvent
debug hook [ D eliveredEvent]
outputsMu sync . Mutex
outputs map [ reflect . Type ] subscriber
@ -58,8 +53,8 @@ type subscribeState struct {
func newSubscribeState ( c * Client ) * subscribeState {
ret := & subscribeState {
client : c ,
write : make ( chan queu edEvent) ,
snapshot : make ( chan chan [ ] queu edEvent) ,
write : make ( chan Deliver edEvent) ,
snapshot : make ( chan chan [ ] Deliver edEvent) ,
outputs : map [ reflect . Type ] subscriber { } ,
}
ret . dispatcher = runWorker ( ret . pump )
@ -67,8 +62,8 @@ func newSubscribeState(c *Client) *subscribeState {
}
func ( q * subscribeState ) pump ( ctx context . Context ) {
var vals queue [ queu edEvent]
acceptCh := func ( ) chan queu edEvent {
var vals queue [ Deliver edEvent]
acceptCh := func ( ) chan Deliver edEvent {
if vals . Full ( ) {
return nil
}
@ -83,12 +78,12 @@ func (q *subscribeState) pump(ctx context.Context) {
vals . Drop ( )
continue
}
if ! sub . dispatch ( ctx , & vals , acceptCh ) {
if ! sub . dispatch ( ctx , & vals , acceptCh , q . snapshot ) {
return
}
if q . debug . active ( ) {
q . debug . run ( d eliveredEvent{
q . debug . run ( D eliveredEvent{
Event : val . Event ,
From : val . From ,
To : q . client ,
@ -111,6 +106,20 @@ func (q *subscribeState) pump(ctx context.Context) {
}
}
func ( s * subscribeState ) snapshotQueue ( ) [ ] DeliveredEvent {
if s == nil {
return nil
}
resp := make ( chan [ ] DeliveredEvent )
select {
case s . snapshot <- resp :
return <- resp
case <- s . dispatcher . Done ( ) :
return nil
}
}
func ( s * subscribeState ) addSubscriber ( t reflect . Type , sub subscriber ) {
s . outputsMu . Lock ( )
defer s . outputsMu . Unlock ( )
@ -154,28 +163,43 @@ func (s *subscribeState) closed() <-chan struct{} {
// A Subscriber delivers one type of event from a [Client].
type Subscriber [ T any ] struct {
stop stopFlag
recv * subscribeState
read chan T
stop stopFlag
read chan T
unregister func ( )
}
func newSubscriber [ T any ] ( r * subscribeState ) * Subscriber [ T ] {
t := reflect . TypeFor [ T ] ( )
ret := & Subscriber [ T ] {
recv : r ,
read : make ( chan T ) ,
read : make ( chan T ) ,
unregister : func ( ) { r . deleteSubscriber ( t ) } ,
}
r . addSubscriber ( t , ret )
return ret
}
func newMonitor [ T any ] ( attach func ( fn func ( T ) ) ( cancel func ( ) ) ) * Subscriber [ T ] {
ret := & Subscriber [ T ] {
read : make ( chan T , 100 ) , // arbitrary, large
}
ret . unregister = attach ( ret . monitor )
return ret
}
func ( s * Subscriber [ T ] ) subscribeType ( ) reflect . Type {
return reflect . TypeFor [ T ] ( )
}
func ( s * Subscriber [ T ] ) dispatch ( ctx context . Context , vals * queue [ queuedEvent ] , acceptCh func ( ) chan queuedEvent ) bool {
func ( s * Subscriber [ T ] ) monitor ( debugEvent T ) {
select {
case s . read <- debugEvent :
case <- s . stop . Done ( ) :
}
}
func ( s * Subscriber [ T ] ) dispatch ( ctx context . Context , vals * queue [ DeliveredEvent ] , acceptCh func ( ) chan DeliveredEvent , snapshot chan chan [ ] DeliveredEvent ) bool {
t := vals . Peek ( ) . Event . ( T )
for {
// Keep the cases in this select in sync with subscribeState.pump
@ -189,7 +213,7 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[queuedEvent],
vals . Add ( val )
case <- ctx . Done ( ) :
return false
case ch := <- s . recv . s napshot:
case ch := <- snapshot :
ch <- vals . Snapshot ( )
}
}
@ -212,5 +236,5 @@ func (s *Subscriber[T]) Done() <-chan struct{} {
// [Subscriber.Events] block for ever.
func ( s * Subscriber [ T ] ) Close ( ) {
s . stop . Stop ( ) // unblock receivers
s . recv . deleteSubscriber ( reflect . TypeFor [ T ] ( ) )
s . unregister ( )
}