From 96dde53b43fc3b6b1c7aa254a10914347199cd2e Mon Sep 17 00:00:00 2001 From: Jordan Whited Date: Wed, 11 Mar 2026 13:02:09 -0700 Subject: [PATCH] net/{batching,udprelay},wgengine/magicsock: add SO_RXQ_OVFL clientmetrics For the purpose of improved observability of UDP socket receive buffer overflows on Linux. Updates tailscale/corp#37679 Signed-off-by: Jordan Whited --- net/batching/conn_default.go | 2 +- net/batching/conn_linux.go | 172 ++++++++++++++++++---- net/batching/conn_linux_test.go | 205 +++++++++++++++++++++++---- net/udprelay/server.go | 2 +- wgengine/magicsock/rebinding_conn.go | 2 +- 5 files changed, 330 insertions(+), 53 deletions(-) diff --git a/net/batching/conn_default.go b/net/batching/conn_default.go index 0d208578b..77c4c8b6a 100644 --- a/net/batching/conn_default.go +++ b/net/batching/conn_default.go @@ -10,7 +10,7 @@ import ( ) // TryUpgradeToConn is no-op on all platforms except linux. -func TryUpgradeToConn(pconn nettype.PacketConn, _ string, _ int) nettype.PacketConn { +func TryUpgradeToConn(pconn nettype.PacketConn, _ string, _ int, _ string) nettype.PacketConn { return pconn } diff --git a/net/batching/conn_linux.go b/net/batching/conn_linux.go index 70f91cfb6..b3702b72a 100644 --- a/net/batching/conn_linux.go +++ b/net/batching/conn_linux.go @@ -24,6 +24,7 @@ import ( "tailscale.com/net/neterror" "tailscale.com/net/packet" "tailscale.com/types/nettype" + "tailscale.com/util/clientmetric" ) // xnetBatchReaderWriter defines the batching i/o methods of @@ -58,6 +59,14 @@ type linuxBatchingConn struct { setGSOSizeInControl func(control *[]byte, gsoSize uint16) // typically setGSOSizeInControl(); swappable for testing getGSOSizeFromControl func(control []byte) (int, error) // typically getGSOSizeFromControl(); swappable for testing sendBatchPool sync.Pool + rxqOverflowsMetric *clientmetric.Metric + + // readOpMu guards read operations that must perform accounting against + // rxqOverflows in single-threaded fashion. There are no concurrent usages + // of read operations at the time of writing (2026-03-09), but it would be + // unidiomatic to push this responsibility onto callers. + readOpMu sync.Mutex + rxqOverflows uint32 // kernel pumps a cumulative counter, which we track to push a clientmetric delta value } func (c *linuxBatchingConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) { @@ -285,16 +294,87 @@ func (c *linuxBatchingConn) splitCoalescedMessages(msgs []ipv6.Message, firstMsg return n, nil } +// getDataFromControl returns the data portion of the first control msg with +// matching cmsgLevel, matching cmsgType, and min data len of minDataLen, in +// control. If no matching cmsg is found or the len(control) < unix.SizeofCmsghdr, +// this function returns nil data. A non-nil error will be returned if +// len(control) > unix.SizeofCmsghdr but its contents cannot be parsed as a +// socket control message. +func getDataFromControl(control []byte, cmsgLevel, cmsgType int32, minDataLen int) ([]byte, error) { + var ( + hdr unix.Cmsghdr + data []byte + rem = control + err error + ) + + for len(rem) > unix.SizeofCmsghdr { + hdr, data, rem, err = unix.ParseOneSocketControlMessage(rem) + if err != nil { + return nil, fmt.Errorf("error parsing socket control message: %w", err) + } + if hdr.Level == cmsgLevel && hdr.Type == cmsgType && len(data) >= minDataLen { + return data, nil + } + } + return nil, nil +} + +// getRXQOverflowsFromControl returns the rxq overflows cumulative counter found +// in control. If no rxq counter is found or the len(control) < unix.SizeofCmsghdr, +// this function returns 0. A non-nil error will be returned if control is +// malformed. +func getRXQOverflowsFromControl(control []byte) (uint32, error) { + data, err := getDataFromControl(control, unix.SOL_SOCKET, unix.SO_RXQ_OVFL, 4) + if err != nil { + return 0, err + } + if len(data) >= 4 { + return binary.NativeEndian.Uint32(data), nil + } + return 0, nil +} + +// handleRXQOverflowCounter handles any rx queue overflow counter contained in +// the tail of msgs. +func (c *linuxBatchingConn) handleRXQOverflowCounter(msgs []ipv6.Message, n int, rxErr error) { + if n == 0 || rxErr != nil || c.rxqOverflowsMetric == nil { + return + } + tailMsg := msgs[n-1] // we only care about the latest value as it's a cumulative counter + if tailMsg.NN == 0 { + return + } + rxqOverflows, err := getRXQOverflowsFromControl(tailMsg.OOB[:tailMsg.NN]) + if err != nil { + return + } + // The counter is always present once nonzero on the kernel side. Compare it + // with our previous view, push the delta to the clientmetric, and update + // our view. + if rxqOverflows == c.rxqOverflows { + return + } + delta := int64(rxqOverflows - c.rxqOverflows) + c.rxqOverflowsMetric.Add(delta) + c.rxqOverflows = rxqOverflows +} + func (c *linuxBatchingConn) ReadBatch(msgs []ipv6.Message, flags int) (n int, err error) { + c.readOpMu.Lock() + defer c.readOpMu.Unlock() if !c.rxOffload || len(msgs) < 2 { - return c.xpc.ReadBatch(msgs, flags) + n, err = c.xpc.ReadBatch(msgs, flags) + c.handleRXQOverflowCounter(msgs, n, err) + return n, err } // Read into the tail of msgs, split into the head. readAt := len(msgs) - 2 - numRead, err := c.xpc.ReadBatch(msgs[readAt:], 0) - if err != nil || numRead == 0 { + n, err = c.xpc.ReadBatch(msgs[readAt:], 0) + if err != nil || n == 0 { return 0, err } + c.handleRXQOverflowCounter(msgs[readAt:], n, err) return c.splitCoalescedMessages(msgs, readAt) } @@ -310,6 +390,21 @@ func (c *linuxBatchingConn) Close() error { return c.pc.Close() } +// tryEnableRXQOverflowsCounter attempts to enable the SO_RXQ_OVFL socket option +// on pconn, and returns the result. SO_RXQ_OVFL was added in Linux v2.6.33. +func tryEnableRXQOverflowsCounter(pconn nettype.PacketConn) (enabled bool) { + if c, ok := pconn.(*net.UDPConn); ok { + rc, err := c.SyscallConn() + if err != nil { + return + } + rc.Control(func(fd uintptr) { + enabled = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_RXQ_OVFL, 1) == nil + }) + } + return enabled +} + // tryEnableUDPOffload attempts to enable the UDP_GRO socket option on pconn, // and returns two booleans indicating TX and RX UDP offload support. func tryEnableUDPOffload(pconn nettype.PacketConn) (hasTX bool, hasRX bool) { @@ -333,24 +428,14 @@ func tryEnableUDPOffload(pconn nettype.PacketConn) (hasTX bool, hasRX bool) { // getGSOSizeFromControl returns the GSO size found in control. If no GSO size // is found or the len(control) < unix.SizeofCmsghdr, this function returns 0. -// A non-nil error will be returned if len(control) > unix.SizeofCmsghdr but -// its contents cannot be parsed as a socket control message. +// A non-nil error will be returned if control is malformed. func getGSOSizeFromControl(control []byte) (int, error) { - var ( - hdr unix.Cmsghdr - data []byte - rem = control - err error - ) - - for len(rem) > unix.SizeofCmsghdr { - hdr, data, rem, err = unix.ParseOneSocketControlMessage(rem) - if err != nil { - return 0, fmt.Errorf("error parsing socket control message: %w", err) - } - if hdr.Level == unix.SOL_UDP && hdr.Type == unix.UDP_GRO && len(data) >= 2 { - return int(binary.NativeEndian.Uint16(data[:2])), nil - } + data, err := getDataFromControl(control, unix.SOL_UDP, unix.UDP_GRO, 2) + if err != nil { + return 0, err + } + if len(data) >= 2 { + return int(binary.NativeEndian.Uint16(data)), nil } return 0, nil } @@ -374,10 +459,39 @@ func setGSOSizeInControl(control *[]byte, gsoSize uint16) { *control = (*control)[:unix.CmsgSpace(2)] } +var ( + rxqOverflowsMetricsMu sync.Mutex + rxqOverflowsMetricsByName map[string]*clientmetric.Metric +) + +// getRXQOverflowsMetric returns a counter-based [*clientmetric.Metric] for the +// provided name in a thread-safe manner. Callers may pass the same metric name +// multiple times, which is common across rebinds of the underlying, associated +// [Conn]. +func getRXQOverflowsMetric(name string) *clientmetric.Metric { + if len(name) == 0 { + return nil + } + rxqOverflowsMetricsMu.Lock() + defer rxqOverflowsMetricsMu.Unlock() + m, ok := rxqOverflowsMetricsByName[name] + if ok { + return m + } + if rxqOverflowsMetricsByName == nil { + rxqOverflowsMetricsByName = make(map[string]*clientmetric.Metric) + } + m = clientmetric.NewCounter(name) + rxqOverflowsMetricsByName[name] = m + return m +} + // TryUpgradeToConn probes the capabilities of the OS and pconn, and upgrades // pconn to a [Conn] if appropriate. A batch size of [IdealBatchSize] is -// suggested for the best performance. -func TryUpgradeToConn(pconn nettype.PacketConn, network string, batchSize int) nettype.PacketConn { +// suggested for the best performance. If len(rxqOverflowsMetricName) is +// nonzero, then read ops will propagate the SO_RXQ_OVFL control message counter +// to a clientmetric with the supplied name. +func TryUpgradeToConn(pconn nettype.PacketConn, network string, batchSize int, rxqOverflowsMetricName string) nettype.PacketConn { if runtime.GOOS != "linux" { // Exclude Android. return pconn @@ -431,15 +545,23 @@ func TryUpgradeToConn(pconn nettype.PacketConn, network string, batchSize int) n var txOffload bool txOffload, b.rxOffload = tryEnableUDPOffload(uc) b.txOffload.Store(txOffload) + if len(rxqOverflowsMetricName) > 0 && tryEnableRXQOverflowsCounter(uc) { + // Don't register the metric unless the socket option has been + // successfully set, otherwise we will report a misleading zero value + // counter on the wire. This is one reason why we prefer to handle + // clientmetric instantiation internally, vs letting callers pass them + // to TryUpgradeToConn. + b.rxqOverflowsMetric = getRXQOverflowsMetric(rxqOverflowsMetricName) + } return b } var controlMessageSize = -1 // bomb if used for allocation before init func init() { - // controlMessageSize is set to hold a UDP_GRO or UDP_SEGMENT control - // message. These contain a single uint16 of data. - controlMessageSize = unix.CmsgSpace(2) + controlMessageSize = + unix.CmsgSpace(2) + // UDP_GRO or UDP_SEGMENT gsoSize (uint16) + unix.CmsgSpace(4) // SO_RXQ_OVFL counter (uint32) } // MinControlMessageSize returns the minimum control message size required to diff --git a/net/batching/conn_linux_test.go b/net/batching/conn_linux_test.go index a15de4f67..bc9e55a9d 100644 --- a/net/batching/conn_linux_test.go +++ b/net/batching/conn_linux_test.go @@ -5,10 +5,13 @@ package batching import ( "encoding/binary" + "io" + "math" "net" "testing" "unsafe" + qt "github.com/frankban/quicktest" "github.com/tailscale/wireguard-go/conn" "golang.org/x/net/ipv6" "golang.org/x/sys/unix" @@ -317,35 +320,187 @@ func TestMinReadBatchMsgsLen(t *testing.T) { } } -func Test_getGSOSizeFromControl_MultipleMessages(t *testing.T) { - // Test that getGSOSizeFromControl correctly parses UDP_GRO when it's not the first control message. - const expectedGSOSize = 1420 +func makeControlMsg(cmsgLevel, cmsgType int32, dataLen int) []byte { + msgLen := unix.CmsgSpace(dataLen) + msg := make([]byte, msgLen) + hdr2 := (*unix.Cmsghdr)(unsafe.Pointer(&msg[0])) + hdr2.Level = cmsgLevel + hdr2.Type = cmsgType + hdr2.SetLen(unix.CmsgLen(dataLen)) + return msg +} + +func gsoControl(gso uint16) []byte { + msg := makeControlMsg(unix.SOL_UDP, unix.UDP_GRO, 2) + binary.NativeEndian.PutUint16(msg[unix.SizeofCmsghdr:], gso) + return msg +} + +func rxqOverflowsControl(count uint32) []byte { + msg := makeControlMsg(unix.SOL_SOCKET, unix.SO_RXQ_OVFL, 4) + binary.NativeEndian.PutUint32(msg[unix.SizeofCmsghdr:], count) + return msg +} + +func Test_getRXQOverflowsMetric(t *testing.T) { + c := qt.New(t) + m := getRXQOverflowsMetric("") + c.Assert(m, qt.IsNil) + m = getRXQOverflowsMetric("rxq_overflows") + c.Assert(m, qt.IsNotNil) + wantM := getRXQOverflowsMetric("rxq_overflows") + c.Assert(m, qt.Equals, wantM) + uniq := getRXQOverflowsMetric("rxq_overflows_uniq") + c.Assert(m, qt.Not(qt.Equals), uniq) +} - // First message: IP_TOS - firstMsgLen := unix.CmsgSpace(1) - firstMsg := make([]byte, firstMsgLen) - hdr1 := (*unix.Cmsghdr)(unsafe.Pointer(&firstMsg[0])) - hdr1.Level = unix.SOL_IP - hdr1.Type = unix.IP_TOS - hdr1.SetLen(unix.CmsgLen(1)) - firstMsg[unix.SizeofCmsghdr] = 0 +func Test_getRXQOverflowsFromControl(t *testing.T) { + malformedControlMsg := gsoControl(1) + hdr := (*unix.Cmsghdr)(unsafe.Pointer(&malformedControlMsg[0])) + hdr.SetLen(1) - // Second message: UDP_GRO - secondMsgLen := unix.CmsgSpace(2) - secondMsg := make([]byte, secondMsgLen) - hdr2 := (*unix.Cmsghdr)(unsafe.Pointer(&secondMsg[0])) - hdr2.Level = unix.SOL_UDP - hdr2.Type = unix.UDP_GRO - hdr2.SetLen(unix.CmsgLen(2)) - binary.NativeEndian.PutUint16(secondMsg[unix.SizeofCmsghdr:], expectedGSOSize) + tests := []struct { + name string + control []byte + want uint32 + wantErr bool + }{ + { + name: "malformed", + control: malformedControlMsg, + want: 0, + wantErr: true, + }, + { + name: "gso", + control: gsoControl(1), + want: 0, + wantErr: false, + }, + { + name: "rxq overflows", + control: rxqOverflowsControl(1), + want: 1, + wantErr: false, + }, + { + name: "multiple cmsg rxq overflows at head", + control: append(rxqOverflowsControl(1), gsoControl(1)...), + want: 1, + wantErr: false, + }, + { + name: "multiple cmsg rxq overflows at tail", + control: append(gsoControl(1), rxqOverflowsControl(1)...), + want: 1, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getRXQOverflowsFromControl(tt.control) + if (err != nil) != tt.wantErr { + t.Errorf("getRXQOverflowsFromControl() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("getRXQOverflowsFromControl() got = %v, want %v", got, tt.want) + } + }) + } +} - control := append(firstMsg, secondMsg...) +func Test_getGSOSizeFromControl(t *testing.T) { + malformedControlMsg := gsoControl(1) + hdr := (*unix.Cmsghdr)(unsafe.Pointer(&malformedControlMsg[0])) + hdr.SetLen(1) - gsoSize, err := getGSOSizeFromControl(control) - if err != nil { - t.Fatalf("unexpected error: %v", err) + tests := []struct { + name string + control []byte + want int + wantErr bool + }{ + { + name: "malformed", + control: malformedControlMsg, + want: 0, + wantErr: true, + }, + { + name: "gso", + control: gsoControl(1), + want: 1, + wantErr: false, + }, + { + name: "rxq overflows", + control: rxqOverflowsControl(1), + want: 0, + wantErr: false, + }, + { + name: "multiple cmsg gso at tail", + control: append(rxqOverflowsControl(1), gsoControl(1)...), + want: 1, + wantErr: false, + }, + { + name: "multiple cmsg gso at head", + control: append(gsoControl(1), rxqOverflowsControl(1)...), + want: 1, + wantErr: false, + }, } - if gsoSize != expectedGSOSize { - t.Errorf("got GSO size %d, want %d", gsoSize, expectedGSOSize) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getGSOSizeFromControl(tt.control) + if (err != nil) != tt.wantErr { + t.Errorf("getGSOSizeFromControl() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("getGSOSizeFromControl() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_linuxBatchingConn_handleRXQOverflowCounter(t *testing.T) { + c := qt.New(t) + conn := &linuxBatchingConn{ + rxqOverflowsMetric: getRXQOverflowsMetric("test_handleRXQOverflowCounter"), } + conn.rxqOverflowsMetric.Set(0) // test count > 1 will accumulate, reset + + // n == 0 + conn.handleRXQOverflowCounter([]ipv6.Message{{}}, 0, nil) + c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(0)) + + // rxErr non-nil + conn.handleRXQOverflowCounter([]ipv6.Message{{}}, 0, io.EOF) + c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(0)) + + // nonzero counter + control := rxqOverflowsControl(1) + conn.handleRXQOverflowCounter([]ipv6.Message{{ + OOB: control, + NN: len(control), + }}, 1, nil) + c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(1)) + + // nonzero counter, no change + conn.handleRXQOverflowCounter([]ipv6.Message{{ + OOB: control, + NN: len(control), + }}, 1, nil) + c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(1)) + + // counter rollover + control = rxqOverflowsControl(0) + conn.handleRXQOverflowCounter([]ipv6.Message{{ + OOB: control, + NN: len(control), + }}, 1, nil) + c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(1+math.MaxUint32)) } diff --git a/net/udprelay/server.go b/net/udprelay/server.go index 7dd89920e..6f066762d 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -689,7 +689,7 @@ func (s *Server) bindSockets(desiredPort uint16) error { break SocketsLoop } } - pc := batching.TryUpgradeToConn(uc, network, batching.IdealBatchSize) + pc := batching.TryUpgradeToConn(uc, network, batching.IdealBatchSize, "udprelay_rxq_overflows") bc, ok := pc.(batching.Conn) if !ok { bc = &singlePacketConn{uc} diff --git a/wgengine/magicsock/rebinding_conn.go b/wgengine/magicsock/rebinding_conn.go index e00eed1f5..11398c592 100644 --- a/wgengine/magicsock/rebinding_conn.go +++ b/wgengine/magicsock/rebinding_conn.go @@ -43,7 +43,7 @@ type RebindingUDPConn struct { // disrupting surrounding code that assumes nettype.PacketConn is a // *net.UDPConn. func (c *RebindingUDPConn) setConnLocked(p nettype.PacketConn, network string, batchSize int) { - upc := batching.TryUpgradeToConn(p, network, batchSize) + upc := batching.TryUpgradeToConn(p, network, batchSize, "magicsock_udp_rxq_overflows") c.pconn = upc c.pconnAtomic.Store(&upc) c.port = uint16(c.localAddrLocked().Port)