net/udprelay: expose peer relay metrics (#18218)
Adding both user and client metrics for peer relay forwarded bytes and
packets, and the total endpoints gauge.
User metrics:
tailscaled_peer_relay_forwarded_packets_total{transport_in, transport_out}
tailscaled_peer_relay_forwarded_bytes_total{transport_in, transport_out}
tailscaled_peer_relay_endpoints_total{}
Where the transport labels can be of "udp4" or "udp6".
Client metrics:
udprelay_forwarded_(packets|bytes)_udp(4|6)_udp(4|6)
udprelay_endpoints
RELNOTE: Expose tailscaled metrics for peer relay.
Updates tailscale/corp#30820
Change-Id: I1a905d15bdc5ee84e28017e0b93210e2d9660259
Signed-off-by: Alex Valiushko <alexvaliushko@tailscale.com>
This commit is contained in:
+48
-10
@@ -43,6 +43,7 @@ import (
|
||||
"tailscale.com/types/views"
|
||||
"tailscale.com/util/eventbus"
|
||||
"tailscale.com/util/set"
|
||||
"tailscale.com/util/usermetric"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -76,6 +77,7 @@ type Server struct {
|
||||
wg sync.WaitGroup
|
||||
closeCh chan struct{}
|
||||
netChecker *netcheck.Client
|
||||
metrics *metrics
|
||||
|
||||
mu sync.Mutex // guards the following fields
|
||||
macSecrets views.Slice[[blake2s.Size]byte] // [0] is most recent, max 2 elements
|
||||
@@ -320,8 +322,8 @@ func (e *serverEndpoint) isBoundLocked() bool {
|
||||
// port selection is left up to the host networking stack. If
|
||||
// onlyStaticAddrPorts is true, then dynamic addr:port discovery will be
|
||||
// disabled, and only addr:port's set via [Server.SetStaticAddrPorts] will be
|
||||
// used.
|
||||
func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (s *Server, err error) {
|
||||
// used. Metrics must be non-nil.
|
||||
func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool, metrics *usermetric.Registry) (s *Server, err error) {
|
||||
s = &Server{
|
||||
logf: logf,
|
||||
disco: key.NewDisco(),
|
||||
@@ -333,6 +335,7 @@ func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (s *Serv
|
||||
nextVNI: minVNI,
|
||||
}
|
||||
s.discoPublic = s.disco.Public()
|
||||
s.metrics = registerMetrics(metrics)
|
||||
|
||||
// TODO(creachadair): Find a way to plumb this in during initialization.
|
||||
// As-written, messages published here will not be seen by other components
|
||||
@@ -670,6 +673,7 @@ func (s *Server) endpointGCLoop() {
|
||||
defer s.mu.Unlock()
|
||||
for k, v := range s.serverEndpointByDisco {
|
||||
if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) {
|
||||
s.metrics.addEndpoints(-1)
|
||||
delete(s.serverEndpointByDisco, k)
|
||||
s.serverEndpointByVNI.Delete(v.vni)
|
||||
}
|
||||
@@ -686,36 +690,50 @@ func (s *Server) endpointGCLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to netip.AddrPort) {
|
||||
// handlePacket unwraps headers and dispatches packet handling according to its
|
||||
// type and destination. If the returned address is valid, write will contain data
|
||||
// to transmit, and isDataPacket signals whether input was a data packet or OOB
|
||||
// signaling.
|
||||
//
|
||||
// write, to, isDataPacket := s.handlePacket(from, buf)
|
||||
// if to.IsValid() && isDataPacket {
|
||||
// // ..handle data transmission
|
||||
// }
|
||||
|
||||
func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to netip.AddrPort, isDataPacket bool) {
|
||||
if stun.Is(b) && b[1] == 0x01 {
|
||||
// A b[1] value of 0x01 (STUN method binding) is sufficiently
|
||||
// non-overlapping with the Geneve header where the LSB is always 0
|
||||
// (part of 6 "reserved" bits).
|
||||
s.netChecker.ReceiveSTUNPacket(b, from)
|
||||
return nil, netip.AddrPort{}
|
||||
return nil, netip.AddrPort{}, false
|
||||
}
|
||||
gh := packet.GeneveHeader{}
|
||||
err := gh.Decode(b)
|
||||
if err != nil {
|
||||
return nil, netip.AddrPort{}
|
||||
return nil, netip.AddrPort{}, false
|
||||
}
|
||||
e, ok := s.serverEndpointByVNI.Load(gh.VNI.Get())
|
||||
if !ok {
|
||||
// unknown VNI
|
||||
return nil, netip.AddrPort{}
|
||||
return nil, netip.AddrPort{}, false
|
||||
}
|
||||
|
||||
now := mono.Now()
|
||||
if gh.Control {
|
||||
if gh.Protocol != packet.GeneveProtocolDisco {
|
||||
// control packet, but not Disco
|
||||
return nil, netip.AddrPort{}
|
||||
return nil, netip.AddrPort{}, false
|
||||
}
|
||||
msg := b[packet.GeneveFixedHeaderLength:]
|
||||
secrets := s.getMACSecrets(now)
|
||||
return e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now)
|
||||
write, to = e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now)
|
||||
isDataPacket = false
|
||||
return
|
||||
}
|
||||
return e.(*serverEndpoint).handleDataPacket(from, b, now)
|
||||
write, to = e.(*serverEndpoint).handleDataPacket(from, b, now)
|
||||
isDataPacket = true
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) getMACSecrets(now mono.Time) views.Slice[[blake2s.Size]byte] {
|
||||
@@ -783,16 +801,32 @@ func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readF
|
||||
return
|
||||
}
|
||||
|
||||
// Aggregate counts for the packet batch before writing metrics.
|
||||
forwardedByOutAF := struct {
|
||||
bytes4 int64
|
||||
packets4 int64
|
||||
bytes6 int64
|
||||
packets6 int64
|
||||
}{}
|
||||
for _, msg := range msgs[:n] {
|
||||
if msg.N == 0 {
|
||||
continue
|
||||
}
|
||||
buf := msg.Buffers[0][:msg.N]
|
||||
from := msg.Addr.(*net.UDPAddr).AddrPort()
|
||||
write, to := s.handlePacket(from, buf)
|
||||
write, to, isDataPacket := s.handlePacket(from, buf)
|
||||
if !to.IsValid() {
|
||||
continue
|
||||
}
|
||||
if isDataPacket {
|
||||
if to.Addr().Is4() {
|
||||
forwardedByOutAF.bytes4 += int64(len(write))
|
||||
forwardedByOutAF.packets4++
|
||||
} else {
|
||||
forwardedByOutAF.bytes6 += int64(len(write))
|
||||
forwardedByOutAF.packets6++
|
||||
}
|
||||
}
|
||||
if from.Addr().Is4() == to.Addr().Is4() || otherSocket != nil {
|
||||
buffs, ok := writeBuffsByDest[to]
|
||||
if !ok {
|
||||
@@ -823,6 +857,9 @@ func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readF
|
||||
}
|
||||
delete(writeBuffsByDest, dest)
|
||||
}
|
||||
|
||||
s.metrics.countForwarded(readFromSocketIsIPv4, true, forwardedByOutAF.bytes4, forwardedByOutAF.packets4)
|
||||
s.metrics.countForwarded(readFromSocketIsIPv4, false, forwardedByOutAF.bytes6, forwardedByOutAF.packets6)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -932,6 +969,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
|
||||
s.serverEndpointByVNI.Store(e.vni, e)
|
||||
|
||||
s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni, e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString())
|
||||
s.metrics.addEndpoints(1)
|
||||
return endpoint.ServerEndpoint{
|
||||
ServerDisco: s.discoPublic,
|
||||
ClientDisco: pair.Get(),
|
||||
|
||||
Reference in New Issue
Block a user