net/tunstats: new package to track per-connection counters (#5818)
High-level API:
type Statistics struct { ... }
type Counts struct { TxPackets, TxBytes, RxPackets, RxBytes uint64 }
func (*Statistics) UpdateTx([]byte)
func (*Statistics) UpdateRx([]byte)
func (*Statistics) Extract() map[flowtrack.Tuple]Counts
The API accepts a []byte instead of a packet.Parsed so that a future
implementation can directly hash the address and port bytes,
which are contiguous in most IP packets.
This will be useful for a custom concurrent-safe hashmap implementation.
Signed-off-by: Joe Tsai <joetsai@digital-static.net>
main
parent
bdf3d2a63f
commit
2934c5114c
@ -0,0 +1,78 @@ |
||||
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package tunstats maintains statistics about connections
|
||||
// flowing through a TUN device (which operate at the IP layer).
|
||||
package tunstats |
||||
|
||||
import ( |
||||
"sync" |
||||
|
||||
"tailscale.com/net/flowtrack" |
||||
"tailscale.com/net/packet" |
||||
) |
||||
|
||||
// Statistics maintains counters for every connection.
|
||||
// All methods are safe for concurrent use.
|
||||
// The zero value is ready for use.
|
||||
type Statistics struct { |
||||
mu sync.Mutex |
||||
m map[flowtrack.Tuple]Counts |
||||
} |
||||
|
||||
// Counts are statistics about a particular connection.
|
||||
type Counts struct { |
||||
TxPackets uint64 `json:"txPkts,omitempty"` |
||||
TxBytes uint64 `json:"txBytes,omitempty"` |
||||
RxPackets uint64 `json:"rxPkts,omitempty"` |
||||
RxBytes uint64 `json:"rxBytes,omitempty"` |
||||
} |
||||
|
||||
// UpdateTx updates the counters for a transmitted IP packet
|
||||
// The source and destination of the packet directly correspond with
|
||||
// the source and destination in flowtrack.Tuple.
|
||||
func (s *Statistics) UpdateTx(b []byte) { |
||||
s.update(b, false) |
||||
} |
||||
|
||||
// UpdateRx updates the counters for a received IP packet.
|
||||
// The source and destination of the packet are inverted with respect to
|
||||
// the source and destination in flowtrack.Tuple.
|
||||
func (s *Statistics) UpdateRx(b []byte) { |
||||
s.update(b, true) |
||||
} |
||||
|
||||
func (s *Statistics) update(b []byte, receive bool) { |
||||
var p packet.Parsed |
||||
p.Decode(b) |
||||
tuple := flowtrack.Tuple{Proto: p.IPProto, Src: p.Src, Dst: p.Dst} |
||||
if receive { |
||||
tuple.Src, tuple.Dst = tuple.Dst, tuple.Src |
||||
} |
||||
|
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
if s.m == nil { |
||||
s.m = make(map[flowtrack.Tuple]Counts) |
||||
} |
||||
cnts := s.m[tuple] |
||||
if receive { |
||||
cnts.RxPackets++ |
||||
cnts.RxBytes += uint64(len(b)) |
||||
} else { |
||||
cnts.TxPackets++ |
||||
cnts.TxBytes += uint64(len(b)) |
||||
} |
||||
s.m[tuple] = cnts |
||||
} |
||||
|
||||
// Extract extracts and resets the counters for all active connections.
|
||||
// It must be called periodically otherwise the memory used is unbounded.
|
||||
func (s *Statistics) Extract() map[flowtrack.Tuple]Counts { |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
m := s.m |
||||
s.m = make(map[flowtrack.Tuple]Counts) |
||||
return m |
||||
} |
||||
@ -0,0 +1,194 @@ |
||||
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package tunstats |
||||
|
||||
import ( |
||||
"encoding/binary" |
||||
"fmt" |
||||
"math/rand" |
||||
"net/netip" |
||||
"runtime" |
||||
"sync" |
||||
"testing" |
||||
"time" |
||||
|
||||
qt "github.com/frankban/quicktest" |
||||
"tailscale.com/net/flowtrack" |
||||
"tailscale.com/types/ipproto" |
||||
) |
||||
|
||||
func testPacketV4(proto ipproto.Proto, srcAddr, dstAddr [4]byte, srcPort, dstPort, size uint16) (out []byte) { |
||||
var ipHdr [20]byte |
||||
ipHdr[0] = 4<<4 | 5 |
||||
binary.BigEndian.PutUint16(ipHdr[2:], size) |
||||
ipHdr[9] = byte(proto) |
||||
*(*[4]byte)(ipHdr[12:]) = srcAddr |
||||
*(*[4]byte)(ipHdr[16:]) = dstAddr |
||||
out = append(out, ipHdr[:]...) |
||||
switch proto { |
||||
case ipproto.TCP: |
||||
var tcpHdr [20]byte |
||||
binary.BigEndian.PutUint16(tcpHdr[0:], srcPort) |
||||
binary.BigEndian.PutUint16(tcpHdr[2:], dstPort) |
||||
out = append(out, tcpHdr[:]...) |
||||
case ipproto.UDP: |
||||
var udpHdr [8]byte |
||||
binary.BigEndian.PutUint16(udpHdr[0:], srcPort) |
||||
binary.BigEndian.PutUint16(udpHdr[2:], dstPort) |
||||
out = append(out, udpHdr[:]...) |
||||
default: |
||||
panic(fmt.Sprintf("unknown proto: %d", proto)) |
||||
} |
||||
return append(out, make([]byte, int(size)-len(out))...) |
||||
} |
||||
|
||||
func TestConcurrent(t *testing.T) { |
||||
c := qt.New(t) |
||||
|
||||
var stats Statistics |
||||
var wants []map[flowtrack.Tuple]Counts |
||||
gots := make([]map[flowtrack.Tuple]Counts, runtime.NumCPU()) |
||||
var group sync.WaitGroup |
||||
for i := range gots { |
||||
group.Add(1) |
||||
go func(i int) { |
||||
defer group.Done() |
||||
gots[i] = make(map[flowtrack.Tuple]Counts) |
||||
rn := rand.New(rand.NewSource(time.Now().UnixNano())) |
||||
var p []byte |
||||
var t flowtrack.Tuple |
||||
for j := 0; j < 1000; j++ { |
||||
delay := rn.Intn(10000) |
||||
if p == nil || rn.Intn(64) == 0 { |
||||
proto := ipproto.TCP |
||||
if rn.Intn(2) == 0 { |
||||
proto = ipproto.UDP |
||||
} |
||||
srcAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))}) |
||||
dstAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))}) |
||||
srcPort := uint16(rand.Intn(16)) |
||||
dstPort := uint16(rand.Intn(16)) |
||||
size := uint16(64 + rand.Intn(1024)) |
||||
p = testPacketV4(proto, srcAddr.As4(), dstAddr.As4(), srcPort, dstPort, size) |
||||
t = flowtrack.Tuple{Proto: proto, Src: netip.AddrPortFrom(srcAddr, srcPort), Dst: netip.AddrPortFrom(dstAddr, dstPort)} |
||||
} |
||||
t2 := t |
||||
receive := rn.Intn(2) == 0 |
||||
if receive { |
||||
t2.Src, t2.Dst = t2.Dst, t2.Src |
||||
} |
||||
|
||||
cnts := gots[i][t2] |
||||
if receive { |
||||
stats.UpdateRx(p) |
||||
cnts.RxPackets++ |
||||
cnts.RxBytes += uint64(len(p)) |
||||
} else { |
||||
cnts.TxPackets++ |
||||
cnts.TxBytes += uint64(len(p)) |
||||
stats.UpdateTx(p) |
||||
} |
||||
gots[i][t2] = cnts |
||||
time.Sleep(time.Duration(rn.Intn(1 + delay))) |
||||
} |
||||
}(i) |
||||
} |
||||
for range gots { |
||||
wants = append(wants, stats.Extract()) |
||||
time.Sleep(time.Millisecond) |
||||
} |
||||
group.Wait() |
||||
wants = append(wants, stats.Extract()) |
||||
|
||||
got := make(map[flowtrack.Tuple]Counts) |
||||
want := make(map[flowtrack.Tuple]Counts) |
||||
mergeMaps(got, gots...) |
||||
mergeMaps(want, wants...) |
||||
c.Assert(got, qt.DeepEquals, want) |
||||
} |
||||
|
||||
func mergeMaps(dst map[flowtrack.Tuple]Counts, srcs ...map[flowtrack.Tuple]Counts) { |
||||
for _, src := range srcs { |
||||
for tuple, cntsSrc := range src { |
||||
cntsDst := dst[tuple] |
||||
cntsDst.TxPackets += cntsSrc.TxPackets |
||||
cntsDst.TxBytes += cntsSrc.TxBytes |
||||
cntsDst.RxPackets += cntsSrc.RxPackets |
||||
cntsDst.RxBytes += cntsSrc.RxBytes |
||||
dst[tuple] = cntsDst |
||||
} |
||||
} |
||||
} |
||||
|
||||
func Benchmark(b *testing.B) { |
||||
// TODO: Test IPv6 packets?
|
||||
b.Run("SingleRoutine/SameConn", func(b *testing.B) { |
||||
p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789) |
||||
b.ResetTimer() |
||||
b.ReportAllocs() |
||||
for i := 0; i < b.N; i++ { |
||||
var s Statistics |
||||
for j := 0; j < 1e3; j++ { |
||||
s.UpdateTx(p) |
||||
} |
||||
} |
||||
}) |
||||
b.Run("SingleRoutine/UniqueConns", func(b *testing.B) { |
||||
p := testPacketV4(ipproto.UDP, [4]byte{}, [4]byte{}, 0, 0, 789) |
||||
b.ResetTimer() |
||||
b.ReportAllocs() |
||||
for i := 0; i < b.N; i++ { |
||||
var s Statistics |
||||
for j := 0; j < 1e3; j++ { |
||||
binary.BigEndian.PutUint32(p[20:], uint32(j)) // unique port combination
|
||||
s.UpdateTx(p) |
||||
} |
||||
} |
||||
}) |
||||
b.Run("MultiRoutine/SameConn", func(b *testing.B) { |
||||
p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789) |
||||
b.ResetTimer() |
||||
b.ReportAllocs() |
||||
for i := 0; i < b.N; i++ { |
||||
var s Statistics |
||||
var group sync.WaitGroup |
||||
for j := 0; j < runtime.NumCPU(); j++ { |
||||
group.Add(1) |
||||
go func() { |
||||
defer group.Done() |
||||
for k := 0; k < 1e3; k++ { |
||||
s.UpdateTx(p) |
||||
} |
||||
}() |
||||
} |
||||
group.Wait() |
||||
} |
||||
}) |
||||
b.Run("MultiRoutine/UniqueConns", func(b *testing.B) { |
||||
ps := make([][]byte, runtime.NumCPU()) |
||||
for i := range ps { |
||||
ps[i] = testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 0, 0, 789) |
||||
} |
||||
b.ResetTimer() |
||||
b.ReportAllocs() |
||||
for i := 0; i < b.N; i++ { |
||||
var s Statistics |
||||
var group sync.WaitGroup |
||||
for j := 0; j < runtime.NumCPU(); j++ { |
||||
group.Add(1) |
||||
go func(j int) { |
||||
defer group.Done() |
||||
p := ps[j] |
||||
j *= 1e3 |
||||
for k := 0; k < 1e3; k++ { |
||||
binary.BigEndian.PutUint32(p[20:], uint32(j+k)) // unique port combination
|
||||
s.UpdateTx(p) |
||||
} |
||||
}(j) |
||||
} |
||||
group.Wait() |
||||
} |
||||
}) |
||||
} |
||||
Loading…
Reference in new issue