wgengine/magicsock: deflake TestTwoDevicePing compare-metrics-stats

The compare-metrics-stats subtest reset two independent counting
systems (physical connection counters and expvar.Int user metrics)
non-atomically. Background WireGuard keepalives arriving between the
resets could increment one system but not the other, causing
off-by-one packet/byte mismatches in either direction.

Replace the reset-then-compare pattern with snapshot-and-delta:
snapshot both systems before pings, snapshot again after, and compare
the deltas. This eliminates the non-atomic reset window entirely.
As a belt-and-suspenders safety net, tolerate a difference of exactly
one packet (and corresponding bytes) from a stray keepalive that
could still arrive in the narrow window between the two snapshots.

flakestress passes with ~5900 runs (~2800 without -race, ~3100 with
-race) but it also passed previously too. This is an annoying one to
repro.

Fixes #11762

Change-Id: I3447ad67e71c8146e85eed38b7a665033ef9e284
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick
2026-04-13 21:18:32 +00:00
committed by Brad Fitzpatrick
parent 49eb1b5d26
commit 6aa10576c9
+113 -78
View File
@@ -39,7 +39,6 @@ import (
"go4.org/mem" "go4.org/mem"
"golang.org/x/net/icmp" "golang.org/x/net/icmp"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
"tailscale.com/cmd/testwrapper/flakytest"
"tailscale.com/control/controlknobs" "tailscale.com/control/controlknobs"
"tailscale.com/derp/derpserver" "tailscale.com/derp/derpserver"
"tailscale.com/disco" "tailscale.com/disco"
@@ -733,7 +732,6 @@ func (localhostListener) ListenPacket(ctx context.Context, network, address stri
} }
func TestTwoDevicePing(t *testing.T) { func TestTwoDevicePing(t *testing.T) {
flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/11762")
ln, ip := localhostListener{}, netaddr.IPv4(127, 0, 0, 1) ln, ip := localhostListener{}, netaddr.IPv4(127, 0, 0, 1)
n := &devices{ n := &devices{
m1: ln, m1: ln,
@@ -1265,96 +1263,133 @@ func testTwoDevicePing(t *testing.T, d *devices) {
t.Run("compare-metrics-stats", func(t *testing.T) { t.Run("compare-metrics-stats", func(t *testing.T) {
setT(t) setT(t)
defer setT(outerT) defer setT(outerT)
m1.counts.Reset()
m2.counts.Reset() // Snapshot both counting systems before pings rather than
m1.conn.resetMetricsForTest() // resetting them. Resetting two independent systems
m2.conn.resetMetricsForTest() // non-atomically left a window where background WireGuard
t.Logf("Metrics before: %s\n", m1.metrics.String()) // keepalives could increment one system but not the other,
// causing flaky off-by-one mismatches.
physBefore1, metricBefore1 := snapshotCounts(m1)
physBefore2, metricBefore2 := snapshotCounts(m2)
ping1(t) ping1(t)
ping2(t) ping2(t)
assertConnStatsAndUserMetricsEqual(t, m1)
assertConnStatsAndUserMetricsEqual(t, m2) assertConnStatDeltasMatchMetricDeltas(t, m1, physBefore1, metricBefore1)
assertConnStatDeltasMatchMetricDeltas(t, m2, physBefore2, metricBefore2)
assertGlobalMetricsMatchPerConn(t, m1, m2) assertGlobalMetricsMatchPerConn(t, m1, m2)
t.Logf("Metrics after: %s\n", m1.metrics.String())
}) })
} }
func (c *Conn) resetMetricsForTest() { // countSnapshot holds a point-in-time snapshot of packet/byte statistics,
c.metrics.inboundBytesIPv4Total.Set(0) // categorized by transport type (IPv4 vs DERP).
c.metrics.inboundPacketsIPv4Total.Set(0) type countSnapshot struct {
c.metrics.outboundBytesIPv4Total.Set(0) ipv4RxBytes, ipv4TxBytes int64
c.metrics.outboundPacketsIPv4Total.Set(0) ipv4RxPackets, ipv4TxPackets int64
c.metrics.inboundBytesIPv6Total.Set(0) derpRxBytes, derpTxBytes int64
c.metrics.inboundPacketsIPv6Total.Set(0) derpRxPackets, derpTxPackets int64
c.metrics.outboundBytesIPv6Total.Set(0)
c.metrics.outboundPacketsIPv6Total.Set(0)
c.metrics.inboundBytesDERPTotal.Set(0)
c.metrics.inboundPacketsDERPTotal.Set(0)
c.metrics.outboundBytesDERPTotal.Set(0)
c.metrics.outboundPacketsDERPTotal.Set(0)
} }
func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) { // snapshotCounts captures the current physical connection counter values and
t.Helper() // user metrics for ms, returning them as separate snapshots. Reading both
physIPv4RxBytes := int64(0) // systems back-to-back (rather than resetting them non-atomically) avoids a
physIPv4TxBytes := int64(0) // race where background WireGuard keepalives could increment one system but
physDERPRxBytes := int64(0) // not the other during a reset window.
physDERPTxBytes := int64(0) func snapshotCounts(ms *magicStack) (phys, metric countSnapshot) {
physIPv4RxPackets := int64(0)
physIPv4TxPackets := int64(0)
physDERPRxPackets := int64(0)
physDERPTxPackets := int64(0)
for conn, count := range ms.counts.Clone() { for conn, count := range ms.counts.Clone() {
t.Logf("physconn src: %s, dst: %s", conn.Src.String(), conn.Dst.String())
if conn.Dst.String() == "127.3.3.40:1" { if conn.Dst.String() == "127.3.3.40:1" {
physDERPRxBytes += int64(count.RxBytes) phys.derpRxBytes += int64(count.RxBytes)
physDERPTxBytes += int64(count.TxBytes) phys.derpTxBytes += int64(count.TxBytes)
physDERPRxPackets += int64(count.RxPackets) phys.derpRxPackets += int64(count.RxPackets)
physDERPTxPackets += int64(count.TxPackets) phys.derpTxPackets += int64(count.TxPackets)
} else { } else {
physIPv4RxBytes += int64(count.RxBytes) phys.ipv4RxBytes += int64(count.RxBytes)
physIPv4TxBytes += int64(count.TxBytes) phys.ipv4TxBytes += int64(count.TxBytes)
physIPv4RxPackets += int64(count.RxPackets) phys.ipv4RxPackets += int64(count.RxPackets)
physIPv4TxPackets += int64(count.TxPackets) phys.ipv4TxPackets += int64(count.TxPackets)
} }
} }
metric = countSnapshot{
metricIPv4RxBytes := ms.conn.metrics.inboundBytesIPv4Total.Value() ipv4RxBytes: ms.conn.metrics.inboundBytesIPv4Total.Value(),
metricIPv4RxPackets := ms.conn.metrics.inboundPacketsIPv4Total.Value() ipv4TxBytes: ms.conn.metrics.outboundBytesIPv4Total.Value(),
metricIPv4TxBytes := ms.conn.metrics.outboundBytesIPv4Total.Value() ipv4RxPackets: ms.conn.metrics.inboundPacketsIPv4Total.Value(),
metricIPv4TxPackets := ms.conn.metrics.outboundPacketsIPv4Total.Value() ipv4TxPackets: ms.conn.metrics.outboundPacketsIPv4Total.Value(),
derpRxBytes: ms.conn.metrics.inboundBytesDERPTotal.Value(),
metricDERPRxBytes := ms.conn.metrics.inboundBytesDERPTotal.Value() derpTxBytes: ms.conn.metrics.outboundBytesDERPTotal.Value(),
metricDERPRxPackets := ms.conn.metrics.inboundPacketsDERPTotal.Value() derpRxPackets: ms.conn.metrics.inboundPacketsDERPTotal.Value(),
metricDERPTxBytes := ms.conn.metrics.outboundBytesDERPTotal.Value() derpTxPackets: ms.conn.metrics.outboundPacketsDERPTotal.Value(),
metricDERPTxPackets := ms.conn.metrics.outboundPacketsDERPTotal.Value() }
return phys, metric
// Reset counts after reading all values to minimize the window where a }
// background packet could increment metrics but miss the cloned counts.
ms.counts.Reset() // assertConnStatDeltasMatchMetricDeltas checks that the changes in physical
// connection counters since physBefore match the changes in user metrics since
// Compare physical connection stats with per-conn user metrics. // metricBefore. Using deltas avoids a race from non-atomically resetting the
// A rebind during the measurement window can reset the physical connection // two independent counting systems.
// counter, causing physical stats to show 0 while user metrics recorded //
// packets normally. Tolerate this by logging instead of failing. // As a safety net, a difference of exactly one packet (and the corresponding
checkPhysVsMetric := func(phys, metric int64, name string) { // bytes) is tolerated, since a background WireGuard keepalive could still
if phys == metric { // arrive in the narrow window between snapshotting the two systems.
return func assertConnStatDeltasMatchMetricDeltas(t *testing.T, ms *magicStack, physBefore, metricBefore countSnapshot) {
} t.Helper()
if phys == 0 && metric > 0 { physAfter, metricAfter := snapshotCounts(ms)
t.Logf("%s: physical counter is 0 but metric is %d (possible rebind during measurement)", name, metric)
return type stat struct {
} name string
t.Errorf("%s: physical=%d, metric=%d", name, phys, metric) physDelta, metDelta int64
isPackets bool // true for packet counts, false for byte counts
packetDeltaTolerated bool // set by packet check, used by byte check
}
stats := []stat{
{name: "IPv4RxPackets", physDelta: physAfter.ipv4RxPackets - physBefore.ipv4RxPackets, metDelta: metricAfter.ipv4RxPackets - metricBefore.ipv4RxPackets, isPackets: true},
{name: "IPv4RxBytes", physDelta: physAfter.ipv4RxBytes - physBefore.ipv4RxBytes, metDelta: metricAfter.ipv4RxBytes - metricBefore.ipv4RxBytes},
{name: "IPv4TxPackets", physDelta: physAfter.ipv4TxPackets - physBefore.ipv4TxPackets, metDelta: metricAfter.ipv4TxPackets - metricBefore.ipv4TxPackets, isPackets: true},
{name: "IPv4TxBytes", physDelta: physAfter.ipv4TxBytes - physBefore.ipv4TxBytes, metDelta: metricAfter.ipv4TxBytes - metricBefore.ipv4TxBytes},
{name: "DERPRxPackets", physDelta: physAfter.derpRxPackets - physBefore.derpRxPackets, metDelta: metricAfter.derpRxPackets - metricBefore.derpRxPackets, isPackets: true},
{name: "DERPRxBytes", physDelta: physAfter.derpRxBytes - physBefore.derpRxBytes, metDelta: metricAfter.derpRxBytes - metricBefore.derpRxBytes},
{name: "DERPTxPackets", physDelta: physAfter.derpTxPackets - physBefore.derpTxPackets, metDelta: metricAfter.derpTxPackets - metricBefore.derpTxPackets, isPackets: true},
{name: "DERPTxBytes", physDelta: physAfter.derpTxBytes - physBefore.derpTxBytes, metDelta: metricAfter.derpTxBytes - metricBefore.derpTxBytes},
}
// First pass: check packet counts, tolerating ±1 from stray keepalives.
for i := range stats {
s := &stats[i]
if !s.isPackets {
continue
}
if s.physDelta == s.metDelta {
continue
}
diff := s.physDelta - s.metDelta
if diff < 0 {
diff = -diff
}
if diff <= 1 {
s.packetDeltaTolerated = true
t.Logf("%s: physical delta=%d, metric delta=%d (off by 1, likely background WireGuard keepalive)", s.name, s.physDelta, s.metDelta)
continue
}
t.Errorf("%s: physical delta=%d, metric delta=%d", s.name, s.physDelta, s.metDelta)
}
// Second pass: check byte counts; tolerate mismatches when the
// corresponding packet count was already tolerated.
for i := range stats {
s := &stats[i]
if s.isPackets {
continue
}
if s.physDelta == s.metDelta {
continue
}
// The preceding entry in the slice is always the corresponding packet stat.
if stats[i-1].packetDeltaTolerated {
t.Logf("%s: physical delta=%d, metric delta=%d (within single-packet tolerance)", s.name, s.physDelta, s.metDelta)
continue
}
t.Errorf("%s: physical delta=%d, metric delta=%d", s.name, s.physDelta, s.metDelta)
} }
checkPhysVsMetric(physDERPRxBytes, metricDERPRxBytes, "DERPRxBytes")
checkPhysVsMetric(physDERPTxBytes, metricDERPTxBytes, "DERPTxBytes")
checkPhysVsMetric(physIPv4RxBytes, metricIPv4RxBytes, "IPv4RxBytes")
checkPhysVsMetric(physIPv4TxBytes, metricIPv4TxBytes, "IPv4TxBytes")
checkPhysVsMetric(physDERPRxPackets, metricDERPRxPackets, "DERPRxPackets")
checkPhysVsMetric(physDERPTxPackets, metricDERPTxPackets, "DERPTxPackets")
checkPhysVsMetric(physIPv4RxPackets, metricIPv4RxPackets, "IPv4RxPackets")
checkPhysVsMetric(physIPv4TxPackets, metricIPv4TxPackets, "IPv4TxPackets")
} }
// assertGlobalMetricsMatchPerConn validates that the global clientmetric // assertGlobalMetricsMatchPerConn validates that the global clientmetric