diff --git a/cmd/k8s-operator/depaware.txt b/cmd/k8s-operator/depaware.txt index cbb4738d7..3c8e35542 100644 --- a/cmd/k8s-operator/depaware.txt +++ b/cmd/k8s-operator/depaware.txt @@ -914,6 +914,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/ tailscale.com/types/bools from tailscale.com/tsnet+ tailscale.com/types/dnstype from tailscale.com/ipn/ipnlocal+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/ipproto from tailscale.com/net/flowtrack+ tailscale.com/types/key from tailscale.com/client/local+ tailscale.com/types/lazy from tailscale.com/ipn/ipnlocal+ diff --git a/cmd/tailscaled/depaware-min.txt b/cmd/tailscaled/depaware-min.txt index a3a747c80..43b61f7f3 100644 --- a/cmd/tailscaled/depaware-min.txt +++ b/cmd/tailscaled/depaware-min.txt @@ -132,6 +132,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/appctype from tailscale.com/ipn/ipnlocal+ tailscale.com/types/dnstype from tailscale.com/client/tailscale/apitype+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled tailscale.com/types/ipproto from tailscale.com/ipn+ tailscale.com/types/key from tailscale.com/control/controlbase+ diff --git a/cmd/tailscaled/depaware-minbox.txt b/cmd/tailscaled/depaware-minbox.txt index 133365cc9..0da47e03a 100644 --- a/cmd/tailscaled/depaware-minbox.txt +++ b/cmd/tailscaled/depaware-minbox.txt @@ -151,6 +151,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/appctype from tailscale.com/ipn/ipnlocal+ tailscale.com/types/dnstype from tailscale.com/client/tailscale/apitype+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled tailscale.com/types/ipproto from tailscale.com/ipn+ tailscale.com/types/key from tailscale.com/client/local+ diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt index a7ecc865c..8af768895 100644 --- a/cmd/tailscaled/depaware.txt +++ b/cmd/tailscaled/depaware.txt @@ -407,6 +407,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/bools from tailscale.com/wgengine/netlog tailscale.com/types/dnstype from tailscale.com/ipn/ipnlocal+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled tailscale.com/types/ipproto from tailscale.com/net/flowtrack+ tailscale.com/types/key from tailscale.com/client/local+ diff --git a/cmd/tsidp/depaware.txt b/cmd/tsidp/depaware.txt index 2f1190393..123df14ce 100644 --- a/cmd/tsidp/depaware.txt +++ b/cmd/tsidp/depaware.txt @@ -315,6 +315,7 @@ tailscale.com/cmd/tsidp dependencies: (generated by github.com/tailscale/depawar tailscale.com/types/bools from tailscale.com/tsnet+ tailscale.com/types/dnstype from tailscale.com/client/local+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/ipproto from tailscale.com/ipn+ tailscale.com/types/key from tailscale.com/client/local+ tailscale.com/types/lazy from tailscale.com/cmd/tsidp+ diff --git a/control/controlclient/auto.go b/control/controlclient/auto.go index e96aeec5f..195525228 100644 --- a/control/controlclient/auto.go +++ b/control/controlclient/auto.go @@ -91,7 +91,7 @@ func (c *Auto) updateRoutine() { bo.BackOff(ctx, err) continue } - bo.BackOff(ctx, nil) + bo.Reset() c.direct.logf("[v1] successful lite map update in %v", d) lastUpdateGenInformed = gen @@ -382,7 +382,7 @@ func (c *Auto) authRoutine() { // backoff to avoid a busy loop. bo.BackOff(ctx, errors.New("login URL not changing")) } else { - bo.BackOff(ctx, nil) + bo.Reset() } continue } @@ -397,7 +397,7 @@ func (c *Auto) authRoutine() { c.sendStatus("authRoutine-success", nil, "", nil) c.restartMap() - bo.BackOff(ctx, nil) + bo.Reset() } } @@ -446,13 +446,14 @@ func (mrs mapRoutineState) UpdateFullNetmap(nm *netmap.NetworkMap) { c.expiry = nm.SelfKeyExpiry() stillAuthed := c.loggedIn c.logf("[v1] mapRoutine: netmap received: loggedIn=%v inMapPoll=true", stillAuthed) + + // Reset the backoff timer if we got a netmap. + mrs.bo.Reset() c.mu.Unlock() if stillAuthed { c.sendStatus("mapRoutine-got-netmap", nil, "", nm) } - // Reset the backoff timer if we got a netmap. - mrs.bo.Reset() } func (mrs mapRoutineState) UpdateNetmapDelta(muts []netmap.NodeMutation) bool { @@ -526,13 +527,18 @@ func (c *Auto) mapRoutine() { c.mu.Lock() c.inMapPoll = false paused := c.paused + + if paused { + mrs.bo.Reset() + } else { + mrs.bo.BackOff(ctx, err) + } c.mu.Unlock() + // Now safe to call functions that might acquire the mutex if paused { - mrs.bo.BackOff(ctx, nil) c.logf("mapRoutine: paused") } else { - mrs.bo.BackOff(ctx, err) report(err, "PollNetMap") } } diff --git a/control/controlclient/direct.go b/control/controlclient/direct.go index d6189cf9b..593aa463d 100644 --- a/control/controlclient/direct.go +++ b/control/controlclient/direct.go @@ -46,6 +46,7 @@ import ( "tailscale.com/tailcfg" "tailscale.com/tka" "tailscale.com/tstime" + "tailscale.com/types/events" "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/types/netmap" @@ -106,8 +107,9 @@ type Direct struct { netinfo *tailcfg.NetInfo endpoints []tailcfg.Endpoint tkaHead string - lastPingURL string // last PingRequest.URL received, for dup suppression - connectionHandleForTest string // sent in MapRequest.ConnectionHandleForTest + lastPingURL string // last PingRequest.URL received, for dup suppression + connectionHandleForTest string // sent in MapRequest.ConnectionHandleForTest + streamingMapSession *mapSession // the one streaming mapSession instance controlClientID int64 // Random ID used to differentiate clients for consumers of messages. } @@ -348,6 +350,38 @@ func NewDirect(opts Options) (*Direct, error) { c.clientVersionPub = eventbus.Publish[tailcfg.ClientVersion](c.busClient) c.autoUpdatePub = eventbus.Publish[AutoUpdate](c.busClient) c.controlTimePub = eventbus.Publish[ControlTime](c.busClient) + discoKeyPub := eventbus.Publish[events.PeerDiscoKeyUpdate](c.busClient) + eventbus.SubscribeFunc(c.busClient, func(update events.DiscoKeyAdvertisement) { + c.mu.Lock() + defer c.mu.Unlock() + c.logf("controlclient direct: got TSMP disco key advertisement from %v via eventbus", update.Src) + if c.streamingMapSession != nil { + nm := c.streamingMapSession.netmap() + peer, ok := nm.PeerByTailscaleIP(update.Src) + if !ok { + return + } + c.logf("controlclient direct: updating discoKey for %v via mapSession", update.Src) + + // If we update without error, return. If the err indicates that the + // mapSession has gone away, we want to fall back to pushing the key + // further down the chain. + if err := c.streamingMapSession.updateDiscoForNode( + peer.ID(), update.Key, time.Now(), false); err == nil || + !errors.Is(err, ErrChangeQueueClosed) { + return + } + } + + // We need to push the update further down the chain. Either because we do + // not have a mapSession (we are not connected to control) or because the + // mapSession queue has closed. + c.logf("controlclient direct: updating discoKey for %v via magicsock", update.Src) + discoKeyPub.Publish(events.PeerDiscoKeyUpdate{ + Src: update.Src, + Key: update.Key, + }) + }) return c, nil } @@ -821,21 +855,28 @@ func (c *Direct) PollNetMap(ctx context.Context, nu NetmapUpdater) error { return c.sendMapRequest(ctx, true, nu) } +// rememberLastNetmapUpdater is a container that remembers the last netmap +// update it observed. It is used by tests and [NetmapFromMapResponseForDebug]. +// It will report only the first netmap seen. type rememberLastNetmapUpdater struct { last *netmap.NetworkMap + done chan any } func (nu *rememberLastNetmapUpdater) UpdateFullNetmap(nm *netmap.NetworkMap) { nu.last = nm + nu.done <- nil } // FetchNetMapForTest fetches the netmap once. func (c *Direct) FetchNetMapForTest(ctx context.Context) (*netmap.NetworkMap, error) { var nu rememberLastNetmapUpdater + nu.done = make(chan any) err := c.sendMapRequest(ctx, false, &nu) if err == nil && nu.last == nil { return nil, errors.New("[unexpected] sendMapRequest success without callback") } + <-nu.done return nu.last, err } @@ -1080,8 +1121,18 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap return nil } + if c.streamingMapSession != nil { + panic("mapSession is already set") + } + sess := newMapSession(persist.PrivateNodeKey(), nu, c.controlKnobs) - defer sess.Close() + c.streamingMapSession = sess + defer func() { + sess.Close() + c.mu.Lock() + c.streamingMapSession = nil + c.mu.Unlock() + }() sess.cancel = cancel sess.logf = c.logf sess.vlogf = vlogf @@ -1235,7 +1286,7 @@ func NetmapFromMapResponseForDebug(ctx context.Context, pr persist.PersistView, return nil, errors.New("PersistView invalid") } - nu := &rememberLastNetmapUpdater{} + nu := &rememberLastNetmapUpdater{done: make(chan any)} sess := newMapSession(pr.PrivateNodeKey(), nu, nil) defer sess.Close() @@ -1243,6 +1294,7 @@ func NetmapFromMapResponseForDebug(ctx context.Context, pr persist.PersistView, return nil, fmt.Errorf("HandleNonKeepAliveMapResponse: %w", err) } + <-nu.done return sess.netmap(), nil } @@ -1303,10 +1355,10 @@ var jsonEscapedZero = []byte(`\u0000`) const justKeepAliveStr = `{"KeepAlive":true}` // decodeMsg is responsible for uncompressing msg and unmarshaling into v. -func (sess *mapSession) decodeMsg(compressedMsg []byte, v *tailcfg.MapResponse) error { +func (ms *mapSession) decodeMsg(compressedMsg []byte, v *tailcfg.MapResponse) error { // Fast path for common case of keep-alive message. // See tailscale/tailscale#17343. - if sess.keepAliveZ != nil && bytes.Equal(compressedMsg, sess.keepAliveZ) { + if ms.keepAliveZ != nil && bytes.Equal(compressedMsg, ms.keepAliveZ) { v.KeepAlive = true return nil } @@ -1315,7 +1367,7 @@ func (sess *mapSession) decodeMsg(compressedMsg []byte, v *tailcfg.MapResponse) if err != nil { return err } - sess.ztdDecodesForTest++ + ms.ztdDecodesForTest++ if DevKnob.DumpNetMaps() { var buf bytes.Buffer @@ -1330,7 +1382,7 @@ func (sess *mapSession) decodeMsg(compressedMsg []byte, v *tailcfg.MapResponse) return fmt.Errorf("response: %v", err) } if v.KeepAlive && string(b) == justKeepAliveStr { - sess.keepAliveZ = compressedMsg + ms.keepAliveZ = compressedMsg } return nil } diff --git a/control/controlclient/map.go b/control/controlclient/map.go index f33620edd..1a0ab0037 100644 --- a/control/controlclient/map.go +++ b/control/controlclient/map.go @@ -9,6 +9,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "errors" "io" "maps" "net" @@ -96,6 +97,10 @@ type mapSession struct { lastPopBrowserURL string lastTKAInfo *tailcfg.TKAInfo lastNetmapSummary string // from NetworkMap.VeryConcise + cqmu sync.Mutex + changeQueue chan (*tailcfg.MapResponse) + changeQueueClosed bool + processQueue sync.WaitGroup } // newMapSession returns a mostly unconfigured new mapSession. @@ -118,11 +123,48 @@ func newMapSession(privateNodeKey key.NodePrivate, nu NetmapUpdater, controlKnob cancel: func() {}, onDebug: func(context.Context, *tailcfg.Debug) error { return nil }, onSelfNodeChanged: func(*netmap.NetworkMap) {}, + changeQueue: make(chan *tailcfg.MapResponse), + changeQueueClosed: false, } ms.sessionAliveCtx, ms.sessionAliveCtxClose = context.WithCancel(context.Background()) + ms.processQueue.Add(1) + go ms.run() return ms } +// run starts the mapSession processing a queue of tailcfg.MapResponse one by +// one until close() is called on the mapSession. +// When the mapSession is closed, the remaining queue is locked and processed +// before the mapSession is done processing. +func (ms *mapSession) run() { + defer ms.processQueue.Done() + + for { + select { + case change := <-ms.changeQueue: + ms.handleNonKeepAliveMapResponse(ms.sessionAliveCtx, change) + case <-ms.sessionAliveCtx.Done(): + // Drain any remaining items in the queue before exiting. + // Lock the queue during this time to avoid updates through other channels + // to be overwritten. This is especially relevant for calls to + // updateDiscoForNode. + ms.cqmu.Lock() + ms.changeQueueClosed = true + ms.cqmu.Unlock() + for { + select { + case change := <-ms.changeQueue: + ms.handleNonKeepAliveMapResponse(ms.sessionAliveCtx, change) + default: + // Queue is empty, close it and exit + close(ms.changeQueue) + return + } + } + } + } +} + // occasionallyPrintSummary logs summary at most once very 5 minutes. The // summary is the Netmap.VeryConcise result from the last received map response. func (ms *mapSession) occasionallyPrintSummary(summary string) { @@ -143,9 +185,48 @@ func (ms *mapSession) clock() tstime.Clock { func (ms *mapSession) Close() { ms.sessionAliveCtxClose() + ms.processQueue.Wait() } -// HandleNonKeepAliveMapResponse handles a non-KeepAlive MapResponse (full or +var ErrChangeQueueClosed = errors.New("change queue closed") + +func (ms *mapSession) updateDiscoForNode(id tailcfg.NodeID, key key.DiscoPublic, lastSeen time.Time, online bool) error { + ms.cqmu.Lock() + + if ms.changeQueueClosed { + ms.cqmu.Unlock() + ms.processQueue.Wait() + return ErrChangeQueueClosed + } + + resp := &tailcfg.MapResponse{ + PeersChangedPatch: []*tailcfg.PeerChange{{ + NodeID: id, + LastSeen: &lastSeen, + Online: &online, + DiscoKey: &key, + }}, + } + ms.changeQueue <- resp + ms.cqmu.Unlock() + return nil +} + +func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *tailcfg.MapResponse) error { + ms.cqmu.Lock() + + if ms.changeQueueClosed { + ms.cqmu.Unlock() + ms.processQueue.Wait() + return ErrChangeQueueClosed + } + + ms.changeQueue <- resp + ms.cqmu.Unlock() + return nil +} + +// handleNonKeepAliveMapResponse handles a non-KeepAlive MapResponse (full or // incremental). // // All fields that are valid on a KeepAlive MapResponse have already been @@ -153,7 +234,7 @@ func (ms *mapSession) Close() { // // TODO(bradfitz): make this handle all fields later. For now (2023-08-20) this // is [re]factoring progress enough. -func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *tailcfg.MapResponse) error { +func (ms *mapSession) handleNonKeepAliveMapResponse(ctx context.Context, resp *tailcfg.MapResponse) error { if debug := resp.Debug; debug != nil { if err := ms.onDebug(ctx, debug); err != nil { return err @@ -199,6 +280,8 @@ func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *t ms.patchifyPeersChanged(resp) + ms.removeUnwantedDiscoUpdates(resp) + ms.updateStateFromResponse(resp) if ms.tryHandleIncrementally(resp) { @@ -281,6 +364,48 @@ type updateStats struct { changed int } +// removeUnwantedDiscoUpdates goes over the patchified updates and reject items +// where the node is offline and has last been seen before the recorded last seen. +func (ms *mapSession) removeUnwantedDiscoUpdates(resp *tailcfg.MapResponse) { + existingMap := ms.netmap() + acceptedDiscoUpdates := resp.PeersChangedPatch[:0] + + for _, change := range resp.PeersChangedPatch { + // Accept if: + // - DiscoKey is nil and did not change. + // - Fields we rely on for rejection is missing. + if change.DiscoKey == nil || change.Online == nil || change.LastSeen == nil { + acceptedDiscoUpdates = append(acceptedDiscoUpdates, change) + continue + } + + // Accept if: + // - Node is online. + if *change.Online { + acceptedDiscoUpdates = append(acceptedDiscoUpdates, change) + continue + } + + peerIdx := existingMap.PeerIndexByNodeID(change.NodeID) + // Accept if: + // - Cannot find the peer, don't have enough data + if peerIdx < 0 { + acceptedDiscoUpdates = append(acceptedDiscoUpdates, change) + continue + } + existingNode := existingMap.Peers[peerIdx] + + // Accept if: + // - lastSeen moved forward in time. + if existingLastSeen, ok := existingNode.LastSeen().GetOk(); ok && + change.LastSeen.After(existingLastSeen) { + acceptedDiscoUpdates = append(acceptedDiscoUpdates, change) + } + } + + resp.PeersChangedPatch = acceptedDiscoUpdates +} + // updateStateFromResponse updates ms from res. It takes ownership of res. func (ms *mapSession) updateStateFromResponse(resp *tailcfg.MapResponse) { ms.updatePeersStateFromResponse(resp) diff --git a/control/controlclient/map_test.go b/control/controlclient/map_test.go index 5a0ccfd82..0a2838df9 100644 --- a/control/controlclient/map_test.go +++ b/control/controlclient/map_test.go @@ -623,6 +623,90 @@ func TestNetmapForResponse(t *testing.T) { }) } +func TestUpdateDiscoForNode(t *testing.T) { + tests := []struct { + name string + initialOnline bool + initialLastSeen time.Time + updateOnline bool + updateLastSeen time.Time + wantUpdate bool + }{ + { + name: "newer_key_not_online", + initialOnline: true, + initialLastSeen: time.Unix(1, 0), + updateOnline: false, + updateLastSeen: time.Now(), + wantUpdate: true, + }, + { + name: "newer_key_online", + initialOnline: true, + initialLastSeen: time.Unix(1, 0), + updateOnline: true, + updateLastSeen: time.Now(), + wantUpdate: true, + }, + { + name: "older_key_not_online", + initialOnline: false, + initialLastSeen: time.Now(), + updateOnline: false, + updateLastSeen: time.Unix(1, 0), + wantUpdate: false, + }, + { + name: "older_key_online", + initialOnline: false, + initialLastSeen: time.Now(), + updateOnline: true, + updateLastSeen: time.Unix(1, 0), + wantUpdate: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nu := &rememberLastNetmapUpdater{ + done: make(chan any), + } + ms := newTestMapSession(t, nu) + + oldKey := key.NewDisco() + + // Insert existing node + node := tailcfg.Node{ + ID: 1, + DiscoKey: oldKey.Public(), + Online: &tt.initialOnline, + LastSeen: &tt.initialLastSeen, + } + + if nm := ms.netmapForResponse(&tailcfg.MapResponse{ + Peers: []*tailcfg.Node{&node}, + }); len(nm.Peers) != 1 { + t.Fatalf("node not inserted") + } + + newKey := key.NewDisco() + ms.updateDiscoForNode(node.ID, newKey.Public(), tt.updateLastSeen, tt.updateOnline) + <-nu.done + + nm := ms.netmap() + peerIdx := nm.PeerIndexByNodeID(node.ID) + if peerIdx == -1 { + t.Fatal("node not found") + } + + updated := nm.Peers[peerIdx].DiscoKey().Compare(newKey.Public()) == 0 + if updated != tt.wantUpdate { + t.Fatalf("Disco key update: %t, wanted update: %t", updated, tt.wantUpdate) + } + }) + } +} + func first[T any](s []T) T { if len(s) == 0 { var zero T @@ -1098,6 +1182,8 @@ func BenchmarkMapSessionDelta(b *testing.B) { ctx := context.Background() nu := &countingNetmapUpdater{} ms := newTestMapSession(b, nu) + // Disable log output for benchmarks to avoid races + ms.logf = func(string, ...any) {} res := &tailcfg.MapResponse{ Node: &tailcfg.Node{ ID: 1, diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index d0f5b4a0d..83185527e 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -30,6 +30,7 @@ import ( "tailscale.com/net/tsaddr" "tailscale.com/syncs" "tailscale.com/tstime/mono" + "tailscale.com/types/events" "tailscale.com/types/ipproto" "tailscale.com/types/key" "tailscale.com/types/logger" @@ -220,7 +221,7 @@ type Wrapper struct { metrics *metrics eventClient *eventbus.Client - discoKeyAdvertisementPub *eventbus.Publisher[DiscoKeyAdvertisement] + discoKeyAdvertisementPub *eventbus.Publisher[events.DiscoKeyAdvertisement] } type metrics struct { @@ -296,7 +297,7 @@ func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry, } w.eventClient = bus.Client("net.tstun") - w.discoKeyAdvertisementPub = eventbus.Publish[DiscoKeyAdvertisement](w.eventClient) + w.discoKeyAdvertisementPub = eventbus.Publish[events.DiscoKeyAdvertisement](w.eventClient) w.vectorBuffer = make([][]byte, tdev.BatchSize()) for i := range w.vectorBuffer { @@ -1144,13 +1145,6 @@ func (t *Wrapper) injectedRead(res tunInjectedRead, outBuffs [][]byte, sizes []i return n, err } -// DiscoKeyAdvertisement is a TSMP message used for distributing disco keys. -// This struct is used an an event on the [eventbus.Bus]. -type DiscoKeyAdvertisement struct { - Src netip.Addr // Src field is populated by the IP header of the packet, not from the payload itself. - Key key.DiscoPublic -} - func (t *Wrapper) filterPacketInboundFromWireGuard(p *packet.Parsed, captHook packet.CaptureCallback, pc *peerConfigTable, gro *gro.GRO) (filter.Response, *gro.GRO) { if captHook != nil { captHook(packet.FromPeer, t.now(), p.Buffer(), p.CaptureMeta) @@ -1163,7 +1157,7 @@ func (t *Wrapper) filterPacketInboundFromWireGuard(p *packet.Parsed, captHook pa return filter.DropSilently, gro } else if discoKeyAdvert, ok := p.AsTSMPDiscoAdvertisement(); ok { if buildfeatures.HasCacheNetMap && envknob.Bool("TS_USE_CACHED_NETMAP") { - t.discoKeyAdvertisementPub.Publish(DiscoKeyAdvertisement{ + t.discoKeyAdvertisementPub.Publish(events.DiscoKeyAdvertisement{ Src: discoKeyAdvert.Src, Key: discoKeyAdvert.Key, }) diff --git a/tsnet/depaware.txt b/tsnet/depaware.txt index 79700c713..910314ef1 100644 --- a/tsnet/depaware.txt +++ b/tsnet/depaware.txt @@ -310,6 +310,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware) tailscale.com/types/bools from tailscale.com/tsnet+ tailscale.com/types/dnstype from tailscale.com/client/local+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/ipproto from tailscale.com/ipn+ tailscale.com/types/key from tailscale.com/client/local+ tailscale.com/types/lazy from tailscale.com/hostinfo+ diff --git a/types/events/disco_update.go b/types/events/disco_update.go new file mode 100644 index 000000000..206c554a1 --- /dev/null +++ b/types/events/disco_update.go @@ -0,0 +1,30 @@ +// Copyright (c) Tailscale Inc & contributors +// SPDX-License-Identifier: BSD-3-Clause + +// Package events contains type used as eventbus topics in tailscaled. +package events + +import ( + "net/netip" + + "tailscale.com/types/key" +) + +// DiscoKeyAdvertisement is an event sent on the [eventbus.Bus] when a disco +// key has been received over TSMP. +// +// Its publisher is [tstun.Wrapper]; its main subscriber is +// [controlclient.Direct], that injects the received key into the netmap as if +// it was a netmap update from control. +type DiscoKeyAdvertisement struct { + Src netip.Addr // Src field is populated by the IP header of the packet, not from the payload itself. + Key key.DiscoPublic +} + +// PeerDiscoKeyUpdate is an event sent on the [eventbus.Bus] when +// [controlclient.Direct] deems that it cannot handle the key update. +// +// Its publisher is [controlclient.Direct]; its main subscriber is +// [wgengine.userspaceengine], that injects the received key into its +// [magicsock.Conn] in order to set up the key directly. +type PeerDiscoKeyUpdate DiscoKeyAdvertisement diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 5f493027b..f322ebaeb 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -1465,6 +1465,19 @@ func (de *endpoint) setLastPing(ipp netip.AddrPort, now mono.Time) { state.lastPing = now } +// updateDiscoKey replaces the disco key for de. If the key is a zero value key, +// set the key to nil. +func (de *endpoint) updateDiscoKey(key key.DiscoPublic) { + if key.IsZero() { + de.disco.Store(nil) + } else { + de.disco.Store(&endpointDisco{ + key: key, + short: key.ShortString(), + }) + } +} + // updateFromNode updates the endpoint based on a tailcfg.Node from a NetMap // update. func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, probeUDPLifetimeEnabled bool) { @@ -1490,15 +1503,12 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, p if discoKey != n.DiscoKey() { de.c.logf("[v1] magicsock: disco: node %s changed from %s to %s", de.publicKey.ShortString(), discoKey, n.DiscoKey()) - de.disco.Store(&endpointDisco{ - key: n.DiscoKey(), - short: n.DiscoKey().ShortString(), - }) + key := n.DiscoKey() + de.updateDiscoKey(key) de.debugUpdates.Add(EndpointChange{ When: time.Now(), What: "updateFromNode-resetLocked", }) - de.resetLocked() } if n.HomeDERP() == 0 { if de.derpAddr.IsValid() { diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 5c16750f7..4ad8022d8 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -3144,14 +3144,7 @@ func (c *Conn) updateNodes(self tailcfg.NodeView, peers []tailcfg.NodeView) (pee ep.nodeAddr = n.Addresses().At(0).Addr() } ep.initFakeUDPAddr() - if n.DiscoKey().IsZero() { - ep.disco.Store(nil) - } else { - ep.disco.Store(&endpointDisco{ - key: n.DiscoKey(), - short: n.DiscoKey().ShortString(), - }) - } + ep.updateDiscoKey(n.DiscoKey()) if debugPeerMap() { c.logEndpointCreated(n) @@ -4288,10 +4281,7 @@ func (c *Conn) HandleDiscoKeyAdvertisement(node tailcfg.NodeView, update packet. return } c.discoInfoForKnownPeerLocked(discoKey) - ep.disco.Store(&endpointDisco{ - key: discoKey, - short: discoKey.ShortString(), - }) + ep.updateDiscoKey(discoKey) c.peerMap.upsertEndpoint(ep, oldDiscoKey) c.logf("magicsock: updated disco key for peer %v to %v", nodeKey.ShortString(), discoKey.ShortString()) metricTSMPDiscoKeyAdvertisementApplied.Add(1) diff --git a/wgengine/userspace.go b/wgengine/userspace.go index ecf3c2298..5670541af 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -42,6 +42,7 @@ import ( "tailscale.com/tailcfg" "tailscale.com/tstime/mono" "tailscale.com/types/dnstype" + "tailscale.com/types/events" "tailscale.com/types/ipproto" "tailscale.com/types/key" "tailscale.com/types/logger" @@ -597,7 +598,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) } e.linkChangeQueue.Add(func() { e.linkChange(&cd) }) }) - eventbus.SubscribeFunc(ec, func(update tstun.DiscoKeyAdvertisement) { + eventbus.SubscribeFunc(ec, func(update events.PeerDiscoKeyUpdate) { e.logf("wgengine: got TSMP disco key advertisement from %v via eventbus", update.Src) if e.magicConn == nil { e.logf("wgengine: no magicConn")