|
|
|
|
@ -8,8 +8,16 @@ import ( |
|
|
|
|
"fmt" |
|
|
|
|
"reflect" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type queuedEvent struct { |
|
|
|
|
Event any |
|
|
|
|
From *Client |
|
|
|
|
Published time.Time |
|
|
|
|
Routed time.Time |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// subscriber is a uniformly typed wrapper around Subscriber[T], so
|
|
|
|
|
// that debugging facilities can look at active subscribers.
|
|
|
|
|
type subscriber interface { |
|
|
|
|
@ -27,7 +35,7 @@ type subscriber interface { |
|
|
|
|
// processing other potential sources of wakeups, which is how we end
|
|
|
|
|
// up at this awkward type signature and sharing of internal state
|
|
|
|
|
// through dispatch.
|
|
|
|
|
dispatch(ctx context.Context, vals *queue[any], acceptCh func() chan any) bool |
|
|
|
|
dispatch(ctx context.Context, vals *queue[queuedEvent], acceptCh func() chan queuedEvent) bool |
|
|
|
|
Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -36,8 +44,8 @@ type subscribeState struct { |
|
|
|
|
client *Client |
|
|
|
|
|
|
|
|
|
dispatcher *worker |
|
|
|
|
write chan any |
|
|
|
|
snapshot chan chan []any |
|
|
|
|
write chan queuedEvent |
|
|
|
|
snapshot chan chan []queuedEvent |
|
|
|
|
|
|
|
|
|
outputsMu sync.Mutex |
|
|
|
|
outputs map[reflect.Type]subscriber |
|
|
|
|
@ -46,8 +54,8 @@ type subscribeState struct { |
|
|
|
|
func newSubscribeState(c *Client) *subscribeState { |
|
|
|
|
ret := &subscribeState{ |
|
|
|
|
client: c, |
|
|
|
|
write: make(chan any), |
|
|
|
|
snapshot: make(chan chan []any), |
|
|
|
|
write: make(chan queuedEvent), |
|
|
|
|
snapshot: make(chan chan []queuedEvent), |
|
|
|
|
outputs: map[reflect.Type]subscriber{}, |
|
|
|
|
} |
|
|
|
|
ret.dispatcher = runWorker(ret.pump) |
|
|
|
|
@ -55,8 +63,8 @@ func newSubscribeState(c *Client) *subscribeState { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (q *subscribeState) pump(ctx context.Context) { |
|
|
|
|
var vals queue[any] |
|
|
|
|
acceptCh := func() chan any { |
|
|
|
|
var vals queue[queuedEvent] |
|
|
|
|
acceptCh := func() chan queuedEvent { |
|
|
|
|
if vals.Full() { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
@ -65,7 +73,7 @@ func (q *subscribeState) pump(ctx context.Context) { |
|
|
|
|
for { |
|
|
|
|
if !vals.Empty() { |
|
|
|
|
val := vals.Peek() |
|
|
|
|
sub := q.subscriberFor(val) |
|
|
|
|
sub := q.subscriberFor(val.Event) |
|
|
|
|
if sub == nil { |
|
|
|
|
// Raced with unsubscribe.
|
|
|
|
|
vals.Drop() |
|
|
|
|
@ -155,8 +163,8 @@ func (s *Subscriber[T]) subscribeType() reflect.Type { |
|
|
|
|
return reflect.TypeFor[T]() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[any], acceptCh func() chan any) bool { |
|
|
|
|
t := vals.Peek().(T) |
|
|
|
|
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[queuedEvent], acceptCh func() chan queuedEvent) bool { |
|
|
|
|
t := vals.Peek().Event.(T) |
|
|
|
|
for { |
|
|
|
|
// Keep the cases in this select in sync with subscribeState.pump
|
|
|
|
|
// above. The only different should be that this select
|
|
|
|
|
|