net/{batching,udprelay},wgengine/magicsock: add SO_RXQ_OVFL clientmetrics

For the purpose of improved observability of UDP socket receive buffer
overflows on Linux.

Updates tailscale/corp#37679

Signed-off-by: Jordan Whited <jordan@tailscale.com>
main
Jordan Whited 1 month ago committed by Jordan Whited
parent 660a4608d2
commit 96dde53b43
  1. 2
      net/batching/conn_default.go
  2. 172
      net/batching/conn_linux.go
  3. 205
      net/batching/conn_linux_test.go
  4. 2
      net/udprelay/server.go
  5. 2
      wgengine/magicsock/rebinding_conn.go

@ -10,7 +10,7 @@ import (
)
// TryUpgradeToConn is no-op on all platforms except linux.
func TryUpgradeToConn(pconn nettype.PacketConn, _ string, _ int) nettype.PacketConn {
func TryUpgradeToConn(pconn nettype.PacketConn, _ string, _ int, _ string) nettype.PacketConn {
return pconn
}

@ -24,6 +24,7 @@ import (
"tailscale.com/net/neterror"
"tailscale.com/net/packet"
"tailscale.com/types/nettype"
"tailscale.com/util/clientmetric"
)
// xnetBatchReaderWriter defines the batching i/o methods of
@ -58,6 +59,14 @@ type linuxBatchingConn struct {
setGSOSizeInControl func(control *[]byte, gsoSize uint16) // typically setGSOSizeInControl(); swappable for testing
getGSOSizeFromControl func(control []byte) (int, error) // typically getGSOSizeFromControl(); swappable for testing
sendBatchPool sync.Pool
rxqOverflowsMetric *clientmetric.Metric
// readOpMu guards read operations that must perform accounting against
// rxqOverflows in single-threaded fashion. There are no concurrent usages
// of read operations at the time of writing (2026-03-09), but it would be
// unidiomatic to push this responsibility onto callers.
readOpMu sync.Mutex
rxqOverflows uint32 // kernel pumps a cumulative counter, which we track to push a clientmetric delta value
}
func (c *linuxBatchingConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) {
@ -285,16 +294,87 @@ func (c *linuxBatchingConn) splitCoalescedMessages(msgs []ipv6.Message, firstMsg
return n, nil
}
// getDataFromControl returns the data portion of the first control msg with
// matching cmsgLevel, matching cmsgType, and min data len of minDataLen, in
// control. If no matching cmsg is found or the len(control) < unix.SizeofCmsghdr,
// this function returns nil data. A non-nil error will be returned if
// len(control) > unix.SizeofCmsghdr but its contents cannot be parsed as a
// socket control message.
func getDataFromControl(control []byte, cmsgLevel, cmsgType int32, minDataLen int) ([]byte, error) {
var (
hdr unix.Cmsghdr
data []byte
rem = control
err error
)
for len(rem) > unix.SizeofCmsghdr {
hdr, data, rem, err = unix.ParseOneSocketControlMessage(rem)
if err != nil {
return nil, fmt.Errorf("error parsing socket control message: %w", err)
}
if hdr.Level == cmsgLevel && hdr.Type == cmsgType && len(data) >= minDataLen {
return data, nil
}
}
return nil, nil
}
// getRXQOverflowsFromControl returns the rxq overflows cumulative counter found
// in control. If no rxq counter is found or the len(control) < unix.SizeofCmsghdr,
// this function returns 0. A non-nil error will be returned if control is
// malformed.
func getRXQOverflowsFromControl(control []byte) (uint32, error) {
data, err := getDataFromControl(control, unix.SOL_SOCKET, unix.SO_RXQ_OVFL, 4)
if err != nil {
return 0, err
}
if len(data) >= 4 {
return binary.NativeEndian.Uint32(data), nil
}
return 0, nil
}
// handleRXQOverflowCounter handles any rx queue overflow counter contained in
// the tail of msgs.
func (c *linuxBatchingConn) handleRXQOverflowCounter(msgs []ipv6.Message, n int, rxErr error) {
if n == 0 || rxErr != nil || c.rxqOverflowsMetric == nil {
return
}
tailMsg := msgs[n-1] // we only care about the latest value as it's a cumulative counter
if tailMsg.NN == 0 {
return
}
rxqOverflows, err := getRXQOverflowsFromControl(tailMsg.OOB[:tailMsg.NN])
if err != nil {
return
}
// The counter is always present once nonzero on the kernel side. Compare it
// with our previous view, push the delta to the clientmetric, and update
// our view.
if rxqOverflows == c.rxqOverflows {
return
}
delta := int64(rxqOverflows - c.rxqOverflows)
c.rxqOverflowsMetric.Add(delta)
c.rxqOverflows = rxqOverflows
}
func (c *linuxBatchingConn) ReadBatch(msgs []ipv6.Message, flags int) (n int, err error) {
c.readOpMu.Lock()
defer c.readOpMu.Unlock()
if !c.rxOffload || len(msgs) < 2 {
return c.xpc.ReadBatch(msgs, flags)
n, err = c.xpc.ReadBatch(msgs, flags)
c.handleRXQOverflowCounter(msgs, n, err)
return n, err
}
// Read into the tail of msgs, split into the head.
readAt := len(msgs) - 2
numRead, err := c.xpc.ReadBatch(msgs[readAt:], 0)
if err != nil || numRead == 0 {
n, err = c.xpc.ReadBatch(msgs[readAt:], 0)
if err != nil || n == 0 {
return 0, err
}
c.handleRXQOverflowCounter(msgs[readAt:], n, err)
return c.splitCoalescedMessages(msgs, readAt)
}
@ -310,6 +390,21 @@ func (c *linuxBatchingConn) Close() error {
return c.pc.Close()
}
// tryEnableRXQOverflowsCounter attempts to enable the SO_RXQ_OVFL socket option
// on pconn, and returns the result. SO_RXQ_OVFL was added in Linux v2.6.33.
func tryEnableRXQOverflowsCounter(pconn nettype.PacketConn) (enabled bool) {
if c, ok := pconn.(*net.UDPConn); ok {
rc, err := c.SyscallConn()
if err != nil {
return
}
rc.Control(func(fd uintptr) {
enabled = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_RXQ_OVFL, 1) == nil
})
}
return enabled
}
// tryEnableUDPOffload attempts to enable the UDP_GRO socket option on pconn,
// and returns two booleans indicating TX and RX UDP offload support.
func tryEnableUDPOffload(pconn nettype.PacketConn) (hasTX bool, hasRX bool) {
@ -333,24 +428,14 @@ func tryEnableUDPOffload(pconn nettype.PacketConn) (hasTX bool, hasRX bool) {
// getGSOSizeFromControl returns the GSO size found in control. If no GSO size
// is found or the len(control) < unix.SizeofCmsghdr, this function returns 0.
// A non-nil error will be returned if len(control) > unix.SizeofCmsghdr but
// its contents cannot be parsed as a socket control message.
// A non-nil error will be returned if control is malformed.
func getGSOSizeFromControl(control []byte) (int, error) {
var (
hdr unix.Cmsghdr
data []byte
rem = control
err error
)
for len(rem) > unix.SizeofCmsghdr {
hdr, data, rem, err = unix.ParseOneSocketControlMessage(rem)
if err != nil {
return 0, fmt.Errorf("error parsing socket control message: %w", err)
}
if hdr.Level == unix.SOL_UDP && hdr.Type == unix.UDP_GRO && len(data) >= 2 {
return int(binary.NativeEndian.Uint16(data[:2])), nil
}
data, err := getDataFromControl(control, unix.SOL_UDP, unix.UDP_GRO, 2)
if err != nil {
return 0, err
}
if len(data) >= 2 {
return int(binary.NativeEndian.Uint16(data)), nil
}
return 0, nil
}
@ -374,10 +459,39 @@ func setGSOSizeInControl(control *[]byte, gsoSize uint16) {
*control = (*control)[:unix.CmsgSpace(2)]
}
var (
rxqOverflowsMetricsMu sync.Mutex
rxqOverflowsMetricsByName map[string]*clientmetric.Metric
)
// getRXQOverflowsMetric returns a counter-based [*clientmetric.Metric] for the
// provided name in a thread-safe manner. Callers may pass the same metric name
// multiple times, which is common across rebinds of the underlying, associated
// [Conn].
func getRXQOverflowsMetric(name string) *clientmetric.Metric {
if len(name) == 0 {
return nil
}
rxqOverflowsMetricsMu.Lock()
defer rxqOverflowsMetricsMu.Unlock()
m, ok := rxqOverflowsMetricsByName[name]
if ok {
return m
}
if rxqOverflowsMetricsByName == nil {
rxqOverflowsMetricsByName = make(map[string]*clientmetric.Metric)
}
m = clientmetric.NewCounter(name)
rxqOverflowsMetricsByName[name] = m
return m
}
// TryUpgradeToConn probes the capabilities of the OS and pconn, and upgrades
// pconn to a [Conn] if appropriate. A batch size of [IdealBatchSize] is
// suggested for the best performance.
func TryUpgradeToConn(pconn nettype.PacketConn, network string, batchSize int) nettype.PacketConn {
// suggested for the best performance. If len(rxqOverflowsMetricName) is
// nonzero, then read ops will propagate the SO_RXQ_OVFL control message counter
// to a clientmetric with the supplied name.
func TryUpgradeToConn(pconn nettype.PacketConn, network string, batchSize int, rxqOverflowsMetricName string) nettype.PacketConn {
if runtime.GOOS != "linux" {
// Exclude Android.
return pconn
@ -431,15 +545,23 @@ func TryUpgradeToConn(pconn nettype.PacketConn, network string, batchSize int) n
var txOffload bool
txOffload, b.rxOffload = tryEnableUDPOffload(uc)
b.txOffload.Store(txOffload)
if len(rxqOverflowsMetricName) > 0 && tryEnableRXQOverflowsCounter(uc) {
// Don't register the metric unless the socket option has been
// successfully set, otherwise we will report a misleading zero value
// counter on the wire. This is one reason why we prefer to handle
// clientmetric instantiation internally, vs letting callers pass them
// to TryUpgradeToConn.
b.rxqOverflowsMetric = getRXQOverflowsMetric(rxqOverflowsMetricName)
}
return b
}
var controlMessageSize = -1 // bomb if used for allocation before init
func init() {
// controlMessageSize is set to hold a UDP_GRO or UDP_SEGMENT control
// message. These contain a single uint16 of data.
controlMessageSize = unix.CmsgSpace(2)
controlMessageSize =
unix.CmsgSpace(2) + // UDP_GRO or UDP_SEGMENT gsoSize (uint16)
unix.CmsgSpace(4) // SO_RXQ_OVFL counter (uint32)
}
// MinControlMessageSize returns the minimum control message size required to

@ -5,10 +5,13 @@ package batching
import (
"encoding/binary"
"io"
"math"
"net"
"testing"
"unsafe"
qt "github.com/frankban/quicktest"
"github.com/tailscale/wireguard-go/conn"
"golang.org/x/net/ipv6"
"golang.org/x/sys/unix"
@ -317,35 +320,187 @@ func TestMinReadBatchMsgsLen(t *testing.T) {
}
}
func Test_getGSOSizeFromControl_MultipleMessages(t *testing.T) {
// Test that getGSOSizeFromControl correctly parses UDP_GRO when it's not the first control message.
const expectedGSOSize = 1420
func makeControlMsg(cmsgLevel, cmsgType int32, dataLen int) []byte {
msgLen := unix.CmsgSpace(dataLen)
msg := make([]byte, msgLen)
hdr2 := (*unix.Cmsghdr)(unsafe.Pointer(&msg[0]))
hdr2.Level = cmsgLevel
hdr2.Type = cmsgType
hdr2.SetLen(unix.CmsgLen(dataLen))
return msg
}
func gsoControl(gso uint16) []byte {
msg := makeControlMsg(unix.SOL_UDP, unix.UDP_GRO, 2)
binary.NativeEndian.PutUint16(msg[unix.SizeofCmsghdr:], gso)
return msg
}
func rxqOverflowsControl(count uint32) []byte {
msg := makeControlMsg(unix.SOL_SOCKET, unix.SO_RXQ_OVFL, 4)
binary.NativeEndian.PutUint32(msg[unix.SizeofCmsghdr:], count)
return msg
}
func Test_getRXQOverflowsMetric(t *testing.T) {
c := qt.New(t)
m := getRXQOverflowsMetric("")
c.Assert(m, qt.IsNil)
m = getRXQOverflowsMetric("rxq_overflows")
c.Assert(m, qt.IsNotNil)
wantM := getRXQOverflowsMetric("rxq_overflows")
c.Assert(m, qt.Equals, wantM)
uniq := getRXQOverflowsMetric("rxq_overflows_uniq")
c.Assert(m, qt.Not(qt.Equals), uniq)
}
// First message: IP_TOS
firstMsgLen := unix.CmsgSpace(1)
firstMsg := make([]byte, firstMsgLen)
hdr1 := (*unix.Cmsghdr)(unsafe.Pointer(&firstMsg[0]))
hdr1.Level = unix.SOL_IP
hdr1.Type = unix.IP_TOS
hdr1.SetLen(unix.CmsgLen(1))
firstMsg[unix.SizeofCmsghdr] = 0
func Test_getRXQOverflowsFromControl(t *testing.T) {
malformedControlMsg := gsoControl(1)
hdr := (*unix.Cmsghdr)(unsafe.Pointer(&malformedControlMsg[0]))
hdr.SetLen(1)
// Second message: UDP_GRO
secondMsgLen := unix.CmsgSpace(2)
secondMsg := make([]byte, secondMsgLen)
hdr2 := (*unix.Cmsghdr)(unsafe.Pointer(&secondMsg[0]))
hdr2.Level = unix.SOL_UDP
hdr2.Type = unix.UDP_GRO
hdr2.SetLen(unix.CmsgLen(2))
binary.NativeEndian.PutUint16(secondMsg[unix.SizeofCmsghdr:], expectedGSOSize)
tests := []struct {
name string
control []byte
want uint32
wantErr bool
}{
{
name: "malformed",
control: malformedControlMsg,
want: 0,
wantErr: true,
},
{
name: "gso",
control: gsoControl(1),
want: 0,
wantErr: false,
},
{
name: "rxq overflows",
control: rxqOverflowsControl(1),
want: 1,
wantErr: false,
},
{
name: "multiple cmsg rxq overflows at head",
control: append(rxqOverflowsControl(1), gsoControl(1)...),
want: 1,
wantErr: false,
},
{
name: "multiple cmsg rxq overflows at tail",
control: append(gsoControl(1), rxqOverflowsControl(1)...),
want: 1,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getRXQOverflowsFromControl(tt.control)
if (err != nil) != tt.wantErr {
t.Errorf("getRXQOverflowsFromControl() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("getRXQOverflowsFromControl() got = %v, want %v", got, tt.want)
}
})
}
}
control := append(firstMsg, secondMsg...)
func Test_getGSOSizeFromControl(t *testing.T) {
malformedControlMsg := gsoControl(1)
hdr := (*unix.Cmsghdr)(unsafe.Pointer(&malformedControlMsg[0]))
hdr.SetLen(1)
gsoSize, err := getGSOSizeFromControl(control)
if err != nil {
t.Fatalf("unexpected error: %v", err)
tests := []struct {
name string
control []byte
want int
wantErr bool
}{
{
name: "malformed",
control: malformedControlMsg,
want: 0,
wantErr: true,
},
{
name: "gso",
control: gsoControl(1),
want: 1,
wantErr: false,
},
{
name: "rxq overflows",
control: rxqOverflowsControl(1),
want: 0,
wantErr: false,
},
{
name: "multiple cmsg gso at tail",
control: append(rxqOverflowsControl(1), gsoControl(1)...),
want: 1,
wantErr: false,
},
{
name: "multiple cmsg gso at head",
control: append(gsoControl(1), rxqOverflowsControl(1)...),
want: 1,
wantErr: false,
},
}
if gsoSize != expectedGSOSize {
t.Errorf("got GSO size %d, want %d", gsoSize, expectedGSOSize)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getGSOSizeFromControl(tt.control)
if (err != nil) != tt.wantErr {
t.Errorf("getGSOSizeFromControl() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("getGSOSizeFromControl() got = %v, want %v", got, tt.want)
}
})
}
}
func Test_linuxBatchingConn_handleRXQOverflowCounter(t *testing.T) {
c := qt.New(t)
conn := &linuxBatchingConn{
rxqOverflowsMetric: getRXQOverflowsMetric("test_handleRXQOverflowCounter"),
}
conn.rxqOverflowsMetric.Set(0) // test count > 1 will accumulate, reset
// n == 0
conn.handleRXQOverflowCounter([]ipv6.Message{{}}, 0, nil)
c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(0))
// rxErr non-nil
conn.handleRXQOverflowCounter([]ipv6.Message{{}}, 0, io.EOF)
c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(0))
// nonzero counter
control := rxqOverflowsControl(1)
conn.handleRXQOverflowCounter([]ipv6.Message{{
OOB: control,
NN: len(control),
}}, 1, nil)
c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(1))
// nonzero counter, no change
conn.handleRXQOverflowCounter([]ipv6.Message{{
OOB: control,
NN: len(control),
}}, 1, nil)
c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(1))
// counter rollover
control = rxqOverflowsControl(0)
conn.handleRXQOverflowCounter([]ipv6.Message{{
OOB: control,
NN: len(control),
}}, 1, nil)
c.Assert(conn.rxqOverflowsMetric.Value(), qt.Equals, int64(1+math.MaxUint32))
}

@ -689,7 +689,7 @@ func (s *Server) bindSockets(desiredPort uint16) error {
break SocketsLoop
}
}
pc := batching.TryUpgradeToConn(uc, network, batching.IdealBatchSize)
pc := batching.TryUpgradeToConn(uc, network, batching.IdealBatchSize, "udprelay_rxq_overflows")
bc, ok := pc.(batching.Conn)
if !ok {
bc = &singlePacketConn{uc}

@ -43,7 +43,7 @@ type RebindingUDPConn struct {
// disrupting surrounding code that assumes nettype.PacketConn is a
// *net.UDPConn.
func (c *RebindingUDPConn) setConnLocked(p nettype.PacketConn, network string, batchSize int) {
upc := batching.TryUpgradeToConn(p, network, batchSize)
upc := batching.TryUpgradeToConn(p, network, batchSize, "magicsock_udp_rxq_overflows")
c.pconn = upc
c.pconnAtomic.Store(&upc)
c.port = uint16(c.localAddrLocked().Port)

Loading…
Cancel
Save