control/controlclient: flesh out mapSession to break up gigantic method

Now mapSession has a bunch more fields and methods, rather than being
just one massive func with a ton of local variables.

So far there are no major new optimizations, though. It should behave
the same as before.

This has been done with an eye towards testability (so tests can set
all the callback funcs as needed, or not, without a huge Direct client
or long-running HTTP requests), but this change doesn't add new tests
yet. That will follow in the changes which flesh out the NetmapUpdater
interface.

Updates #1909

Change-Id: Iad4e7442d5bbbe2614bd4b1dc4b02e27504898df
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick
2023-08-20 19:52:52 -07:00
committed by Brad Fitzpatrick
parent 1b223566dd
commit b5ff68a968
3 changed files with 284 additions and 114 deletions
+76 -103
View File
@@ -54,7 +54,6 @@ import (
"tailscale.com/types/ptr"
"tailscale.com/types/tkatype"
"tailscale.com/util/clientmetric"
"tailscale.com/util/cmpx"
"tailscale.com/util/multierr"
"tailscale.com/util/singleflight"
"tailscale.com/util/systemd"
@@ -806,10 +805,10 @@ func (c *Direct) SendUpdate(ctx context.Context) error {
return c.sendMapRequest(ctx, false, nil)
}
// If we go more than pollTimeout without hearing from the server,
// If we go more than watchdogTimeout without hearing from the server,
// end the long poll. We should be receiving a keep alive ping
// every minute.
const pollTimeout = 120 * time.Second
const watchdogTimeout = 120 * time.Second
// sendMapRequest makes a /map request to download the network map, calling cb
// with each new netmap. If isStreaming, it will poll forever and only returns
@@ -956,39 +955,48 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap
return nil
}
timeout, timeoutChannel := c.clock.NewTimer(pollTimeout)
timeoutReset := make(chan struct{})
pollDone := make(chan struct{})
defer close(pollDone)
go func() {
for {
select {
case <-pollDone:
vlogf("netmap: ending timeout goroutine")
return
case <-timeoutChannel:
c.logf("map response long-poll timed out!")
cancel()
return
case <-timeoutReset:
if !timeout.Stop() {
select {
case <-timeoutChannel:
case <-pollDone:
vlogf("netmap: ending timeout goroutine")
return
}
}
vlogf("netmap: reset timeout timer")
timeout.Reset(pollTimeout)
}
}
}()
var mapResIdx int // 0 for first message, then 1+ for deltas
sess := newMapSession(persist.PrivateNodeKey())
sess := newMapSession(persist.PrivateNodeKey(), nu)
defer sess.Close()
sess.cancel = cancel
sess.logf = c.logf
sess.vlogf = vlogf
sess.altClock = c.clock
sess.machinePubKey = machinePubKey
sess.onDebug = c.handleDebugMessage
sess.onConciseNetMapSummary = func(summary string) {
// Occasionally print the netmap header.
// This is handy for debugging, and our logs processing
// pipeline depends on it. (TODO: Remove this dependency.)
now := c.clock.Now()
if now.Sub(c.lastPrintMap) < 5*time.Minute {
return
}
c.lastPrintMap = now
c.logf("[v1] new network map[%d]:\n%s", mapResIdx, summary)
}
sess.onSelfNodeChanged = func(nm *netmap.NetworkMap) {
c.mu.Lock()
defer c.mu.Unlock()
// If we are the ones who last updated persist, then we can update it
// again. Otherwise, we should not touch it. Also, it's only worth
// change it if the Node info changed.
if persist == c.persist {
newPersist := persist.AsStruct()
newPersist.NodeID = nm.SelfNode.StableID
newPersist.UserProfile = nm.UserProfiles[nm.User()]
c.persist = newPersist.View()
persist = c.persist
}
c.expiry = nm.Expiry
}
sess.StartWatchdog()
// gotNonKeepAliveMessage is whether we've yet received a MapResponse message without
// KeepAlive set.
var gotNonKeepAliveMessage bool
// If allowStream, then the server will use an HTTP long poll to
// return incremental results. There is always one response right
@@ -997,8 +1005,8 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap
// the same format before just closing the connection.
// We can use this same read loop either way.
var msg []byte
for i := 0; i == 0 || isStreaming; i++ {
vlogf("netmap: starting size read after %v (poll %v)", time.Since(t0).Round(time.Millisecond), i)
for ; mapResIdx == 0 || isStreaming; mapResIdx++ {
vlogf("netmap: starting size read after %v (poll %v)", time.Since(t0).Round(time.Millisecond), mapResIdx)
var siz [4]byte
if _, err := io.ReadFull(res.Body, siz[:]); err != nil {
vlogf("netmap: size read error after %v: %v", time.Since(t0).Round(time.Millisecond), err)
@@ -1062,7 +1070,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap
}
select {
case timeoutReset <- struct{}{}:
case sess.watchdogReset <- struct{}{}:
vlogf("netmap: sent timer reset")
case <-ctx.Done():
c.logf("[v1] netmap: not resetting timer; context done: %v", ctx.Err())
@@ -1074,75 +1082,19 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap
}
metricMapResponseMap.Add(1)
if i > 0 {
if gotNonKeepAliveMessage {
// If we've already seen a non-keep-alive message, this is a delta update.
metricMapResponseMapDelta.Add(1)
} else if resp.Node == nil {
// The very first non-keep-alive message should have Node populated.
c.logf("initial MapResponse lacked Node")
return errors.New("initial MapResponse lacked node")
}
gotNonKeepAliveMessage = true
if debug := resp.Debug; debug != nil {
if code := debug.Exit; code != nil {
c.logf("exiting process with status %v per controlplane", *code)
os.Exit(*code)
}
if debug.DisableLogTail {
logtail.Disable()
envknob.SetNoLogsNoSupport()
}
if sleep := time.Duration(debug.SleepSeconds * float64(time.Second)); sleep > 0 {
if err := sleepAsRequested(ctx, c.logf, timeoutReset, sleep, c.clock); err != nil {
return err
}
}
if err := sess.HandleNonKeepAliveMapResponse(ctx, &resp); err != nil {
return err
}
// For responses that mutate the self node, check for updated nodeAttrs.
if resp.Node != nil {
setControlKnobsFromNodeAttrs(resp.Node.Capabilities)
}
// Call Node.InitDisplayNames on any changed nodes.
initDisplayNames(cmpx.Or(resp.Node, sess.lastNode).View(), &resp)
nm := sess.netmapForResponse(&resp)
if nm.SelfNode == nil {
c.logf("MapResponse lacked node")
return errors.New("MapResponse lacked node")
}
if DevKnob.StripEndpoints() {
for _, p := range resp.Peers {
p.Endpoints = nil
}
}
if DevKnob.StripCaps() {
nm.SelfNode.Capabilities = nil
}
// Occasionally print the netmap header.
// This is handy for debugging, and our logs processing
// pipeline depends on it. (TODO: Remove this dependency.)
// Code elsewhere prints netmap diffs every time they are received.
now := c.clock.Now()
if now.Sub(c.lastPrintMap) >= 5*time.Minute {
c.lastPrintMap = now
c.logf("[v1] new network map[%d]:\n%s", i, nm.VeryConcise())
}
c.mu.Lock()
// If we are the ones who last updated persist, then we can update it
// again. Otherwise, we should not touch it. Also, it's only worth
// change it if the Node info changed.
if persist == c.persist && resp.Node != nil {
newPersist := persist.AsStruct()
newPersist.NodeID = nm.SelfNode.StableID
newPersist.UserProfile = nm.UserProfiles[nm.User()]
c.persist = newPersist.View()
persist = c.persist
}
c.expiry = nm.Expiry
c.mu.Unlock()
nu.UpdateFullNetmap(nm)
}
if ctx.Err() != nil {
return ctx.Err()
@@ -1150,6 +1102,23 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap
return nil
}
func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug, watchdogReset chan<- struct{}) error {
if code := debug.Exit; code != nil {
c.logf("exiting process with status %v per controlplane", *code)
os.Exit(*code)
}
if debug.DisableLogTail {
logtail.Disable()
envknob.SetNoLogsNoSupport()
}
if sleep := time.Duration(debug.SleepSeconds * float64(time.Second)); sleep > 0 {
if err := sleepAsRequested(ctx, c.logf, watchdogReset, sleep, c.clock); err != nil {
return err
}
}
return nil
}
// initDisplayNames mutates any tailcfg.Nodes in resp to populate their display names,
// calling InitDisplayNames on each.
//
@@ -1538,7 +1507,11 @@ func answerC2NPing(logf logger.Logf, c2nHandler http.Handler, c *http.Client, pr
}
}
func sleepAsRequested(ctx context.Context, logf logger.Logf, timeoutReset chan<- struct{}, d time.Duration, clock tstime.Clock) error {
// sleepAsRequest implements the sleep for a tailcfg.Debug message requesting
// that the client sleep. The complication is that while we're sleeping (if for
// a long time), we need to periodically reset the watchdog timer before it
// expires.
func sleepAsRequested(ctx context.Context, logf logger.Logf, watchdogReset chan<- struct{}, d time.Duration, clock tstime.Clock) error {
const maxSleep = 5 * time.Minute
if d > maxSleep {
logf("sleeping for %v, capped from server-requested %v ...", maxSleep, d)
@@ -1547,7 +1520,7 @@ func sleepAsRequested(ctx context.Context, logf logger.Logf, timeoutReset chan<-
logf("sleeping for server-requested %v ...", d)
}
ticker, tickerChannel := clock.NewTicker(pollTimeout / 2)
ticker, tickerChannel := clock.NewTicker(watchdogTimeout / 2)
defer ticker.Stop()
timer, timerChannel := clock.NewTimer(d)
defer timer.Stop()
@@ -1559,7 +1532,7 @@ func sleepAsRequested(ctx context.Context, logf logger.Logf, timeoutReset chan<-
return nil
case <-tickerChannel:
select {
case timeoutReset <- struct{}{}:
case watchdogReset <- struct{}{}:
case <-timerChannel:
return nil
case <-ctx.Done():