WIP: rebase for 2026-05-18 #7
@@ -2471,6 +2471,14 @@ func (b *LocalBackend) PeersForTest() []tailcfg.NodeView {
|
||||
return b.currentNode().PeersForTest()
|
||||
}
|
||||
|
||||
// AwaitNodeKeyForTest returns a channel that is closed once a peer with the
|
||||
// given node key first appears in the current netmap. If the peer is already
|
||||
// present, the returned channel is already closed. See
|
||||
// [nodeBackend.AwaitNodeKeyForTest].
|
||||
func (b *LocalBackend) AwaitNodeKeyForTest(k key.NodePublic) <-chan struct{} {
|
||||
return b.currentNode().AwaitNodeKeyForTest(k)
|
||||
}
|
||||
|
||||
func (b *LocalBackend) getNewControlClientFuncLocked() clientGen {
|
||||
if b.ccGen == nil {
|
||||
// Initialize it rather than just returning the
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"tailscale.com/util/eventbus"
|
||||
"tailscale.com/util/mak"
|
||||
"tailscale.com/util/slicesx"
|
||||
"tailscale.com/util/testenv"
|
||||
"tailscale.com/wgengine/filter"
|
||||
)
|
||||
|
||||
@@ -107,6 +108,12 @@ type nodeBackend struct {
|
||||
// nodeByKey is an index of node public key to node ID for fast lookups.
|
||||
// It is mutated in place (with mu held) and must not escape the [nodeBackend].
|
||||
nodeByKey map[key.NodePublic]tailcfg.NodeID
|
||||
|
||||
// keyWaitersForTest is the test-only registry of channels waiting for
|
||||
// a given peer key to first appear in the netmap. See
|
||||
// [nodeBackend.AwaitNodeKeyForTest]. It is populated lazily and remains
|
||||
// nil in production, where no test installs a waiter.
|
||||
keyWaitersForTest map[key.NodePublic]chan struct{}
|
||||
}
|
||||
|
||||
func newNodeBackend(ctx context.Context, logf logger.Logf, bus *eventbus.Bus) *nodeBackend {
|
||||
@@ -421,6 +428,7 @@ func (nb *nodeBackend) SetNetMap(nm *netmap.NetworkMap) {
|
||||
nb.updateNodeByAddrLocked()
|
||||
nb.updateNodeByKeyLocked()
|
||||
nb.updatePeersLocked()
|
||||
nb.signalKeyWaitersForTestLocked()
|
||||
if nm != nil {
|
||||
nb.derpMapViewPub.Publish(nm.DERPMap.View())
|
||||
} else {
|
||||
@@ -428,6 +436,43 @@ func (nb *nodeBackend) SetNetMap(nm *netmap.NetworkMap) {
|
||||
}
|
||||
}
|
||||
|
||||
// AwaitNodeKeyForTest returns a channel that is closed once a peer with the
|
||||
// given node key first appears in this nodeBackend's peer index, or
|
||||
// immediately (a closed channel) if it's already present. It is intended for
|
||||
// in-process benchmarks that drive synthetic netmap deltas and need a
|
||||
// zero-overhead signal that the client has applied a delta, replacing
|
||||
// poll-based [local.Client.WhoIsNodeKey] loops in tests. It panics outside
|
||||
// of tests.
|
||||
func (nb *nodeBackend) AwaitNodeKeyForTest(k key.NodePublic) <-chan struct{} {
|
||||
testenv.AssertInTest()
|
||||
nb.mu.Lock()
|
||||
defer nb.mu.Unlock()
|
||||
if _, ok := nb.nodeByKey[k]; ok {
|
||||
return syncs.ClosedChan()
|
||||
}
|
||||
if ch, ok := nb.keyWaitersForTest[k]; ok {
|
||||
return ch
|
||||
}
|
||||
ch := make(chan struct{})
|
||||
mak.Set(&nb.keyWaitersForTest, k, ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
// signalKeyWaitersForTestLocked closes any waiter channels whose keys now
|
||||
// appear in nb.nodeByKey. It is cheap when there are no waiters, which is
|
||||
// the common case in production. It is called from [nodeBackend.SetNetMap]
|
||||
// after the per-key index has been rebuilt.
|
||||
//
|
||||
// Caller must hold nb.mu.
|
||||
func (nb *nodeBackend) signalKeyWaitersForTestLocked() {
|
||||
for k, ch := range nb.keyWaitersForTest {
|
||||
if _, ok := nb.nodeByKey[k]; ok {
|
||||
close(ch)
|
||||
delete(nb.keyWaitersForTest, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (nb *nodeBackend) updateNodeByAddrLocked() {
|
||||
nm := nb.netMap
|
||||
if nm == nil {
|
||||
|
||||
@@ -403,6 +403,19 @@ func (s *Server) LocalClient() (*local.Client, error) {
|
||||
return s.localClient, nil
|
||||
}
|
||||
|
||||
// TestHooks are hooks meant for internal-testing only; they're not stable
|
||||
// or documented, intentionally.
|
||||
var TestHooks testHooks
|
||||
|
||||
type testHooks struct{}
|
||||
|
||||
// LocalBackend returns the [ipnlocal.LocalBackend] backing s. It panics
|
||||
// outside of tests.
|
||||
func (testHooks) LocalBackend(s *Server) *ipnlocal.LocalBackend {
|
||||
testenv.AssertInTest()
|
||||
return s.lb
|
||||
}
|
||||
|
||||
// Loopback starts a routing server on a loopback address.
|
||||
//
|
||||
// The server has multiple functions.
|
||||
|
||||
@@ -89,6 +89,9 @@ type Server struct {
|
||||
// MapResponse stream to modify the first MapResponse sent in response to it.
|
||||
ModifyFirstMapResponse func(*tailcfg.MapResponse, *tailcfg.MapRequest)
|
||||
|
||||
// AltMapStream, if non-nil, takes over serveMap. See [AltMapStreamFunc].
|
||||
AltMapStream AltMapStreamFunc
|
||||
|
||||
initMuxOnce sync.Once
|
||||
mux *http.ServeMux
|
||||
|
||||
@@ -1144,6 +1147,15 @@ func (s *Server) serveMap(w http.ResponseWriter, r *http.Request, mkey key.Machi
|
||||
go panic(fmt.Sprintf("bad map request: %v", err))
|
||||
}
|
||||
|
||||
if s.AltMapStream != nil {
|
||||
// The caller takes over the stream entirely; it must handle
|
||||
// keeping the HTTP response alive until ctx is done.
|
||||
compress := req.Compress != ""
|
||||
w.WriteHeader(200)
|
||||
s.AltMapStream(ctx, &mapStreamSender{s: s, w: w, compress: compress}, req)
|
||||
return
|
||||
}
|
||||
|
||||
jitter := rand.N(8 * time.Second)
|
||||
keepAlive := 50*time.Second + jitter
|
||||
|
||||
@@ -1486,12 +1498,51 @@ func (s *Server) takeRawMapMessage(nk key.NodePublic) (mapResJSON []byte, ok boo
|
||||
return mapResJSON, true
|
||||
}
|
||||
|
||||
// AltMapStreamFunc is the type of [Server.AltMapStream]: a callback that
|
||||
// takes over the serveMap handler entirely. The callback hand-builds and
|
||||
// sends MapResponses via the provided [MapStreamWriter] and is responsible
|
||||
// for keeping the stream alive until ctx is done. When set, the normal
|
||||
// per-node map-stream state machine in serveMap is bypassed.
|
||||
//
|
||||
// The callback is invoked for every map long-poll, including the
|
||||
// non-streaming "lite" polls controlclient issues to push HostInfo updates
|
||||
// (req.Stream == false). Implementations that only care about the streaming
|
||||
// long-poll typically respond to non-streaming polls with an empty
|
||||
// MapResponse and return immediately.
|
||||
//
|
||||
// This hook is for benchmarks and stress tests that need to drive clients
|
||||
// with a controlled sequence of responses.
|
||||
type AltMapStreamFunc func(ctx context.Context, w MapStreamWriter, req *tailcfg.MapRequest)
|
||||
|
||||
// MapStreamWriter is the interface passed to an [AltMapStreamFunc],
|
||||
// letting the callback write framed MapResponse messages directly onto the
|
||||
// long-poll HTTP response.
|
||||
type MapStreamWriter interface {
|
||||
// SendMapMessage encodes and writes msg as a single framed
|
||||
// MapResponse on the stream. It respects the client's Compress flag
|
||||
// (captured when the stream started).
|
||||
SendMapMessage(msg *tailcfg.MapResponse) error
|
||||
}
|
||||
|
||||
// mapStreamSender implements [MapStreamWriter] for [Server.AltMapStream]
|
||||
// callbacks.
|
||||
type mapStreamSender struct {
|
||||
s *Server
|
||||
w http.ResponseWriter
|
||||
compress bool
|
||||
}
|
||||
|
||||
func (m *mapStreamSender) SendMapMessage(msg *tailcfg.MapResponse) error {
|
||||
return m.s.sendMapMsg(m.w, m.compress, msg)
|
||||
}
|
||||
|
||||
func (s *Server) sendMapMsg(w http.ResponseWriter, compress bool, msg any) error {
|
||||
resBytes, err := s.encode(compress, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resBytes) > 16<<20 {
|
||||
const maxMapSize = 256 << 20 // 256MB
|
||||
if len(resBytes) > maxMapSize {
|
||||
return fmt.Errorf("map message too big: %d", len(resBytes))
|
||||
}
|
||||
var siz [4]byte
|
||||
|
||||
@@ -0,0 +1,265 @@
|
||||
// Copyright (c) Tailscale Inc & contributors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
// Package largetailnet provides reusable building blocks for in-process
|
||||
// benchmarks and stress tests that drive a single tailnet client (typically a
|
||||
// [tsnet.Server]) with a synthetic large-tailnet MapResponse stream.
|
||||
//
|
||||
// A [Streamer] takes over the map long-poll on a [testcontrol.Server] via the
|
||||
// AltMapStream hook: it sends one initial MapResponse announcing the self
|
||||
// node and N synthetic peers, and then forwards caller-supplied delta
|
||||
// MapResponses on the same stream until ctx is done.
|
||||
//
|
||||
// The package is designed so that a benchmark can:
|
||||
//
|
||||
// - Build a [Streamer] with the desired peer count.
|
||||
// - Stand up a [testcontrol.Server] with the streamer's [Streamer.AltMapStream]
|
||||
// installed.
|
||||
// - Stand up a [tsnet.Server] pointed at the testcontrol; its Up call
|
||||
// blocks until the initial netmap has been processed.
|
||||
// - Reset the benchmark timer and drive add/remove deltas with
|
||||
// [Streamer.SendDelta] and [Streamer.AllocPeer].
|
||||
package largetailnet
|
||||
|
||||
import (
|
||||
"context"
|
||||
cryptorand "crypto/rand"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go4.org/mem"
|
||||
"tailscale.com/net/tsaddr"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/tstest/integration/testcontrol"
|
||||
"tailscale.com/types/key"
|
||||
)
|
||||
|
||||
// SelfUserID is the synthetic [tailcfg.UserID] assigned to the self node and
|
||||
// to every initial peer produced by [Streamer]. Tests that build their own
|
||||
// peers via [MakePeer] should pass this value.
|
||||
const SelfUserID tailcfg.UserID = 1_000_000
|
||||
|
||||
// Streamer drives a controlled MapResponse stream to a single client via
|
||||
// [testcontrol.Server.AltMapStream]. It synthesizes an initial netmap with N
|
||||
// peers and forwards caller-supplied delta MapResponses on the same stream.
|
||||
//
|
||||
// A Streamer is single-shot: it expects exactly one map long-poll over its
|
||||
// lifetime and is not safe for re-use across multiple clients.
|
||||
type Streamer struct {
|
||||
n int
|
||||
derpMap *tailcfg.DERPMap
|
||||
|
||||
started chan struct{} // closed when the alt-map-stream callback first fires
|
||||
initialDone chan struct{} // closed after initial MapResponse has been written
|
||||
deltas chan *tailcfg.MapResponse
|
||||
|
||||
// nextID is the next free node ID. It starts at N+2 (1 is the self
|
||||
// node, 2..N+1 are the initial peers) and is bumped by AllocPeer.
|
||||
nextID atomic.Int64
|
||||
}
|
||||
|
||||
// New constructs a Streamer that will produce an initial netmap with n peers
|
||||
// and a self node when its AltMapStream callback first fires. derpMap is
|
||||
// included verbatim in the initial MapResponse.
|
||||
func New(n int, derpMap *tailcfg.DERPMap) *Streamer {
|
||||
s := &Streamer{
|
||||
n: n,
|
||||
derpMap: derpMap,
|
||||
started: make(chan struct{}),
|
||||
initialDone: make(chan struct{}),
|
||||
// Buffered so a benchmark loop body that does send-then-wait
|
||||
// doesn't block on the channel under steady state.
|
||||
deltas: make(chan *tailcfg.MapResponse, 64),
|
||||
}
|
||||
s.nextID.Store(int64(n) + 2)
|
||||
return s
|
||||
}
|
||||
|
||||
// AltMapStream returns a callback suitable for [testcontrol.Server.AltMapStream].
|
||||
// On the first streaming long-poll it sends the initial big MapResponse and
|
||||
// then forwards deltas enqueued via [Streamer.SendDelta] until ctx is done.
|
||||
// Non-streaming "lite" polls are answered with an empty MapResponse so they
|
||||
// complete quickly. The streamer is single-shot: any later streaming polls
|
||||
// are kept alive but produce no further messages.
|
||||
func (s *Streamer) AltMapStream() testcontrol.AltMapStreamFunc {
|
||||
return func(ctx context.Context, w testcontrol.MapStreamWriter, req *tailcfg.MapRequest) {
|
||||
if !req.Stream {
|
||||
_ = w.SendMapMessage(&tailcfg.MapResponse{})
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-s.started:
|
||||
// Re-poll after the original stream ended. Keep the
|
||||
// connection alive so the client doesn't churn.
|
||||
<-ctx.Done()
|
||||
return
|
||||
default:
|
||||
close(s.started)
|
||||
}
|
||||
|
||||
if err := s.sendInitial(w, req); err != nil {
|
||||
// Make the failure loud rather than wedging the
|
||||
// caller's [tsnet.Server.Up] on a silent retry loop.
|
||||
panic(fmt.Sprintf("largetailnet: sendInitial: %v", err))
|
||||
}
|
||||
close(s.initialDone)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case mr := <-s.deltas:
|
||||
if err := w.SendMapMessage(mr); err != nil {
|
||||
<-ctx.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AwaitInitialSent blocks until the initial big MapResponse has been written
|
||||
// to the wire. Note this is not the same as "the client has finished
|
||||
// processing it"; for that, callers should rely on [tsnet.Server.Up]
|
||||
// returning, or watch the IPN bus.
|
||||
func (s *Streamer) AwaitInitialSent(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-s.initialDone:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// SendDelta enqueues mr for delivery on the active MapResponse stream. It
|
||||
// blocks if the internal queue is full or the stream hasn't started yet.
|
||||
func (s *Streamer) SendDelta(ctx context.Context, mr *tailcfg.MapResponse) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case s.deltas <- mr:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// AllocPeer returns a fresh synthetic peer node with a never-before-used
|
||||
// [tailcfg.NodeID]. It's intended for use in PeersChanged deltas.
|
||||
func (s *Streamer) AllocPeer() *tailcfg.Node {
|
||||
return MakePeer(tailcfg.NodeID(s.nextID.Add(1)-1), SelfUserID)
|
||||
}
|
||||
|
||||
// SelfNodeID returns the [tailcfg.NodeID] used for the self node in the
|
||||
// initial netmap.
|
||||
func (s *Streamer) SelfNodeID() tailcfg.NodeID { return 1 }
|
||||
|
||||
// sendInitial writes the big initial MapResponse with s.n peers.
|
||||
func (s *Streamer) sendInitial(w testcontrol.MapStreamWriter, req *tailcfg.MapRequest) error {
|
||||
selfNodeID := s.SelfNodeID()
|
||||
selfIP4 := node4(selfNodeID)
|
||||
selfIP6 := node6(selfNodeID)
|
||||
|
||||
peers := make([]*tailcfg.Node, 0, s.n)
|
||||
for i := 0; i < s.n; i++ {
|
||||
peers = append(peers, MakePeer(tailcfg.NodeID(i+2), SelfUserID))
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
selfNode := &tailcfg.Node{
|
||||
ID: selfNodeID,
|
||||
StableID: "largetailnet-self",
|
||||
Name: "self.largetailnet.ts.net.",
|
||||
User: SelfUserID,
|
||||
Key: req.NodeKey,
|
||||
KeyExpiry: now.Add(24 * time.Hour),
|
||||
Machine: randMachineKey(), // fake; client doesn't verify
|
||||
DiscoKey: req.DiscoKey,
|
||||
MachineAuthorized: true,
|
||||
Addresses: []netip.Prefix{selfIP4, selfIP6},
|
||||
AllowedIPs: []netip.Prefix{selfIP4, selfIP6},
|
||||
CapMap: map[tailcfg.NodeCapability][]tailcfg.RawMessage{},
|
||||
}
|
||||
|
||||
initial := &tailcfg.MapResponse{
|
||||
KeepAlive: false,
|
||||
Node: selfNode,
|
||||
DERPMap: s.derpMap,
|
||||
Peers: peers,
|
||||
PacketFilter: []tailcfg.FilterRule{{
|
||||
// Accept-all filter so the client isn't logging packet-filter
|
||||
// failures; this is a benchmark harness, not a security test.
|
||||
SrcIPs: []string{"*"},
|
||||
DstPorts: []tailcfg.NetPortRange{{IP: "*", Ports: tailcfg.PortRangeAny}},
|
||||
}},
|
||||
DNSConfig: &tailcfg.DNSConfig{},
|
||||
Domain: "largetailnet.ts.net",
|
||||
UserProfiles: []tailcfg.UserProfile{{
|
||||
ID: SelfUserID,
|
||||
LoginName: "largetailnet@example.com",
|
||||
DisplayName: "largetailnet",
|
||||
}},
|
||||
ControlTime: &now,
|
||||
}
|
||||
return w.SendMapMessage(initial)
|
||||
}
|
||||
|
||||
// MakePeer constructs a synthetic [tailcfg.Node] for the given NodeID and
|
||||
// UserID. The peer's node/disco/machine keys are derived from random bytes
|
||||
// via the *PublicFromRaw32 constructors rather than via key.New*().Public(),
|
||||
// which avoids the per-peer Curve25519 ScalarBaseMult and lets the harness
|
||||
// construct hundreds of thousands of peers in a few hundred milliseconds.
|
||||
// The client never crypto-validates these keys in the bench, so opaque
|
||||
// random bytes are sufficient.
|
||||
func MakePeer(nid tailcfg.NodeID, user tailcfg.UserID) *tailcfg.Node {
|
||||
v4, v6 := node4(nid), node6(nid)
|
||||
name := fmt.Sprintf("peer-%d", nid)
|
||||
return &tailcfg.Node{
|
||||
ID: nid,
|
||||
StableID: tailcfg.StableNodeID(name),
|
||||
Name: name + ".largetailnet.ts.net.",
|
||||
Key: randNodeKey(),
|
||||
MachineAuthorized: true,
|
||||
DiscoKey: randDiscoKey(),
|
||||
Machine: randMachineKey(),
|
||||
Addresses: []netip.Prefix{v4, v6},
|
||||
AllowedIPs: []netip.Prefix{v4, v6},
|
||||
User: user,
|
||||
// Hostinfo must be non-nil: LocalBackend.populatePeerStatus
|
||||
// dereferences it via HostinfoView.Hostname unconditionally.
|
||||
Hostinfo: (&tailcfg.Hostinfo{Hostname: name}).View(),
|
||||
}
|
||||
}
|
||||
|
||||
func randNodeKey() key.NodePublic {
|
||||
var b [32]byte
|
||||
cryptorand.Read(b[:])
|
||||
return key.NodePublicFromRaw32(mem.B(b[:]))
|
||||
}
|
||||
|
||||
func randDiscoKey() key.DiscoPublic {
|
||||
var b [32]byte
|
||||
cryptorand.Read(b[:])
|
||||
return key.DiscoPublicFromRaw32(mem.B(b[:]))
|
||||
}
|
||||
|
||||
func randMachineKey() key.MachinePublic {
|
||||
var b [32]byte
|
||||
cryptorand.Read(b[:])
|
||||
return key.MachinePublicFromRaw32(mem.B(b[:]))
|
||||
}
|
||||
|
||||
func node4(nid tailcfg.NodeID) netip.Prefix {
|
||||
return netip.PrefixFrom(
|
||||
netip.AddrFrom4([4]byte{100, 100 + byte(nid>>16), byte(nid >> 8), byte(nid)}),
|
||||
32)
|
||||
}
|
||||
|
||||
func node6(nid tailcfg.NodeID) netip.Prefix {
|
||||
a := tsaddr.TailscaleULARange().Addr().As16()
|
||||
a[13] = byte(nid >> 16)
|
||||
a[14] = byte(nid >> 8)
|
||||
a[15] = byte(nid)
|
||||
return netip.PrefixFrom(netip.AddrFrom16(a), 128)
|
||||
}
|
||||
@@ -0,0 +1,218 @@
|
||||
// Copyright (c) Tailscale Inc & contributors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package largetailnet_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"tailscale.com/ipn/store/mem"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/tsnet"
|
||||
"tailscale.com/tstest/integration"
|
||||
"tailscale.com/tstest/integration/testcontrol"
|
||||
"tailscale.com/tstest/largetailnet"
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
||||
// tsnet.Server.Up handles the wait-for-ipn.Running step itself: it
|
||||
// subscribes to the IPN bus with NotifyInitialState and blocks until State
|
||||
// reaches ipn.Running, which by definition means a netmap has been applied.
|
||||
// We don't redo that work here.
|
||||
|
||||
var (
|
||||
flagActuallyTest = flag.Bool("actually-test-giant-tailnet", false,
|
||||
"if set, run the BenchmarkGiantTailnet* benchmarks; otherwise they are skipped")
|
||||
flagN = flag.Int("giant-tailnet-n", 250_000,
|
||||
"size of the initial netmap (peer count) for BenchmarkGiantTailnet*")
|
||||
flagBenchVerbose = flag.Bool("giant-tailnet-verbose", false,
|
||||
"if set, log tsnet output and DERP setup to stderr")
|
||||
)
|
||||
|
||||
// BenchmarkGiantTailnet measures the per-delta CPU cost of a tailnet client
|
||||
// processing peer-add/peer-remove deltas in steady state, with no IPN bus
|
||||
// subscribers attached. This represents the headless-tailscaled workload
|
||||
// (Linux subnet routers, container sidecars, ...) where the LocalBackend
|
||||
// does not pay for fanning Notify.NetMap out to GUI watchers.
|
||||
//
|
||||
// Use [BenchmarkGiantTailnetBusWatcher] for the GUI-client workload.
|
||||
//
|
||||
// The benchmark is opt-in via --actually-test-giant-tailnet.
|
||||
func BenchmarkGiantTailnet(b *testing.B) {
|
||||
if !*flagActuallyTest {
|
||||
b.Skip("set --actually-test-giant-tailnet to run this benchmark")
|
||||
}
|
||||
benchGiantTailnet(b, false)
|
||||
}
|
||||
|
||||
// BenchmarkGiantTailnetBusWatcher is like [BenchmarkGiantTailnet] but
|
||||
// attaches one [local.Client.WatchIPNBus] subscriber for the duration of the
|
||||
// benchmark. The Notify-fan-out cost (notably Notify.NetMap encoding to
|
||||
// every watcher on every full-rebuild path) is therefore included in the
|
||||
// per-delta measurement, which approximates the GUI-client workload.
|
||||
//
|
||||
// The benchmark is opt-in via --actually-test-giant-tailnet.
|
||||
func BenchmarkGiantTailnetBusWatcher(b *testing.B) {
|
||||
if !*flagActuallyTest {
|
||||
b.Skip("set --actually-test-giant-tailnet to run this benchmark")
|
||||
}
|
||||
benchGiantTailnet(b, true)
|
||||
}
|
||||
|
||||
// benchGiantTailnet is the shared body of the BenchmarkGiantTailnet*
|
||||
// benchmarks. Setup is entirely in-process: a [testcontrol.Server] hosts
|
||||
// the control plane, a [tsnet.Server] hosts the client, and a
|
||||
// [largetailnet.Streamer] hijacks the map long-poll to drive an exact
|
||||
// MapResponse sequence.
|
||||
//
|
||||
// Each loop iteration sends one [tailcfg.MapResponse] with PeersChanged
|
||||
// (a fresh peer) and PeersRemoved (the previous fresh peer), then waits
|
||||
// for the client to apply it. Net peer count stays at flagN throughout the
|
||||
// loop.
|
||||
//
|
||||
// The wait mechanism differs by variant:
|
||||
//
|
||||
// - busWatcher=false: block on a channel returned by
|
||||
// [ipnlocal.LocalBackend.AwaitNodeKeyForTest] (reached via
|
||||
// [tsnet.TestHooks]). The channel is closed by LocalBackend the moment
|
||||
// the just-added peer's key appears in the netmap, so the wait has zero
|
||||
// polling overhead.
|
||||
// - busWatcher=true: drain Notify events from the bus subscription, since
|
||||
// a Notify firing is exactly the side-effect we want to amortize into
|
||||
// the per-delta measurement.
|
||||
//
|
||||
// Recommended invocation for profiling on unmodified main:
|
||||
//
|
||||
// go test ./tstest/largetailnet/ -run=^$ \
|
||||
// -bench='BenchmarkGiantTailnet(BusWatcher)?$' \
|
||||
// -benchtime=2000x -timeout=10m \
|
||||
// --actually-test-giant-tailnet \
|
||||
// --giant-tailnet-n=250000 \
|
||||
// -cpuprofile=/tmp/giant.cpu.pprof
|
||||
func benchGiantTailnet(b *testing.B, busWatcher bool) {
|
||||
logf := logger.Discard
|
||||
if *flagBenchVerbose {
|
||||
logf = b.Logf
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
|
||||
b.Cleanup(cancel)
|
||||
|
||||
derpMap := integration.RunDERPAndSTUN(b, logf, "127.0.0.1")
|
||||
|
||||
streamer := largetailnet.New(*flagN, derpMap)
|
||||
|
||||
ctrl := &testcontrol.Server{
|
||||
DERPMap: derpMap,
|
||||
DNSConfig: &tailcfg.DNSConfig{},
|
||||
AltMapStream: streamer.AltMapStream(),
|
||||
Logf: logf,
|
||||
}
|
||||
ctrl.HTTPTestServer = httptest.NewUnstartedServer(ctrl)
|
||||
ctrl.HTTPTestServer.Start()
|
||||
b.Cleanup(ctrl.HTTPTestServer.Close)
|
||||
controlURL := ctrl.HTTPTestServer.URL
|
||||
b.Logf("testcontrol listening on %s", controlURL)
|
||||
|
||||
tmp := filepath.Join(b.TempDir(), "tsnet")
|
||||
if err := os.MkdirAll(tmp, 0755); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
s := &tsnet.Server{
|
||||
Dir: tmp,
|
||||
ControlURL: controlURL,
|
||||
Hostname: "largetailnet-bench",
|
||||
Store: new(mem.Store),
|
||||
Ephemeral: true,
|
||||
Logf: logf,
|
||||
}
|
||||
b.Cleanup(func() { s.Close() })
|
||||
|
||||
// tsnet.Server.Up blocks until the backend reaches Running, which
|
||||
// requires the initial flagN-peer MapResponse to have been processed.
|
||||
upStart := time.Now()
|
||||
if _, err := s.Up(ctx); err != nil {
|
||||
b.Fatalf("tsnet.Server.Up: %v", err)
|
||||
}
|
||||
b.Logf("initial %d-peer netmap processed in %v", *flagN, time.Since(upStart))
|
||||
|
||||
lc, err := s.LocalClient()
|
||||
if err != nil {
|
||||
b.Fatalf("LocalClient: %v", err)
|
||||
}
|
||||
lb := tsnet.TestHooks.LocalBackend(s)
|
||||
|
||||
var notifyCh chan struct{}
|
||||
if busWatcher {
|
||||
bw, err := lc.WatchIPNBus(ctx, 0)
|
||||
if err != nil {
|
||||
b.Fatalf("WatchIPNBus: %v", err)
|
||||
}
|
||||
b.Cleanup(func() { bw.Close() })
|
||||
notifyCh = make(chan struct{}, 1024)
|
||||
go func() {
|
||||
for {
|
||||
n, err := bw.Next()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if n.NetMap != nil || len(n.PeerChanges) > 0 {
|
||||
select {
|
||||
case notifyCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var prevAdded *tailcfg.Node
|
||||
runtime.GC()
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
added := streamer.AllocPeer()
|
||||
mr := &tailcfg.MapResponse{
|
||||
PeersChanged: []*tailcfg.Node{added},
|
||||
}
|
||||
if prevAdded != nil {
|
||||
mr.PeersRemoved = []tailcfg.NodeID{prevAdded.ID}
|
||||
}
|
||||
prevAdded = added
|
||||
|
||||
if err := streamer.SendDelta(ctx, mr); err != nil {
|
||||
b.Fatalf("SendDelta: %v", err)
|
||||
}
|
||||
|
||||
if busWatcher {
|
||||
// A Notify firing is itself part of the workload we
|
||||
// want to measure on this variant.
|
||||
select {
|
||||
case <-notifyCh:
|
||||
case <-time.After(10 * time.Second):
|
||||
b.Fatal("timed out waiting for notify")
|
||||
case <-ctx.Done():
|
||||
b.Fatalf("ctx done waiting for notify: %v", ctx.Err())
|
||||
}
|
||||
} else {
|
||||
// Block on the LocalBackend's test-only signal that
|
||||
// the just-added peer key has landed in the netmap.
|
||||
// No polling, no notify fan-out cost.
|
||||
select {
|
||||
case <-lb.AwaitNodeKeyForTest(added.Key):
|
||||
case <-time.After(10 * time.Second):
|
||||
b.Fatalf("timed out waiting for node key %v", added.Key)
|
||||
case <-ctx.Done():
|
||||
b.Fatalf("ctx done waiting for node key: %v", ctx.Err())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user