@ -37,6 +37,11 @@ import (
"tailscale.com/wgengine/filter"
)
type responseWithSource struct {
response * tailcfg . MapResponse
viaTSMP bool
}
// mapSession holds the state over a long-polled "map" request to the
// control plane.
//
@ -98,7 +103,7 @@ type mapSession struct {
lastTKAInfo * tailcfg . TKAInfo
lastNetmapSummary string // from NetworkMap.VeryConcise
cqmu sync . Mutex
changeQueue chan ( * tailcfg . MapResponse )
changeQueue chan responseWithSource
changeQueueClosed bool
processQueue sync . WaitGroup
}
@ -123,7 +128,7 @@ func newMapSession(privateNodeKey key.NodePrivate, nu NetmapUpdater, controlKnob
cancel : func ( ) { } ,
onDebug : func ( context . Context , * tailcfg . Debug ) error { return nil } ,
onSelfNodeChanged : func ( * netmap . NetworkMap ) { } ,
changeQueue : make ( chan * tailcfg . MapRespons e) ,
changeQueue : make ( chan responseWithSourc e) ,
changeQueueClosed : false ,
}
ms . sessionAliveCtx , ms . sessionAliveCtxClose = context . WithCancel ( context . Background ( ) )
@ -142,7 +147,7 @@ func (ms *mapSession) run() {
for {
select {
case change := <- ms . changeQueue :
ms . handleNonKeepAliveMapResponse ( ms . sessionAliveCtx , change )
ms . handleNonKeepAliveMapResponse ( ms . sessionAliveCtx , change . response , change . viaTSMP )
case <- ms . sessionAliveCtx . Done ( ) :
// Drain any remaining items in the queue before exiting.
// Lock the queue during this time to avoid updates through other channels
@ -154,7 +159,7 @@ func (ms *mapSession) run() {
for {
select {
case change := <- ms . changeQueue :
ms . handleNonKeepAliveMapResponse ( ms . sessionAliveCtx , change )
ms . handleNonKeepAliveMapResponse ( ms . sessionAliveCtx , change . response , change . viaTSMP )
default :
// Queue is empty, close it and exit
close ( ms . changeQueue )
@ -190,7 +195,7 @@ func (ms *mapSession) Close() {
var ErrChangeQueueClosed = errors . New ( "change queue closed" )
func ( ms * mapSession ) updateDiscoForNode ( id tailcfg . NodeID , key key . DiscoPublic , lastSeen time . Time , online bool ) error {
func ( ms * mapSession ) updateDiscoForNode ( id tailcfg . NodeID , key key . NodePublic , discoKey key . DiscoPublic , lastSeen time . Time , online bool ) error {
ms . cqmu . Lock ( )
if ms . changeQueueClosed {
@ -199,13 +204,17 @@ func (ms *mapSession) updateDiscoForNode(id tailcfg.NodeID, key key.DiscoPublic,
return ErrChangeQueueClosed
}
resp := & tailcfg . MapResponse {
PeersChangedPatch : [ ] * tailcfg . PeerChange { {
NodeID : id ,
LastSeen : & lastSeen ,
Online : & online ,
DiscoKey : & key ,
} } ,
resp := responseWithSource {
response : & tailcfg . MapResponse {
PeersChangedPatch : [ ] * tailcfg . PeerChange { {
NodeID : id ,
Key : & key ,
LastSeen : & lastSeen ,
Online : & online ,
DiscoKey : & discoKey ,
} } ,
} ,
viaTSMP : true ,
}
ms . changeQueue <- resp
ms . cqmu . Unlock ( )
@ -221,7 +230,12 @@ func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *t
return ErrChangeQueueClosed
}
ms . changeQueue <- resp
change := responseWithSource {
response : resp ,
viaTSMP : false ,
}
ms . changeQueue <- change
ms . cqmu . Unlock ( )
return nil
}
@ -234,7 +248,7 @@ func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *t
//
// TODO(bradfitz): make this handle all fields later. For now (2023-08-20) this
// is [re]factoring progress enough.
func ( ms * mapSession ) handleNonKeepAliveMapResponse ( ctx context . Context , resp * tailcfg . MapResponse ) error {
func ( ms * mapSession ) handleNonKeepAliveMapResponse ( ctx context . Context , resp * tailcfg . MapResponse , viaTSMP bool ) error {
if debug := resp . Debug ; debug != nil {
if err := ms . onDebug ( ctx , debug ) ; err != nil {
return err
@ -284,6 +298,13 @@ func (ms *mapSession) handleNonKeepAliveMapResponse(ctx context.Context, resp *t
ms . updateStateFromResponse ( resp )
// If source was learned via TSMP, the updated disco key need to be marked in
// userspaceEngine as an update that should not reconfigure the wireguard
// connection.
if viaTSMP {
ms . tryMarkDiscoAsLearnedFromTSMP ( resp )
}
if ms . tryHandleIncrementally ( resp ) {
ms . occasionallyPrintSummary ( ms . lastNetmapSummary )
return nil
@ -312,6 +333,21 @@ func (ms *mapSession) handleNonKeepAliveMapResponse(ctx context.Context, resp *t
return nil
}
func ( ms * mapSession ) tryMarkDiscoAsLearnedFromTSMP ( res * tailcfg . MapResponse ) {
dun , ok := ms . netmapUpdater . ( patchDiscoKeyer )
if ! ok {
return
}
// In reality we should never really have more than one change here over TSMP.
for _ , change := range res . PeersChangedPatch {
if change == nil || change . DiscoKey == nil || change . Key == nil {
continue
}
dun . PatchDiscoKey ( * change . Key , * change . DiscoKey )
}
}
// upgradeNode upgrades Node fields from the server into the modern forms
// not using deprecated fields.
func upgradeNode ( n * tailcfg . Node ) {