cmd/tailscale: add event bus queue depth debugging
Under extremely high load it appears we may have some retention issues as a result of queue depth build up, but there is currently no direct way to observe this. The scenario does not trigger the slow subscriber log message, and the event stream debugging endpoint produces a saturating volume of information. Updates tailscale/corp#36904 Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
@@ -6,6 +6,7 @@
|
||||
package localapi
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -35,6 +36,7 @@ func init() {
|
||||
Register("dev-set-state-store", (*Handler).serveDevSetStateStore)
|
||||
Register("debug-bus-events", (*Handler).serveDebugBusEvents)
|
||||
Register("debug-bus-graph", (*Handler).serveEventBusGraph)
|
||||
Register("debug-bus-queues", (*Handler).serveDebugBusQueues)
|
||||
Register("debug-derp-region", (*Handler).serveDebugDERPRegion)
|
||||
Register("debug-dial-types", (*Handler).serveDebugDialTypes)
|
||||
Register("debug-log", (*Handler).serveDebugLog)
|
||||
@@ -424,6 +426,62 @@ func (h *Handler) serveEventBusGraph(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(topics)
|
||||
}
|
||||
|
||||
func (h *Handler) serveDebugBusQueues(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != httpm.GET {
|
||||
http.Error(w, "GET required", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
bus, ok := h.LocalBackend().Sys().Bus.GetOK()
|
||||
if !ok {
|
||||
http.Error(w, "event bus not running", http.StatusPreconditionFailed)
|
||||
return
|
||||
}
|
||||
|
||||
debugger := bus.Debugger()
|
||||
|
||||
type clientQueue struct {
|
||||
Name string `json:"name"`
|
||||
SubscribeDepth int `json:"subscribeDepth"`
|
||||
SubscribeTypes []string `json:"subscribeTypes,omitempty"`
|
||||
PublishTypes []string `json:"publishTypes,omitempty"`
|
||||
}
|
||||
|
||||
publishQueue := debugger.PublishQueue()
|
||||
clients := debugger.Clients()
|
||||
result := struct {
|
||||
PublishQueueDepth int `json:"publishQueueDepth"`
|
||||
Clients []clientQueue `json:"clients"`
|
||||
}{
|
||||
PublishQueueDepth: len(publishQueue),
|
||||
}
|
||||
|
||||
for _, c := range clients {
|
||||
sq := debugger.SubscribeQueue(c)
|
||||
cq := clientQueue{
|
||||
Name: c.Name(),
|
||||
SubscribeDepth: len(sq),
|
||||
}
|
||||
for _, t := range debugger.SubscribeTypes(c) {
|
||||
cq.SubscribeTypes = append(cq.SubscribeTypes, t.String())
|
||||
}
|
||||
for _, t := range debugger.PublishTypes(c) {
|
||||
cq.PublishTypes = append(cq.PublishTypes, t.String())
|
||||
}
|
||||
result.Clients = append(result.Clients, cq)
|
||||
}
|
||||
|
||||
slices.SortFunc(result.Clients, func(a, b clientQueue) int {
|
||||
if a.SubscribeDepth != b.SubscribeDepth {
|
||||
return b.SubscribeDepth - a.SubscribeDepth
|
||||
}
|
||||
return cmp.Compare(a.Name, b.Name)
|
||||
})
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
func (h *Handler) serveDebugLog(w http.ResponseWriter, r *http.Request) {
|
||||
if !buildfeatures.HasLogTail {
|
||||
http.Error(w, feature.ErrUnavailable.Error(), http.StatusNotImplemented)
|
||||
|
||||
Reference in New Issue
Block a user