derp/derpserver: add a unique sender cardinality estimate
Adds an observation point that may identify potentially abusive traffic patterns at outlier values. Updates tailscale/corp#24681 Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
committed by
James Tucker
parent
9eff8a4503
commit
5ee0c6bf1d
@@ -36,6 +36,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/axiomhq/hyperloglog"
|
||||
"go4.org/mem"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"tailscale.com/client/local"
|
||||
@@ -1643,6 +1644,12 @@ type sclient struct {
|
||||
sawSrc map[key.NodePublic]set.Handle
|
||||
bw *lazyBufioWriter
|
||||
|
||||
// senderCardinality estimates the number of unique peers that have
|
||||
// sent packets to this client. Owned by sendLoop, protected by
|
||||
// senderCardinalityMu for reads from other goroutines.
|
||||
senderCardinalityMu sync.Mutex
|
||||
senderCardinality *hyperloglog.Sketch
|
||||
|
||||
// Guarded by s.mu
|
||||
//
|
||||
// peerStateChange is used by mesh peers (a set of regional
|
||||
@@ -1778,6 +1785,8 @@ func (c *sclient) onSendLoopDone() {
|
||||
func (c *sclient) sendLoop(ctx context.Context) error {
|
||||
defer c.onSendLoopDone()
|
||||
|
||||
c.senderCardinality = hyperloglog.New()
|
||||
|
||||
jitter := rand.N(5 * time.Second)
|
||||
keepAliveTick, keepAliveTickChannel := c.s.clock.NewTicker(derp.KeepAlive + jitter)
|
||||
defer keepAliveTick.Stop()
|
||||
@@ -2000,6 +2009,11 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error)
|
||||
if withKey {
|
||||
pktLen += key.NodePublicRawLen
|
||||
c.noteSendFromSrc(srcKey)
|
||||
if c.senderCardinality != nil {
|
||||
c.senderCardinalityMu.Lock()
|
||||
c.senderCardinality.Insert(srcKey.AppendTo(nil))
|
||||
c.senderCardinalityMu.Unlock()
|
||||
}
|
||||
}
|
||||
if err = derp.WriteFrameHeader(c.bw.bw(), derp.FrameRecvPacket, uint32(pktLen)); err != nil {
|
||||
return err
|
||||
@@ -2013,6 +2027,17 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error)
|
||||
return err
|
||||
}
|
||||
|
||||
// EstimatedUniqueSenders returns an estimate of the number of unique peers
|
||||
// that have sent packets to this client.
|
||||
func (c *sclient) EstimatedUniqueSenders() uint64 {
|
||||
c.senderCardinalityMu.Lock()
|
||||
defer c.senderCardinalityMu.Unlock()
|
||||
if c.senderCardinality == nil {
|
||||
return 0
|
||||
}
|
||||
return c.senderCardinality.Estimate()
|
||||
}
|
||||
|
||||
// noteSendFromSrc notes that we are about to write a packet
|
||||
// from src to sclient.
|
||||
//
|
||||
@@ -2295,7 +2320,8 @@ type BytesSentRecv struct {
|
||||
Sent uint64
|
||||
Recv uint64
|
||||
// Key is the public key of the client which sent/received these bytes.
|
||||
Key key.NodePublic
|
||||
Key key.NodePublic
|
||||
UniqueSenders uint64 `json:",omitzero"`
|
||||
}
|
||||
|
||||
// parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic.
|
||||
@@ -2349,6 +2375,11 @@ func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) {
|
||||
if prev.Sent < next.Sent || prev.Recv < next.Recv {
|
||||
if pkey, ok := s.keyOfAddr[k]; ok {
|
||||
next.Key = pkey
|
||||
if cs, ok := s.clients[pkey]; ok {
|
||||
if c := cs.activeClient.Load(); c != nil {
|
||||
next.UniqueSenders = c.EstimatedUniqueSenders()
|
||||
}
|
||||
}
|
||||
if err := enc.Encode(next); err != nil {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"encoding/asn1"
|
||||
"encoding/binary"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"log"
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/axiomhq/hyperloglog"
|
||||
qt "github.com/frankban/quicktest"
|
||||
"go4.org/mem"
|
||||
"golang.org/x/time/rate"
|
||||
@@ -755,6 +757,35 @@ func TestParseSSOutput(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeDebugTrafficUniqueSenders(t *testing.T) {
|
||||
s := New(key.NewNode(), t.Logf)
|
||||
defer s.Close()
|
||||
|
||||
clientKey := key.NewNode().Public()
|
||||
c := &sclient{
|
||||
key: clientKey,
|
||||
s: s,
|
||||
logf: logger.Discard,
|
||||
senderCardinality: hyperloglog.New(),
|
||||
}
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
c.senderCardinality.Insert(key.NewNode().Public().AppendTo(nil))
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
cs := &clientSet{}
|
||||
cs.activeClient.Store(c)
|
||||
s.clients[clientKey] = cs
|
||||
s.mu.Unlock()
|
||||
|
||||
estimate := c.EstimatedUniqueSenders()
|
||||
t.Logf("Estimated unique senders: %d", estimate)
|
||||
if estimate < 4 || estimate > 6 {
|
||||
t.Errorf("EstimatedUniqueSenders() = %d, want ~5 (4-6 range)", estimate)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPerClientSendQueueDepth(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
envKey := "TS_DEBUG_DERP_PER_CLIENT_SEND_QUEUE_DEPTH"
|
||||
@@ -780,3 +811,167 @@ func TestGetPerClientSendQueueDepth(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSenderCardinality(t *testing.T) {
|
||||
s := New(key.NewNode(), t.Logf)
|
||||
defer s.Close()
|
||||
|
||||
c := &sclient{
|
||||
key: key.NewNode().Public(),
|
||||
s: s,
|
||||
logf: logger.WithPrefix(t.Logf, "test client: "),
|
||||
}
|
||||
|
||||
if got := c.EstimatedUniqueSenders(); got != 0 {
|
||||
t.Errorf("EstimatedUniqueSenders() before init = %d, want 0", got)
|
||||
}
|
||||
|
||||
c.senderCardinality = hyperloglog.New()
|
||||
|
||||
if got := c.EstimatedUniqueSenders(); got != 0 {
|
||||
t.Errorf("EstimatedUniqueSenders() with no senders = %d, want 0", got)
|
||||
}
|
||||
|
||||
senders := make([]key.NodePublic, 10)
|
||||
for i := range senders {
|
||||
senders[i] = key.NewNode().Public()
|
||||
c.senderCardinality.Insert(senders[i].AppendTo(nil))
|
||||
}
|
||||
|
||||
estimate := c.EstimatedUniqueSenders()
|
||||
t.Logf("Estimated unique senders after 10 inserts: %d", estimate)
|
||||
|
||||
if estimate < 8 || estimate > 12 {
|
||||
t.Errorf("EstimatedUniqueSenders() = %d, want ~10 (8-12 range)", estimate)
|
||||
}
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
c.senderCardinality.Insert(senders[i].AppendTo(nil))
|
||||
}
|
||||
|
||||
estimate2 := c.EstimatedUniqueSenders()
|
||||
t.Logf("Estimated unique senders after duplicates: %d", estimate2)
|
||||
|
||||
if estimate2 < 8 || estimate2 > 12 {
|
||||
t.Errorf("EstimatedUniqueSenders() after duplicates = %d, want ~10 (8-12 range)", estimate2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSenderCardinality100(t *testing.T) {
|
||||
s := New(key.NewNode(), t.Logf)
|
||||
defer s.Close()
|
||||
|
||||
c := &sclient{
|
||||
key: key.NewNode().Public(),
|
||||
s: s,
|
||||
logf: logger.WithPrefix(t.Logf, "test client: "),
|
||||
senderCardinality: hyperloglog.New(),
|
||||
}
|
||||
|
||||
numSenders := 100
|
||||
for i := 0; i < numSenders; i++ {
|
||||
c.senderCardinality.Insert(key.NewNode().Public().AppendTo(nil))
|
||||
}
|
||||
|
||||
estimate := c.EstimatedUniqueSenders()
|
||||
t.Logf("Estimated unique senders for 100 actual senders: %d", estimate)
|
||||
|
||||
if estimate < 85 || estimate > 115 {
|
||||
t.Errorf("EstimatedUniqueSenders() = %d, want ~100 (85-115 range)", estimate)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSenderCardinalityTracking(t *testing.T) {
|
||||
s := New(key.NewNode(), t.Logf)
|
||||
defer s.Close()
|
||||
|
||||
c := &sclient{
|
||||
key: key.NewNode().Public(),
|
||||
s: s,
|
||||
logf: logger.WithPrefix(t.Logf, "test client: "),
|
||||
senderCardinality: hyperloglog.New(),
|
||||
}
|
||||
|
||||
zeroKey := key.NodePublic{}
|
||||
if zeroKey != (key.NodePublic{}) {
|
||||
c.senderCardinality.Insert(zeroKey.AppendTo(nil))
|
||||
}
|
||||
|
||||
if estimate := c.EstimatedUniqueSenders(); estimate != 0 {
|
||||
t.Errorf("EstimatedUniqueSenders() after zero key = %d, want 0", estimate)
|
||||
}
|
||||
|
||||
sender1 := key.NewNode().Public()
|
||||
sender2 := key.NewNode().Public()
|
||||
|
||||
if sender1 != (key.NodePublic{}) {
|
||||
c.senderCardinality.Insert(sender1.AppendTo(nil))
|
||||
}
|
||||
if sender2 != (key.NodePublic{}) {
|
||||
c.senderCardinality.Insert(sender2.AppendTo(nil))
|
||||
}
|
||||
|
||||
estimate := c.EstimatedUniqueSenders()
|
||||
t.Logf("Estimated unique senders after 2 senders: %d", estimate)
|
||||
|
||||
if estimate < 1 || estimate > 3 {
|
||||
t.Errorf("EstimatedUniqueSenders() = %d, want ~2 (1-3 range)", estimate)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkHyperLogLogInsert(b *testing.B) {
|
||||
hll := hyperloglog.New()
|
||||
sender := key.NewNode().Public()
|
||||
senderBytes := sender.AppendTo(nil)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
hll.Insert(senderBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkHyperLogLogInsertUnique(b *testing.B) {
|
||||
hll := hyperloglog.New()
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
buf := make([]byte, 32)
|
||||
for i := 0; i < b.N; i++ {
|
||||
binary.LittleEndian.PutUint64(buf, uint64(i))
|
||||
hll.Insert(buf)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkHyperLogLogEstimate(b *testing.B) {
|
||||
hll := hyperloglog.New()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
hll.Insert(key.NewNode().Public().AppendTo(nil))
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = hll.Estimate()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSenderCardinalityOverhead(b *testing.B) {
|
||||
hll := hyperloglog.New()
|
||||
sender := key.NewNode().Public()
|
||||
|
||||
b.Run("WithTracking", func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if hll != nil {
|
||||
hll.Insert(sender.AppendTo(nil))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("WithoutTracking", func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = sender.AppendTo(nil)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user