|
|
|
|
@ -6,6 +6,7 @@ package magicsock |
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"errors" |
|
|
|
|
"fmt" |
|
|
|
|
"net/netip" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
@ -76,8 +77,11 @@ type serverDiscoVNI struct { |
|
|
|
|
// relayHandshakeWork serves to track in-progress relay handshake work for a
|
|
|
|
|
// [udprelay.ServerEndpoint]. This structure is immutable once initialized.
|
|
|
|
|
type relayHandshakeWork struct { |
|
|
|
|
wlb endpointWithLastBest |
|
|
|
|
se udprelay.ServerEndpoint |
|
|
|
|
wlb endpointWithLastBest |
|
|
|
|
se udprelay.ServerEndpoint |
|
|
|
|
server candidatePeerRelay |
|
|
|
|
|
|
|
|
|
handshakeGen uint32 |
|
|
|
|
|
|
|
|
|
// handshakeServerEndpoint() always writes to doneCh (len 1) when it
|
|
|
|
|
// returns. It may end up writing the same event afterward to
|
|
|
|
|
@ -91,6 +95,26 @@ type relayHandshakeWork struct { |
|
|
|
|
cancel context.CancelFunc |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *relayHandshakeWork) dlogf(format string, args ...any) { |
|
|
|
|
if !r.wlb.ep.c.debugLogging.Load() { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
var relay string |
|
|
|
|
if r.server.nodeKey.IsZero() { |
|
|
|
|
relay = "from-call-me-maybe-via" |
|
|
|
|
} else { |
|
|
|
|
relay = r.server.nodeKey.ShortString() |
|
|
|
|
} |
|
|
|
|
r.wlb.ep.c.logf("%s node=%v relay=%v handshakeGen=%d disco[0]=%v disco[1]=%v", |
|
|
|
|
fmt.Sprintf(format, args...), |
|
|
|
|
r.wlb.ep.publicKey.ShortString(), |
|
|
|
|
relay, |
|
|
|
|
r.handshakeGen, |
|
|
|
|
r.se.ClientDisco[0].ShortString(), |
|
|
|
|
r.se.ClientDisco[1].ShortString(), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// newRelayServerEndpointEvent indicates a new [udprelay.ServerEndpoint] has
|
|
|
|
|
// become known either via allocation with a relay server, or via
|
|
|
|
|
// [disco.CallMeMaybeVia] reception. This structure is immutable once
|
|
|
|
|
@ -257,7 +281,9 @@ type relayDiscoMsgEvent struct { |
|
|
|
|
type relayEndpointAllocWork struct { |
|
|
|
|
wlb endpointWithLastBest |
|
|
|
|
discoKeys key.SortedPairOfDiscoPublic |
|
|
|
|
candidatePeerRelay candidatePeerRelay |
|
|
|
|
candidatePeerRelay candidatePeerRelay // zero value if learned via [disco.CallMeMaybeVia]
|
|
|
|
|
|
|
|
|
|
allocGen uint32 |
|
|
|
|
|
|
|
|
|
// allocateServerEndpoint() always writes to doneCh (len 1) when it
|
|
|
|
|
// returns. It may end up writing the same event afterward to
|
|
|
|
|
@ -271,6 +297,20 @@ type relayEndpointAllocWork struct { |
|
|
|
|
cancel context.CancelFunc |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *relayEndpointAllocWork) dlogf(format string, args ...any) { |
|
|
|
|
if !r.wlb.ep.c.debugLogging.Load() { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
r.wlb.ep.c.logf("%s node=%v relay=%v allocGen=%d disco[0]=%v disco[1]=%v", |
|
|
|
|
fmt.Sprintf(format, args...), |
|
|
|
|
r.wlb.ep.publicKey.ShortString(), |
|
|
|
|
r.candidatePeerRelay.nodeKey.ShortString(), |
|
|
|
|
r.allocGen, |
|
|
|
|
r.discoKeys.Get()[0].ShortString(), |
|
|
|
|
r.discoKeys.Get()[1].ShortString(), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// init initializes [relayManager] if it is not already initialized.
|
|
|
|
|
func (r *relayManager) init() { |
|
|
|
|
r.initOnce.Do(func() { |
|
|
|
|
@ -712,6 +752,7 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay |
|
|
|
|
work := &relayHandshakeWork{ |
|
|
|
|
wlb: newServerEndpoint.wlb, |
|
|
|
|
se: newServerEndpoint.se, |
|
|
|
|
server: newServerEndpoint.server, |
|
|
|
|
rxDiscoMsgCh: make(chan relayDiscoMsgEvent), |
|
|
|
|
doneCh: make(chan relayEndpointHandshakeWorkDoneEvent, 1), |
|
|
|
|
ctx: ctx, |
|
|
|
|
@ -728,8 +769,9 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay |
|
|
|
|
if r.handshakeGeneration == 0 { // generation must be nonzero
|
|
|
|
|
r.handshakeGeneration++ |
|
|
|
|
} |
|
|
|
|
work.handshakeGen = r.handshakeGeneration |
|
|
|
|
|
|
|
|
|
go r.handshakeServerEndpoint(work, r.handshakeGeneration) |
|
|
|
|
go r.handshakeServerEndpoint(work) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// sendCallMeMaybeVia sends a [disco.CallMeMaybeVia] to ep over DERP. It must be
|
|
|
|
|
@ -758,7 +800,7 @@ func (r *relayManager) sendCallMeMaybeVia(ep *endpoint, se udprelay.ServerEndpoi |
|
|
|
|
ep.c.sendDiscoMessage(epAddr{ap: derpAddr}, ep.publicKey, epDisco.key, callMeMaybeVia, discoVerboseLog) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generation uint32) { |
|
|
|
|
func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) { |
|
|
|
|
done := relayEndpointHandshakeWorkDoneEvent{work: work} |
|
|
|
|
r.ensureDiscoInfoFor(work) |
|
|
|
|
|
|
|
|
|
@ -777,10 +819,13 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat |
|
|
|
|
|
|
|
|
|
common := disco.BindUDPRelayEndpointCommon{ |
|
|
|
|
VNI: work.se.VNI, |
|
|
|
|
Generation: generation, |
|
|
|
|
Generation: work.handshakeGen, |
|
|
|
|
RemoteKey: epDisco.key, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: starting handshake addrPorts=%v", |
|
|
|
|
work.se.AddrPorts, |
|
|
|
|
) |
|
|
|
|
sentBindAny := false |
|
|
|
|
bind := &disco.BindUDPRelayEndpoint{ |
|
|
|
|
BindUDPRelayEndpointCommon: common, |
|
|
|
|
@ -848,6 +893,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-work.ctx.Done(): |
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: handshake canceled") |
|
|
|
|
return |
|
|
|
|
case msgEvent := <-work.rxDiscoMsgCh: |
|
|
|
|
switch msg := msgEvent.msg.(type) { |
|
|
|
|
@ -859,12 +905,14 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat |
|
|
|
|
if handshakeState >= disco.BindUDPRelayHandshakeStateAnswerSent { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: got handshake challenge from %v", msgEvent.from) |
|
|
|
|
txPing(msgEvent.from, &msg.Challenge) |
|
|
|
|
handshakeState = disco.BindUDPRelayHandshakeStateAnswerSent |
|
|
|
|
case *disco.Ping: |
|
|
|
|
if handshakeState < disco.BindUDPRelayHandshakeStateAnswerSent { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: got relayed ping from %v", msgEvent.from) |
|
|
|
|
// An inbound ping from the remote peer indicates we completed a
|
|
|
|
|
// handshake with the relay server (our answer msg was
|
|
|
|
|
// received). Chances are our ping was dropped before the remote
|
|
|
|
|
@ -885,6 +933,10 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat |
|
|
|
|
// round-trip latency and return.
|
|
|
|
|
done.pongReceivedFrom = msgEvent.from |
|
|
|
|
done.latency = time.Since(at) |
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: got relayed pong from %v latency=%v", |
|
|
|
|
msgEvent.from, |
|
|
|
|
done.latency.Round(time.Millisecond), |
|
|
|
|
) |
|
|
|
|
return |
|
|
|
|
default: |
|
|
|
|
// unexpected message type, silently discard
|
|
|
|
|
@ -892,6 +944,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat |
|
|
|
|
} |
|
|
|
|
case <-timer.C: |
|
|
|
|
// The handshake timed out.
|
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: handshake timed out") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -899,7 +952,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat |
|
|
|
|
|
|
|
|
|
const allocateUDPRelayEndpointRequestTimeout = time.Second * 10 |
|
|
|
|
|
|
|
|
|
func (r *relayManager) allocateServerEndpoint(work *relayEndpointAllocWork, generation uint32) { |
|
|
|
|
func (r *relayManager) allocateServerEndpoint(work *relayEndpointAllocWork) { |
|
|
|
|
done := relayEndpointAllocWorkDoneEvent{work: work} |
|
|
|
|
|
|
|
|
|
defer func() { |
|
|
|
|
@ -910,7 +963,7 @@ func (r *relayManager) allocateServerEndpoint(work *relayEndpointAllocWork, gene |
|
|
|
|
|
|
|
|
|
dm := &disco.AllocateUDPRelayEndpointRequest{ |
|
|
|
|
ClientDisco: work.discoKeys.Get(), |
|
|
|
|
Generation: generation, |
|
|
|
|
Generation: work.allocGen, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sendAllocReq := func() { |
|
|
|
|
@ -923,6 +976,7 @@ func (r *relayManager) allocateServerEndpoint(work *relayEndpointAllocWork, gene |
|
|
|
|
dm, |
|
|
|
|
discoVerboseLog, |
|
|
|
|
) |
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: sent alloc request") |
|
|
|
|
} |
|
|
|
|
go sendAllocReq() |
|
|
|
|
|
|
|
|
|
@ -938,16 +992,19 @@ func (r *relayManager) allocateServerEndpoint(work *relayEndpointAllocWork, gene |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-work.ctx.Done(): |
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: alloc request canceled") |
|
|
|
|
return |
|
|
|
|
case <-returnAfterTimer.C: |
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: alloc request timed out") |
|
|
|
|
return |
|
|
|
|
case <-retryAfterTimer.C: |
|
|
|
|
go sendAllocReq() |
|
|
|
|
case resp := <-work.rxDiscoMsgCh: |
|
|
|
|
if resp.Generation != generation || |
|
|
|
|
if resp.Generation != work.allocGen || |
|
|
|
|
!work.discoKeys.Equal(key.NewSortedPairOfDiscoPublic(resp.ClientDisco[0], resp.ClientDisco[1])) { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
work.dlogf("[v1] magicsock: relayManager: got alloc response") |
|
|
|
|
done.allocated = udprelay.ServerEndpoint{ |
|
|
|
|
ServerDisco: resp.ServerDisco, |
|
|
|
|
ClientDisco: resp.ClientDisco, |
|
|
|
|
@ -1004,6 +1061,7 @@ func (r *relayManager) allocateAllServersRunLoop(wlb endpointWithLastBest) { |
|
|
|
|
} |
|
|
|
|
byCandidatePeerRelay[v] = started |
|
|
|
|
r.allocGeneration++ |
|
|
|
|
go r.allocateServerEndpoint(started, r.allocGeneration) |
|
|
|
|
started.allocGen = r.allocGeneration |
|
|
|
|
go r.allocateServerEndpoint(started) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|