Updates #8720 Signed-off-by: David Anderson <danderson@tailscale.com>main
parent
9a76deb4b0
commit
84777354a0
@ -0,0 +1,194 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package magicsock |
||||
|
||||
import ( |
||||
"errors" |
||||
"net" |
||||
"net/netip" |
||||
"sync" |
||||
"sync/atomic" |
||||
"time" |
||||
|
||||
"golang.org/x/net/ipv6" |
||||
"tailscale.com/net/neterror" |
||||
"tailscale.com/types/nettype" |
||||
) |
||||
|
||||
// xnetBatchReaderWriter defines the batching i/o methods of
// golang.org/x/net/ipv4.PacketConn (and ipv6.PacketConn).
// TODO(jwhited): This should eventually be replaced with the standard library
// implementation of https://github.com/golang/go/issues/45886
type xnetBatchReaderWriter interface {
	xnetBatchReader
	xnetBatchWriter
}

// xnetBatchReader is the read half of xnetBatchReaderWriter.
type xnetBatchReader interface {
	// ReadBatch reads up to len(msgs) messages from the connection; the int
	// argument carries platform-specific flags. It reports how many messages
	// were read.
	ReadBatch([]ipv6.Message, int) (int, error)
}

// xnetBatchWriter is the write half of xnetBatchReaderWriter.
type xnetBatchWriter interface {
	// WriteBatch writes up to len(msgs) messages to the connection; the int
	// argument carries platform-specific flags. It reports how many messages
	// were written.
	WriteBatch([]ipv6.Message, int) (int, error)
}
||||
|
||||
// batchingUDPConn is a UDP socket that provides batched i/o.
type batchingUDPConn struct {
	pc        nettype.PacketConn    // underlying connection; used for single-packet i/o and deadlines
	xpc       xnetBatchReaderWriter // batching view of pc
	rxOffload bool                  // supports UDP GRO or similar
	txOffload atomic.Bool           // supports UDP GSO or similar; cleared at runtime if the kernel rejects it

	// setGSOSizeInControl writes gsoSize into the control (OOB) bytes;
	// typically setGSOSizeInControl(); swappable for testing.
	setGSOSizeInControl func(control *[]byte, gsoSize uint16)
	// getGSOSizeFromControl reads the GRO segment size from control bytes;
	// typically getGSOSizeFromControl(); swappable for testing.
	getGSOSizeFromControl func(control []byte) (int, error)

	// sendBatchPool holds *sendBatch values reused across WriteBatchTo calls.
	sendBatchPool sync.Pool
}
||||
|
||||
// ReadFromUDPAddrPort reads a single datagram from the socket.
//
// It always fails when rx offload (GRO or similar) is enabled on the socket,
// because a single read may then return a coalesced "monster datagram" whose
// segment size cannot be conveyed through this API.
func (c *batchingUDPConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) {
	if c.rxOffload {
		// UDP_GRO is opt-in on Linux via setsockopt(). Once enabled you may
		// receive a "monster datagram" from any read call. The ReadFrom() API
		// does not support passing the GSO size and is unsafe to use in such a
		// case. Other platforms may vary in behavior, but we go with the most
		// conservative approach to prevent this from becoming a footgun in the
		// future.
		return 0, netip.AddrPort{}, errors.New("rx UDP offload is enabled on this socket, single packet reads are unavailable")
	}
	return c.pc.ReadFromUDPAddrPort(p)
}
||||
|
||||
// SetDeadline sets the read and write deadlines on the underlying socket.
func (c *batchingUDPConn) SetDeadline(t time.Time) error {
	return c.pc.SetDeadline(t)
}

// SetReadDeadline sets the read deadline on the underlying socket.
func (c *batchingUDPConn) SetReadDeadline(t time.Time) error {
	return c.pc.SetReadDeadline(t)
}

// SetWriteDeadline sets the write deadline on the underlying socket.
func (c *batchingUDPConn) SetWriteDeadline(t time.Time) error {
	return c.pc.SetWriteDeadline(t)
}
||||
|
||||
const (
	// udpSegmentMaxDatagrams is the maximum number of datagrams that may be
	// coalesced into a single GSO/GRO super-packet.
	// This was initially established for Linux, but may split out to
	// GOOS-specific values later. It originates as UDP_MAX_SEGMENTS in the
	// kernel's TX path, and UDP_GRO_CNT_MAX for RX.
	udpSegmentMaxDatagrams = 64
)

const (
	// Exceeding these values results in EMSGSIZE.
	// 65535 minus the IPv4 (20-byte) or IPv6 (fixed 8 is for UDP; v6 length
	// field excludes the IPv6 header itself) plus 8-byte UDP header.
	maxIPv4PayloadLen = 1<<16 - 1 - 20 - 8
	maxIPv6PayloadLen = 1<<16 - 1 - 8
)
||||
|
||||
// coalesceMessages iterates msgs, coalescing them where possible while
// maintaining datagram order. All msgs have their Addr field set to addr.
// It returns the number of entries of msgs that were populated and should
// be written.
//
// A datagram may be appended onto the current base message only if doing so
// keeps the payload within maxPayloadLen, the datagram is no larger than the
// base's segment size (gsoSize), it fits in the base buffer's spare capacity,
// the segment-count limit is not exceeded, and a shorter-than-gsoSize tail
// has not already ended the batch.
func (c *batchingUDPConn) coalesceMessages(addr *net.UDPAddr, buffs [][]byte, msgs []ipv6.Message) int {
	var (
		base     = -1 // index of msg we are currently coalescing into
		gsoSize  int  // segmentation size of msgs[base]
		dgramCnt int  // number of dgrams coalesced into msgs[base]
		endBatch bool // tracking flag to start a new batch on next iteration of buffs
	)
	maxPayloadLen := maxIPv4PayloadLen
	if addr.IP.To4() == nil {
		maxPayloadLen = maxIPv6PayloadLen
	}
	for i, buff := range buffs {
		if i > 0 {
			msgLen := len(buff)
			baseLenBefore := len(msgs[base].Buffers[0])
			freeBaseCap := cap(msgs[base].Buffers[0]) - baseLenBefore
			if msgLen+baseLenBefore <= maxPayloadLen &&
				msgLen <= gsoSize &&
				msgLen <= freeBaseCap &&
				dgramCnt < udpSegmentMaxDatagrams &&
				!endBatch {
				// Extend the base buffer and copy this datagram onto its end.
				msgs[base].Buffers[0] = append(msgs[base].Buffers[0], make([]byte, msgLen)...)
				copy(msgs[base].Buffers[0][baseLenBefore:], buff)
				if i == len(buffs)-1 {
					// Last input datagram: flush the GSO size for the final base.
					c.setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize))
				}
				dgramCnt++
				if msgLen < gsoSize {
					// A smaller than gsoSize packet on the tail is legal, but
					// it must end the batch.
					endBatch = true
				}
				continue
			}
		}
		if dgramCnt > 1 {
			// The previous base coalesced multiple datagrams; record its
			// segment size so the kernel can split it back out.
			c.setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize))
		}
		// Reset prior to incrementing base since we are preparing to start a
		// new potential batch.
		endBatch = false
		base++
		gsoSize = len(buff)
		msgs[base].OOB = msgs[base].OOB[:0]
		msgs[base].Buffers[0] = buff
		msgs[base].Addr = addr
		dgramCnt = 1
	}
	return base + 1
}
||||
|
||||
// sendBatch is the reusable scratch state for one WriteBatchTo call: a slice
// of messages and the single *net.UDPAddr they all reference.
type sendBatch struct {
	msgs []ipv6.Message
	ua   *net.UDPAddr
}

// getSendBatch returns a *sendBatch from the pool (allocated by the pool's
// New func when empty; not shown in this file).
func (c *batchingUDPConn) getSendBatch() *sendBatch {
	batch := c.sendBatchPool.Get().(*sendBatch)
	return batch
}

// putSendBatch returns batch to the pool, clearing per-send message state
// (Addr, counters) while keeping each message's Buffers and OOB backing
// storage for reuse.
func (c *batchingUDPConn) putSendBatch(batch *sendBatch) {
	for i := range batch.msgs {
		batch.msgs[i] = ipv6.Message{Buffers: batch.msgs[i].Buffers, OOB: batch.msgs[i].OOB}
	}
	c.sendBatchPool.Put(batch)
}
||||
|
||||
func (c *batchingUDPConn) WriteBatchTo(buffs [][]byte, addr netip.AddrPort) error { |
||||
batch := c.getSendBatch() |
||||
defer c.putSendBatch(batch) |
||||
if addr.Addr().Is6() { |
||||
as16 := addr.Addr().As16() |
||||
copy(batch.ua.IP, as16[:]) |
||||
batch.ua.IP = batch.ua.IP[:16] |
||||
} else { |
||||
as4 := addr.Addr().As4() |
||||
copy(batch.ua.IP, as4[:]) |
||||
batch.ua.IP = batch.ua.IP[:4] |
||||
} |
||||
batch.ua.Port = int(addr.Port()) |
||||
var ( |
||||
n int |
||||
retried bool |
||||
) |
||||
retry: |
||||
if c.txOffload.Load() { |
||||
n = c.coalesceMessages(batch.ua, buffs, batch.msgs) |
||||
} else { |
||||
for i := range buffs { |
||||
batch.msgs[i].Buffers[0] = buffs[i] |
||||
batch.msgs[i].Addr = batch.ua |
||||
batch.msgs[i].OOB = batch.msgs[i].OOB[:0] |
||||
} |
||||
n = len(buffs) |
||||
} |
||||
|
||||
err := c.writeBatch(batch.msgs[:n]) |
||||
if err != nil && c.txOffload.Load() && neterror.ShouldDisableUDPGSO(err) { |
||||
c.txOffload.Store(false) |
||||
retried = true |
||||
goto retry |
||||
} |
||||
if retried { |
||||
return neterror.ErrUDPGSODisabled{OnLaddr: c.pc.LocalAddr().String(), RetryErr: err} |
||||
} |
||||
return err |
||||
} |
||||
@ -0,0 +1,53 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package magicsock |
||||
|
||||
import ( |
||||
"errors" |
||||
"net" |
||||
"net/netip" |
||||
"sync" |
||||
"time" |
||||
) |
||||
|
||||
// blockForeverConn is a net.PacketConn whose reads block until it is closed.
type blockForeverConn struct {
	mu     sync.Mutex
	cond   *sync.Cond // broadcast on Close; presumably initialized with mu by the constructor — not shown here
	closed bool       // guarded by mu
}
||||
|
||||
func (c *blockForeverConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) { |
||||
c.mu.Lock() |
||||
for !c.closed { |
||||
c.cond.Wait() |
||||
} |
||||
c.mu.Unlock() |
||||
return 0, netip.AddrPort{}, net.ErrClosed |
||||
} |
||||
|
||||
// WriteToUDPAddrPort discards p and reports success.
func (c *blockForeverConn) WriteToUDPAddrPort(p []byte, addr netip.AddrPort) (int, error) {
	// Silently drop writes.
	return len(p), nil
}
||||
|
||||
// LocalAddr returns a zero-value *net.UDPAddr.
func (c *blockForeverConn) LocalAddr() net.Addr {
	// Return a *net.UDPAddr because lots of code assumes that it will.
	return new(net.UDPAddr)
}
||||
|
||||
func (c *blockForeverConn) Close() error { |
||||
c.mu.Lock() |
||||
defer c.mu.Unlock() |
||||
if c.closed { |
||||
return net.ErrClosed |
||||
} |
||||
c.closed = true |
||||
c.cond.Broadcast() |
||||
return nil |
||||
} |
||||
|
||||
// Deadlines are not supported; reads block until Close regardless of t.
func (c *blockForeverConn) SetDeadline(t time.Time) error      { return errors.New("unimplemented") }
func (c *blockForeverConn) SetReadDeadline(t time.Time) error  { return errors.New("unimplemented") }
func (c *blockForeverConn) SetWriteDeadline(t time.Time) error { return errors.New("unimplemented") }
||||
@ -0,0 +1,927 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package magicsock |
||||
|
||||
import ( |
||||
"bufio" |
||||
"context" |
||||
"fmt" |
||||
"hash/fnv" |
||||
"math/rand" |
||||
"net" |
||||
"net/netip" |
||||
"reflect" |
||||
"runtime" |
||||
"sort" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/tailscale/wireguard-go/conn" |
||||
"tailscale.com/control/controlclient" |
||||
"tailscale.com/derp" |
||||
"tailscale.com/derp/derphttp" |
||||
"tailscale.com/health" |
||||
"tailscale.com/logtail/backoff" |
||||
"tailscale.com/net/dnscache" |
||||
"tailscale.com/net/tsaddr" |
||||
"tailscale.com/syncs" |
||||
"tailscale.com/tailcfg" |
||||
"tailscale.com/types/key" |
||||
"tailscale.com/types/logger" |
||||
"tailscale.com/util/mak" |
||||
"tailscale.com/util/sysresources" |
||||
) |
||||
|
||||
// useDerpRoute reports whether magicsock should enable the DERP
// return path optimization (Issue 150).
//
// Precedence: the debug knob (debugUseDerpRoute) wins when set, then the
// control-plane flag from controlclient, then a default of true.
func useDerpRoute() bool {
	if b, ok := debugUseDerpRoute().Get(); ok {
		return b
	}
	ob := controlclient.DERPRouteFlag()
	if v, ok := ob.Get(); ok {
		return v
	}
	return true // as of 1.21.x
}
||||
|
||||
// derpRoute is a route entry for a public key, saying that a certain
// peer should be available at DERP node derpID, as long as the
// current connection for that derpID is dc. (but dc should not be
// used to write directly; it's owned by the read/write loops)
type derpRoute struct {
	derpID int
	dc     *derphttp.Client // don't use directly; see comment above
}
||||
|
||||
// removeDerpPeerRoute removes a DERP route entry previously added by
// addDerpPeerRoute. The entry is removed only if it still matches both
// derpID and dc, so a route since replaced by a newer connection is
// left intact.
func (c *Conn) removeDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) {
	c.mu.Lock()
	defer c.mu.Unlock()
	r2 := derpRoute{derpID, dc}
	if r, ok := c.derpRoute[peer]; ok && r == r2 {
		delete(c.derpRoute, peer)
	}
}
||||
|
||||
// addDerpPeerRoute adds a DERP route entry, noting that peer was seen
// on DERP node derpID, at least on the connection identified by dc.
// See issue 150 for details.
func (c *Conn) addDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// mak.Set lazily allocates c.derpRoute on first use.
	mak.Set(&c.derpRoute, peer, derpRoute{derpID, dc})
}
||||
|
||||
// activeDerp contains fields for an active DERP connection.
type activeDerp struct {
	c       *derphttp.Client
	cancel  context.CancelFunc      // stops the reader/writer goroutines for this connection
	writeCh chan<- derpWriteRequest // queue consumed by runDerpWriter
	// lastWrite is the time of the last request for its write
	// channel (currently even if there was no write).
	// It is always non-nil and initialized to a non-zero Time.
	lastWrite  *time.Time
	createTime time.Time
}

// processStartUnixNano is captured at package init and mixed into the
// hash used by pickDERPFallback so the pick is stable per process.
var processStartUnixNano = time.Now().UnixNano()
||||
|
||||
// pickDERPFallback returns a non-zero but deterministic DERP node to
// connect to. This is only used if netcheck couldn't find the
// nearest one (for instance, if UDP is blocked and thus STUN latency
// checks aren't working).
//
// It returns 0 when DERP is disabled or the map has no regions.
//
// c.mu must NOT be held.
func (c *Conn) pickDERPFallback() int {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.wantDerpLocked() {
		return 0
	}
	ids := c.derpMap.RegionIDs()
	if len(ids) == 0 {
		// No DERP regions in non-nil map.
		return 0
	}

	// TODO: figure out which DERP region most of our peers are using,
	// and use that region as our fallback.
	//
	// If we already had selected something in the past and it has any
	// peers, we want to stay on it. If there are no peers at all,
	// stay on whatever DERP we previously picked. If we need to pick
	// one and have no peer info, pick a region randomly.
	//
	// We used to do the above for legacy clients, but never updated
	// it for disco.

	if c.myDerp != 0 {
		return c.myDerp
	}

	// Seed a PRNG from the Conn pointer and process start time so the
	// pick is stable for the life of this Conn but varies across runs.
	h := fnv.New64()
	fmt.Fprintf(h, "%p/%d", c, processStartUnixNano) // arbitrary
	return ids[rand.New(rand.NewSource(int64(h.Sum64()))).Intn(len(ids))]
}
||||
|
||||
func (c *Conn) derpRegionCodeLocked(regionID int) string { |
||||
if c.derpMap == nil { |
||||
return "" |
||||
} |
||||
if dr, ok := c.derpMap.Regions[regionID]; ok { |
||||
return dr.RegionCode |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
// setNearestDERP records derpNum as our home ("nearest") DERP region,
// updates health state, tells every currently connected DERP server whether
// it is now preferred, and kicks off a connection to the new home region.
// It reports whether DERP is enabled at all.
//
// c.mu must NOT be held.
func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.wantDerpLocked() {
		c.myDerp = 0
		health.SetMagicSockDERPHome(0)
		return false
	}
	if derpNum == c.myDerp {
		// No change.
		return true
	}
	if c.myDerp != 0 && derpNum != 0 {
		metricDERPHomeChange.Add(1)
	}
	c.myDerp = derpNum
	health.SetMagicSockDERPHome(derpNum)

	if c.privateKey.IsZero() {
		// No private key yet, so DERP connections won't come up anyway.
		// Return early rather than ultimately log a couple lines of noise.
		return true
	}

	// On change, notify all currently connected DERP servers and
	// start connecting to our home DERP if we are not already.
	dr := c.derpMap.Regions[derpNum]
	if dr == nil {
		c.logf("[unexpected] magicsock: derpMap.Regions[%v] is nil", derpNum)
	} else {
		c.logf("magicsock: home is now derp-%v (%v)", derpNum, c.derpMap.Regions[derpNum].RegionCode)
	}
	for i, ad := range c.activeDerp {
		go ad.c.NotePreferred(i == c.myDerp)
	}
	c.goDerpConnect(derpNum)
	return true
}
||||
|
||||
// startDerpHomeConnectLocked starts connecting to our DERP home, if any.
//
// c.mu must be held.
func (c *Conn) startDerpHomeConnectLocked() {
	c.goDerpConnect(c.myDerp)
}

// goDerpConnect starts a goroutine to start connecting to the given
// DERP node. A node of 0 is a no-op.
//
// c.mu may be held, but does not need to be.
func (c *Conn) goDerpConnect(node int) {
	if node == 0 {
		return
	}
	// derpWriteChanOfAddr creates the connection as a side effect; the
	// returned channel is intentionally discarded here.
	go c.derpWriteChanOfAddr(netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(node)), key.NodePublic{})
}
||||
|
||||
var (
	// bufferedDerpWrites is the memoized result computed once by
	// bufferedDerpWritesBeforeDrop, guarded by bufferedDerpWritesOnce.
	bufferedDerpWrites     int
	bufferedDerpWritesOnce sync.Once
)
||||
|
||||
// bufferedDerpWritesBeforeDrop returns how many packets writes can be queued
// up the DERP client to write on the wire before we start dropping.
//
// On mobile it is a fixed 32; elsewhere it is computed once from total
// system memory, with 32 as the floor.
func bufferedDerpWritesBeforeDrop() int {
	// For mobile devices, always return the previous minimum value of 32;
	// we can do this outside the sync.Once to avoid that overhead.
	if runtime.GOOS == "ios" || runtime.GOOS == "android" {
		return 32
	}

	bufferedDerpWritesOnce.Do(func() {
		// Some rough sizing: for the previous fixed value of 32, the
		// total consumed memory can be:
		// = numDerpRegions * messages/region * sizeof(message)
		//
		// For sake of this calculation, assume 100 DERP regions; at
		// time of writing (2023-04-03), we have 24.
		//
		// A reasonable upper bound for the worst-case average size of
		// a message is a *disco.CallMeMaybe message with 16 endpoints;
		// since sizeof(netip.AddrPort) = 32, that's 512 bytes. Thus:
		// = 100 * 32 * 512
		// = 1638400 (1.6MiB)
		//
		// On a reasonably-small node with 4GiB of memory that's
		// connected to each region and handling a lot of load, 1.6MiB
		// is about 0.04% of the total system memory.
		//
		// For sake of this calculation, then, let's double that memory
		// usage to 0.08% and scale based on total system memory.
		//
		// For a 16GiB Linux box, this should buffer just over 256
		// messages.
		systemMemory := sysresources.TotalMemory()
		memoryUsable := float64(systemMemory) * 0.0008

		const (
			theoreticalDERPRegions  = 100
			messageMaximumSizeBytes = 512
		)
		bufferedDerpWrites = int(memoryUsable / (theoreticalDERPRegions * messageMaximumSizeBytes))

		// Never drop below the previous minimum value.
		if bufferedDerpWrites < 32 {
			bufferedDerpWrites = 32
		}
	})
	return bufferedDerpWrites
}
||||
|
||||
// derpWriteChanOfAddr returns a DERP client for fake UDP addresses that
// represent DERP servers, creating them as necessary. For real UDP
// addresses, it returns nil.
//
// If peer is non-zero, it can be used to find an active reverse
// path, without using addr.
//
// It returns nil when DERP is disabled, the Conn is closed, the region is
// unknown, the network is down, or no private key is set yet.
func (c *Conn) derpWriteChanOfAddr(addr netip.AddrPort, peer key.NodePublic) chan<- derpWriteRequest {
	if addr.Addr() != tailcfg.DerpMagicIPAddr {
		return nil
	}
	// The port of a DERP fake address encodes the region ID.
	regionID := int(addr.Port())

	if c.networkDown() {
		return nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.wantDerpLocked() || c.closed {
		return nil
	}
	if c.derpMap == nil || c.derpMap.Regions[regionID] == nil {
		return nil
	}
	if c.privateKey.IsZero() {
		c.logf("magicsock: DERP lookup of %v with no private key; ignoring", addr)
		return nil
	}

	// See if we have a connection open to that DERP node ID
	// first. If so, might as well use it. (It's a little
	// arbitrary whether we use this one vs. the reverse route
	// below when we have both.)
	ad, ok := c.activeDerp[regionID]
	if ok {
		*ad.lastWrite = time.Now()
		c.setPeerLastDerpLocked(peer, regionID, regionID)
		return ad.writeCh
	}

	// If we don't have an open connection to the peer's home DERP
	// node, see if we have an open connection to a DERP node
	// where we'd heard from that peer already. For instance,
	// perhaps peer's home is Frankfurt, but they dialed our home DERP
	// node in SF to reach us, so we can reply to them using our
	// SF connection rather than dialing Frankfurt. (Issue 150)
	if !peer.IsZero() && useDerpRoute() {
		if r, ok := c.derpRoute[peer]; ok {
			if ad, ok := c.activeDerp[r.derpID]; ok && ad.c == r.dc {
				c.setPeerLastDerpLocked(peer, r.derpID, regionID)
				*ad.lastWrite = time.Now()
				return ad.writeCh
			}
		}
	}

	why := "home-keep-alive"
	if !peer.IsZero() {
		why = peer.ShortString()
	}
	c.logf("magicsock: adding connection to derp-%v for %v", regionID, why)

	firstDerp := false
	if c.activeDerp == nil {
		firstDerp = true
		c.activeDerp = make(map[int]activeDerp)
		c.prevDerp = make(map[int]*syncs.WaitGroupChan)
	}

	// Note that derphttp.NewRegionClient does not dial the server
	// (it doesn't block) so it is safe to do under the c.mu lock.
	dc := derphttp.NewRegionClient(c.privateKey, c.logf, c.netMon, func() *tailcfg.DERPRegion {
		// Warning: it is not legal to acquire
		// magicsock.Conn.mu from this callback.
		// It's run from derphttp.Client.connect (via Send, etc)
		// and the lock ordering rules are that magicsock.Conn.mu
		// must be acquired before derphttp.Client.mu.
		// See https://github.com/tailscale/tailscale/issues/3726
		if c.connCtx.Err() != nil {
			// We're closing anyway; return nil to stop dialing.
			return nil
		}
		// Read the DERP map via the atomic, NOT c.derpMap, to respect
		// the lock-ordering rule above.
		derpMap := c.derpMapAtomic.Load()
		if derpMap == nil {
			return nil
		}
		return derpMap.Regions[regionID]
	})

	dc.SetCanAckPings(true)
	dc.NotePreferred(c.myDerp == regionID)
	dc.SetAddressFamilySelector(derpAddrFamSelector{c})
	dc.DNSCache = dnscache.Get()

	ctx, cancel := context.WithCancel(c.connCtx)
	ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop())

	ad.c = dc
	ad.writeCh = ch
	ad.cancel = cancel
	ad.lastWrite = new(time.Time)
	*ad.lastWrite = time.Now()
	ad.createTime = time.Now()
	c.activeDerp[regionID] = ad
	metricNumDERPConns.Set(int64(len(c.activeDerp)))
	c.logActiveDerpLocked()
	c.setPeerLastDerpLocked(peer, regionID, regionID)
	c.scheduleCleanStaleDerpLocked()

	// Build a startGate for the derp reader+writer
	// goroutines, so they don't start running until any
	// previous generation is closed.
	startGate := syncs.ClosedChan()
	if prev := c.prevDerp[regionID]; prev != nil {
		startGate = prev.DoneChan()
	}
	// And register a WaitGroup(Chan) for this generation.
	wg := syncs.NewWaitGroupChan()
	wg.Add(2)
	c.prevDerp[regionID] = wg

	if firstDerp {
		startGate = c.derpStarted
		go func() {
			dc.Connect(ctx)
			close(c.derpStarted)
			c.muCond.Broadcast()
		}()
	}

	go c.runDerpReader(ctx, addr, dc, wg, startGate)
	go c.runDerpWriter(ctx, dc, ch, wg, startGate)
	go c.derpActiveFunc()

	return ad.writeCh
}
||||
|
||||
// setPeerLastDerpLocked notes that peer is now being written to via
// the provided DERP regionID, and that the peer advertises a DERP
// home region ID of homeID.
//
// If there's any change, it logs.
//
// c.mu must be held.
func (c *Conn) setPeerLastDerpLocked(peer key.NodePublic, regionID, homeID int) {
	if peer.IsZero() {
		return
	}
	old := c.peerLastDerp[peer]
	if old == regionID {
		return
	}
	c.peerLastDerp[peer] = regionID

	// Classify the route relative to both parties' home regions for the log
	// line below.
	var newDesc string
	switch {
	case regionID == homeID && regionID == c.myDerp:
		newDesc = "shared home"
	case regionID == homeID:
		newDesc = "their home"
	case regionID == c.myDerp:
		newDesc = "our home"
	case regionID != homeID:
		newDesc = "alt"
	}
	if old == 0 {
		c.logf("[v1] magicsock: derp route for %s set to derp-%d (%s)", peer.ShortString(), regionID, newDesc)
	} else {
		c.logf("[v1] magicsock: derp route for %s changed from derp-%d => derp-%d (%s)", peer.ShortString(), old, regionID, newDesc)
	}
}
||||
|
||||
// derpReadResult is the type sent by runDerpClient to ReceiveIPv4
// when a DERP packet is available.
//
// Notably, it doesn't include the derp.ReceivedPacket because we
// don't want to give the receiver access to the aliased []byte. To
// get at the packet contents they need to call copyBuf to copy it
// out, which also releases the buffer.
type derpReadResult struct {
	regionID int
	n        int // length of data received
	src      key.NodePublic
	// copyBuf is called to copy the data to dst. It returns how
	// much data was copied, which will be n if dst is large
	// enough. copyBuf can only be called once.
	// If copyBuf is nil, that's a signal from the sender to ignore
	// this message.
	copyBuf func(dst []byte) int
}
||||
|
||||
// runDerpReader runs in a goroutine for the life of a DERP
// connection, handling received packets. It delivers packets to
// c.derpRecvCh and waits for the receiver to copy each one out (via
// didCopy) before reusing the buffer; it reconnects with backoff on
// read errors and returns when the client is closed, the network is
// down, or ctx is done.
func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netip.AddrPort, dc *derphttp.Client, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
	defer wg.Decr()
	defer dc.Close()

	// Wait for the previous generation of goroutines for this region to
	// finish before starting.
	select {
	case <-startGate:
	case <-ctx.Done():
		return
	}

	didCopy := make(chan struct{}, 1)
	regionID := int(derpFakeAddr.Port())
	res := derpReadResult{regionID: regionID}
	var pkt derp.ReceivedPacket
	// copyBuf hands the current packet to the receiver and signals this
	// loop (via didCopy) that pkt may be reused.
	res.copyBuf = func(dst []byte) int {
		n := copy(dst, pkt.Data)
		didCopy <- struct{}{}
		return n
	}

	defer health.SetDERPRegionConnectedState(regionID, false)
	defer health.SetDERPRegionHealth(regionID, "")

	// peerPresent is the set of senders we know are present on this
	// connection, based on messages we've received from the server.
	peerPresent := map[key.NodePublic]bool{}
	bo := backoff.NewBackoff(fmt.Sprintf("derp-%d", regionID), c.logf, 5*time.Second)
	var lastPacketTime time.Time
	var lastPacketSrc key.NodePublic

	for {
		msg, connGen, err := dc.RecvDetail()
		if err != nil {
			health.SetDERPRegionConnectedState(regionID, false)
			// Forget that all these peers have routes.
			for peer := range peerPresent {
				delete(peerPresent, peer)
				c.removeDerpPeerRoute(peer, regionID, dc)
			}
			if err == derphttp.ErrClientClosed {
				return
			}
			if c.networkDown() {
				c.logf("[v1] magicsock: derp.Recv(derp-%d): network down, closing", regionID)
				return
			}
			select {
			case <-ctx.Done():
				return
			default:
			}

			c.logf("magicsock: [%p] derp.Recv(derp-%d): %v", dc, regionID, err)

			// If our DERP connection broke, it might be because our network
			// conditions changed. Start that check.
			c.ReSTUN("derp-recv-error")

			// Back off a bit before reconnecting.
			bo.BackOff(ctx, err)
			select {
			case <-ctx.Done():
				return
			default:
			}
			continue
		}
		bo.BackOff(ctx, nil) // reset

		// Rate-limit health frame notes to at most one per 5 seconds.
		now := time.Now()
		if lastPacketTime.IsZero() || now.Sub(lastPacketTime) > 5*time.Second {
			health.NoteDERPRegionReceivedFrame(regionID)
			lastPacketTime = now
		}

		switch m := msg.(type) {
		case derp.ServerInfoMessage:
			health.SetDERPRegionConnectedState(regionID, true)
			health.SetDERPRegionHealth(regionID, "") // until declared otherwise
			c.logf("magicsock: derp-%d connected; connGen=%v", regionID, connGen)
			continue
		case derp.ReceivedPacket:
			pkt = m
			res.n = len(m.Data)
			res.src = m.Source
			if logDerpVerbose() {
				c.logf("magicsock: got derp-%v packet: %q", regionID, m.Data)
			}
			// If this is a new sender we hadn't seen before, remember it and
			// register a route for this peer.
			if res.src != lastPacketSrc { // avoid map lookup w/ high throughput single peer
				lastPacketSrc = res.src
				if _, ok := peerPresent[res.src]; !ok {
					peerPresent[res.src] = true
					c.addDerpPeerRoute(res.src, regionID, dc)
				}
			}
		case derp.PingMessage:
			// Best effort reply to the ping.
			pingData := [8]byte(m)
			go func() {
				if err := dc.SendPong(pingData); err != nil {
					c.logf("magicsock: derp-%d SendPong error: %v", regionID, err)
				}
			}()
			continue
		case derp.HealthMessage:
			health.SetDERPRegionHealth(regionID, m.Problem)
		case derp.PeerGoneMessage:
			switch m.Reason {
			case derp.PeerGoneReasonDisconnected:
				// Do nothing.
			case derp.PeerGoneReasonNotHere:
				metricRecvDiscoDERPPeerNotHere.Add(1)
				c.logf("[unexpected] magicsock: derp-%d does not know about peer %s, removing route",
					regionID, key.NodePublic(m.Peer).ShortString())
			default:
				metricRecvDiscoDERPPeerGoneUnknown.Add(1)
				c.logf("[unexpected] magicsock: derp-%d peer %s gone, reason %v, removing route",
					regionID, key.NodePublic(m.Peer).ShortString(), m.Reason)
			}
			c.removeDerpPeerRoute(key.NodePublic(m.Peer), regionID, dc)
		default:
			// Ignore.
			continue
		}

		// Hand the result to the receive path...
		select {
		case <-ctx.Done():
			return
		case c.derpRecvCh <- res:
		}

		// ...and wait until copyBuf has been called before overwriting pkt.
		select {
		case <-ctx.Done():
			return
		case <-didCopy:
			continue
		}
	}
}
||||
|
||||
// derpWriteRequest is one queued packet for runDerpWriter to send.
type derpWriteRequest struct {
	addr   netip.AddrPort // DERP fake address; used only for logging on error
	pubKey key.NodePublic // destination peer
	b      []byte         // copied; ownership passed to receiver
}
||||
|
||||
// runDerpWriter runs in a goroutine for the life of a DERP
// connection, sending packets queued on ch until ctx is done.
// (The original comment said "handling received packets"; this is the
// write side — reads are handled by runDerpReader.)
func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan derpWriteRequest, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
	defer wg.Decr()
	// Wait for the previous generation of goroutines for this region to
	// finish before starting.
	select {
	case <-startGate:
	case <-ctx.Done():
		return
	}

	for {
		select {
		case <-ctx.Done():
			return
		case wr := <-ch:
			err := dc.Send(wr.pubKey, wr.b)
			if err != nil {
				c.logf("magicsock: derp.Send(%v): %v", wr.addr, err)
				metricSendDERPError.Add(1)
			} else {
				metricSendDERP.Add(1)
			}
		}
	}
}
||||
|
||||
// receiveDERP reads a single packet from c.derpRecvCh into buffs[0],
// recording its size and endpoint in sizes[0]/eps[0] and returning 1.
// It skips results that yield no data (e.g. disco messages or unknown
// peers) and returns net.ErrClosed once the channel closes or the bind
// is closed.
func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint) (int, error) {
	health.ReceiveDERP.Enter()
	defer health.ReceiveDERP.Exit()

	for dm := range c.derpRecvCh {
		if c.isClosed() {
			break
		}
		n, ep := c.processDERPReadResult(dm, buffs[0])
		if n == 0 {
			// No data read occurred. Wait for another packet.
			continue
		}
		metricRecvDataDERP.Add(1)
		sizes[0] = n
		eps[0] = ep
		return 1, nil
	}
	return 0, net.ErrClosed
}
||||
|
||||
// processDERPReadResult copies the packet described by dm into b and
// resolves its sender to an endpoint. It returns (0, nil) when the
// result is a sentinel (nil copyBuf), the packet doesn't fit in b, the
// packet was a disco message consumed internally, or the sender's node
// key is unknown.
func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *endpoint) {
	if dm.copyBuf == nil {
		return 0, nil
	}
	var regionID int
	n, regionID = dm.n, dm.regionID
	ncopy := dm.copyBuf(b)
	if ncopy != n {
		err := fmt.Errorf("received DERP packet of length %d that's too big for WireGuard buf size %d", n, ncopy)
		c.logf("magicsock: %v", err)
		return 0, nil
	}

	ipp := netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(regionID))
	if c.handleDiscoMessage(b[:n], ipp, dm.src, discoRXPathDERP) {
		// Disco traffic is consumed here, not passed up to WireGuard.
		return 0, nil
	}

	var ok bool
	c.mu.Lock()
	ep, ok = c.peerMap.endpointForNodeKey(dm.src)
	c.mu.Unlock()
	if !ok {
		// We don't know anything about this node key, nothing to
		// record or process.
		return 0, nil
	}

	ep.noteRecvActivity()
	if stats := c.stats.Load(); stats != nil {
		stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n)
	}
	return n, ep
}
||||
|
||||
// SetDERPMap controls which (if any) DERP servers are used.
// A nil value means to disable DERP; it's disabled by default.
//
// When the debug DERP-address knob is set, dm is replaced with a
// single-region (999) map pointing at that address. Regions whose
// definitions changed are disconnected so they reconnect with the new
// definition, and a ReSTUN is kicked off on any change.
func (c *Conn) SetDERPMap(dm *tailcfg.DERPMap) {
	c.mu.Lock()
	defer c.mu.Unlock()

	var derpAddr = debugUseDERPAddr()
	if derpAddr != "" {
		derpPort := 443
		if debugUseDERPHTTP() {
			// Match the port for -dev in derper.go
			derpPort = 3340
		}
		dm = &tailcfg.DERPMap{
			OmitDefaultRegions: true,
			Regions: map[int]*tailcfg.DERPRegion{
				999: {
					RegionID: 999,
					Nodes: []*tailcfg.DERPNode{{
						Name:     "999dev",
						RegionID: 999,
						HostName: derpAddr,
						DERPPort: derpPort,
					}},
				},
			},
		}
	}

	if reflect.DeepEqual(dm, c.derpMap) {
		return
	}

	c.derpMapAtomic.Store(dm)
	old := c.derpMap
	c.derpMap = dm
	if dm == nil {
		c.closeAllDerpLocked("derp-disabled")
		return
	}

	// Reconnect any DERP region that changed definitions.
	if old != nil {
		changes := false
		for rid, oldDef := range old.Regions {
			if reflect.DeepEqual(oldDef, dm.Regions[rid]) {
				continue
			}
			changes = true
			if rid == c.myDerp {
				// Our home region changed; force a re-pick.
				c.myDerp = 0
			}
			c.closeDerpLocked(rid, "derp-region-redefined")
		}
		if changes {
			c.logActiveDerpLocked()
		}
	}

	go c.ReSTUN("derp-map-update")
}

// wantDerpLocked reports whether DERP is enabled (a DERP map is set).
// c.mu must be held.
func (c *Conn) wantDerpLocked() bool { return c.derpMap != nil }
||||
|
||||
// c.mu must be held.
|
||||
func (c *Conn) closeAllDerpLocked(why string) { |
||||
if len(c.activeDerp) == 0 { |
||||
return // without the useless log statement
|
||||
} |
||||
for i := range c.activeDerp { |
||||
c.closeDerpLocked(i, why) |
||||
} |
||||
c.logActiveDerpLocked() |
||||
} |
||||
|
||||
// maybeCloseDERPsOnRebind, in response to a rebind, closes all
// DERP connections that don't have a local address in okayLocalIPs
// and pings all those that do.
func (c *Conn) maybeCloseDERPsOnRebind(okayLocalIPs []netip.Prefix) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for regionID, ad := range c.activeDerp {
		la, err := ad.c.LocalAddr()
		if err != nil {
			c.closeOrReconnectDERPLocked(regionID, "rebind-no-localaddr")
			continue
		}
		if !tsaddr.PrefixesContainsIP(okayLocalIPs, la.Addr()) {
			c.closeOrReconnectDERPLocked(regionID, "rebind-default-route-change")
			continue
		}
		// Local address survived the rebind; health-check the conn
		// with a ping rather than tearing it down.
		regionID := regionID // per-iteration copy for the goroutine below
		dc := ad.c
		go func() {
			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			defer cancel()
			if err := dc.Ping(ctx); err != nil {
				// Ping failed: reacquire the lock (we're outside the
				// parent's critical section here) and close/reconnect.
				c.mu.Lock()
				defer c.mu.Unlock()
				c.closeOrReconnectDERPLocked(regionID, "rebind-ping-fail")
				return
			}
			c.logf("post-rebind ping of DERP region %d okay", regionID)
		}()
	}
	c.logActiveDerpLocked()
}
||||
|
||||
// closeOrReconnectDERPLocked closes the DERP connection to the
|
||||
// provided regionID and starts reconnecting it if it's our current
|
||||
// home DERP.
|
||||
//
|
||||
// why is a reason for logging.
|
||||
//
|
||||
// c.mu must be held.
|
||||
func (c *Conn) closeOrReconnectDERPLocked(regionID int, why string) { |
||||
c.closeDerpLocked(regionID, why) |
||||
if !c.privateKey.IsZero() && c.myDerp == regionID { |
||||
c.startDerpHomeConnectLocked() |
||||
} |
||||
} |
||||
|
||||
// c.mu must be held.
|
||||
// It is the responsibility of the caller to call logActiveDerpLocked after any set of closes.
|
||||
func (c *Conn) closeDerpLocked(regionID int, why string) { |
||||
if ad, ok := c.activeDerp[regionID]; ok { |
||||
c.logf("magicsock: closing connection to derp-%v (%v), age %v", regionID, why, time.Since(ad.createTime).Round(time.Second)) |
||||
go ad.c.Close() |
||||
ad.cancel() |
||||
delete(c.activeDerp, regionID) |
||||
metricNumDERPConns.Set(int64(len(c.activeDerp))) |
||||
} |
||||
} |
||||
|
||||
// logActiveDerpLocked logs the number of active DERP connections and,
// when nonzero, a per-region summary of each connection's age ("cr")
// and time since last write ("wr").
//
// c.mu must be held.
func (c *Conn) logActiveDerpLocked() {
	now := time.Now()
	// logger.ArgWriter defers building the summary string until the
	// log line is actually formatted.
	c.logf("magicsock: %v active derp conns%s", len(c.activeDerp), logger.ArgWriter(func(buf *bufio.Writer) {
		if len(c.activeDerp) == 0 {
			return
		}
		buf.WriteString(":")
		c.foreachActiveDerpSortedLocked(func(node int, ad activeDerp) {
			fmt.Fprintf(buf, " derp-%d=cr%v,wr%v", node, simpleDur(now.Sub(ad.createTime)), simpleDur(now.Sub(*ad.lastWrite)))
		})
	}))
}
||||
|
||||
// c.mu must be held.
|
||||
func (c *Conn) foreachActiveDerpSortedLocked(fn func(regionID int, ad activeDerp)) { |
||||
if len(c.activeDerp) < 2 { |
||||
for id, ad := range c.activeDerp { |
||||
fn(id, ad) |
||||
} |
||||
return |
||||
} |
||||
ids := make([]int, 0, len(c.activeDerp)) |
||||
for id := range c.activeDerp { |
||||
ids = append(ids, id) |
||||
} |
||||
sort.Ints(ids) |
||||
for _, id := range ids { |
||||
fn(id, c.activeDerp[id]) |
||||
} |
||||
} |
||||
|
||||
func (c *Conn) cleanStaleDerp() { |
||||
c.mu.Lock() |
||||
defer c.mu.Unlock() |
||||
if c.closed { |
||||
return |
||||
} |
||||
c.derpCleanupTimerArmed = false |
||||
|
||||
tooOld := time.Now().Add(-derpInactiveCleanupTime) |
||||
dirty := false |
||||
someNonHomeOpen := false |
||||
for i, ad := range c.activeDerp { |
||||
if i == c.myDerp { |
||||
continue |
||||
} |
||||
if ad.lastWrite.Before(tooOld) { |
||||
c.closeDerpLocked(i, "idle") |
||||
dirty = true |
||||
} else { |
||||
someNonHomeOpen = true |
||||
} |
||||
} |
||||
if dirty { |
||||
c.logActiveDerpLocked() |
||||
} |
||||
if someNonHomeOpen { |
||||
c.scheduleCleanStaleDerpLocked() |
||||
} |
||||
} |
||||
|
||||
func (c *Conn) scheduleCleanStaleDerpLocked() { |
||||
if c.derpCleanupTimerArmed { |
||||
// Already going to fire soon. Let the existing one
|
||||
// fire lest it get infinitely delayed by repeated
|
||||
// calls to scheduleCleanStaleDerpLocked.
|
||||
return |
||||
} |
||||
c.derpCleanupTimerArmed = true |
||||
if c.derpCleanupTimer != nil { |
||||
c.derpCleanupTimer.Reset(derpCleanStaleInterval) |
||||
} else { |
||||
c.derpCleanupTimer = time.AfterFunc(derpCleanStaleInterval, c.cleanStaleDerp) |
||||
} |
||||
} |
||||
|
||||
// DERPs reports the number of active DERP connections.
|
||||
func (c *Conn) DERPs() int { |
||||
c.mu.Lock() |
||||
defer c.mu.Unlock() |
||||
|
||||
return len(c.activeDerp) |
||||
} |
||||
|
||||
func (c *Conn) derpRegionCodeOfIDLocked(regionID int) string { |
||||
if c.derpMap == nil { |
||||
return "" |
||||
} |
||||
if r, ok := c.derpMap.Regions[regionID]; ok { |
||||
return r.RegionCode |
||||
} |
||||
return "" |
||||
} |
||||
|
||||
// derpAddrFamSelector is the derphttp.AddressFamilySelector we pass
|
||||
// to derphttp.Client.SetAddressFamilySelector.
|
||||
//
|
||||
// It provides the hint as to whether in an IPv4-vs-IPv6 race that
|
||||
// IPv4 should be held back a bit to give IPv6 a better-than-50/50
|
||||
// chance of winning. We only return true when we believe IPv6 will
|
||||
// work anyway, so we don't artificially delay the connection speed.
|
||||
type derpAddrFamSelector struct{ c *Conn } |
||||
|
||||
func (s derpAddrFamSelector) PreferIPv6() bool { |
||||
if r := s.c.lastNetCheckReport.Load(); r != nil { |
||||
return r.IPv6 |
||||
} |
||||
return false |
||||
} |
||||
|
||||
const (
	// derpInactiveCleanupTime is how long a non-home DERP connection
	// needs to be idle (last written to) before we close it.
	// See cleanStaleDerp.
	derpInactiveCleanupTime = 60 * time.Second

	// derpCleanStaleInterval is how often cleanStaleDerp runs when there
	// are potentially-stale DERP connections to close.
	derpCleanStaleInterval = 15 * time.Second
)
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,168 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package magicsock |
||||
|
||||
import ( |
||||
"errors" |
||||
"net" |
||||
"net/netip" |
||||
"sync" |
||||
"sync/atomic" |
||||
|
||||
"golang.org/x/net/ipv6" |
||||
"tailscale.com/net/netaddr" |
||||
"tailscale.com/types/nettype" |
||||
) |
||||
|
||||
// RebindingUDPConn is a UDP socket that can be re-bound.
// Unix has no notion of re-binding a socket, so we swap it out for a new one.
type RebindingUDPConn struct {
	// pconnAtomic is a pointer to the value stored in pconn, but doesn't
	// require acquiring mu. It's used for reads/writes and only upon failure
	// do the reads/writes then check pconn (after acquiring mu) to see if
	// there's been a rebind meanwhile.
	// pconn isn't really needed, but makes some of the code simpler
	// to keep it distinct.
	// Neither is expected to be nil, sockets are bound on creation.
	pconnAtomic atomic.Pointer[nettype.PacketConn]

	mu    sync.Mutex // held while changing pconn (and pconnAtomic)
	pconn nettype.PacketConn
	port  uint16 // cached local port of pconn; zeroed by closeLocked
}
||||
|
||||
// setConnLocked sets the provided nettype.PacketConn. It should be called only
|
||||
// after acquiring RebindingUDPConn.mu. It upgrades the provided
|
||||
// nettype.PacketConn to a *batchingUDPConn when appropriate. This upgrade
|
||||
// is intentionally pushed closest to where read/write ops occur in order to
|
||||
// avoid disrupting surrounding code that assumes nettype.PacketConn is a
|
||||
// *net.UDPConn.
|
||||
func (c *RebindingUDPConn) setConnLocked(p nettype.PacketConn, network string, batchSize int) { |
||||
upc := tryUpgradeToBatchingUDPConn(p, network, batchSize) |
||||
c.pconn = upc |
||||
c.pconnAtomic.Store(&upc) |
||||
c.port = uint16(c.localAddrLocked().Port) |
||||
} |
||||
|
||||
// currentConn returns c's current pconn, acquiring c.mu in the process.
|
||||
func (c *RebindingUDPConn) currentConn() nettype.PacketConn { |
||||
c.mu.Lock() |
||||
defer c.mu.Unlock() |
||||
return c.pconn |
||||
} |
||||
|
||||
// readFromWithInitPconn reads a packet from pconn into b, retrying on
// the current conn if the read fails and pconn is no longer the active
// socket (i.e. a rebind happened mid-read).
func (c *RebindingUDPConn) readFromWithInitPconn(pconn nettype.PacketConn, b []byte) (int, netip.AddrPort, error) {
	for {
		n, addr, err := pconn.ReadFromUDPAddrPort(b)
		if err != nil && pconn != c.currentConn() {
			// The socket was swapped out from under us; retry on the
			// new one rather than surfacing the stale conn's error.
			pconn = *c.pconnAtomic.Load()
			continue
		}
		return n, addr, err
	}
}
||||
|
||||
// ReadFromUDPAddrPort reads a packet from c into b.
|
||||
// It returns the number of bytes copied and the source address.
|
||||
func (c *RebindingUDPConn) ReadFromUDPAddrPort(b []byte) (int, netip.AddrPort, error) { |
||||
return c.readFromWithInitPconn(*c.pconnAtomic.Load(), b) |
||||
} |
||||
|
||||
// WriteBatchTo writes buffs to addr.
func (c *RebindingUDPConn) WriteBatchTo(buffs [][]byte, addr netip.AddrPort) error {
	for {
		pconn := *c.pconnAtomic.Load()
		b, ok := pconn.(*batchingUDPConn)
		if !ok {
			// Underlying conn has no batching support; fall back to
			// one write per buffer.
			for _, buf := range buffs {
				_, err := c.writeToUDPAddrPortWithInitPconn(pconn, buf, addr)
				if err != nil {
					return err
				}
			}
			return nil
		}
		err := b.WriteBatchTo(buffs, addr)
		if err != nil {
			if pconn != c.currentConn() {
				// Conn was rebound mid-write; retry on the new one.
				continue
			}
			return err
		}
		return err
	}
}
||||
|
||||
// ReadBatch reads messages from c into msgs. It returns the number of messages
// the caller should evaluate for nonzero len, as a zero len message may fall
// on either side of a nonzero.
func (c *RebindingUDPConn) ReadBatch(msgs []ipv6.Message, flags int) (int, error) {
	for {
		pconn := *c.pconnAtomic.Load()
		b, ok := pconn.(*batchingUDPConn)
		if !ok {
			// No batching support: read a single packet into msgs[0]
			// and report one message.
			n, ap, err := c.readFromWithInitPconn(pconn, msgs[0].Buffers[0])
			if err == nil {
				msgs[0].N = n
				msgs[0].Addr = net.UDPAddrFromAddrPort(netaddr.Unmap(ap))
				return 1, nil
			}
			return 0, err
		}
		n, err := b.ReadBatch(msgs, flags)
		if err != nil && pconn != c.currentConn() {
			// Conn was rebound mid-read; retry on the new one.
			continue
		}
		return n, err
	}
}
||||
|
||||
// Port returns the local port the connection is currently bound to,
// or 0 after Close.
func (c *RebindingUDPConn) Port() uint16 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.port
}

// LocalAddr returns the connection's current local UDP address.
func (c *RebindingUDPConn) LocalAddr() *net.UDPAddr {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.localAddrLocked()
}

// localAddrLocked returns pconn's local address as a *net.UDPAddr.
// c.mu must be held. Panics if the underlying LocalAddr is not a
// *net.UDPAddr (UDP sockets are expected here).
func (c *RebindingUDPConn) localAddrLocked() *net.UDPAddr {
	return c.pconn.LocalAddr().(*net.UDPAddr)
}
||||
|
||||
// errNilPConn is returned by RebindingUDPConn.Close when there is no current pconn.
// It is for internal use only and should not be returned to users.
var errNilPConn = errors.New("nil pconn")

// Close closes the current underlying connection.
func (c *RebindingUDPConn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.closeLocked()
}

// closeLocked closes the current pconn and clears the cached port.
// c.mu must be held.
func (c *RebindingUDPConn) closeLocked() error {
	if c.pconn == nil {
		return errNilPConn
	}
	c.port = 0
	return c.pconn.Close()
}
||||
|
||||
// writeToUDPAddrPortWithInitPconn writes b to addr via pconn, retrying
// on the current conn if the write fails and pconn is no longer the
// active socket (i.e. a rebind happened mid-write).
func (c *RebindingUDPConn) writeToUDPAddrPortWithInitPconn(pconn nettype.PacketConn, b []byte, addr netip.AddrPort) (int, error) {
	for {
		n, err := pconn.WriteToUDPAddrPort(b, addr)
		if err != nil && pconn != c.currentConn() {
			// The socket was swapped out from under us; retry on the
			// new one rather than surfacing the stale conn's error.
			pconn = *c.pconnAtomic.Load()
			continue
		}
		return n, err
	}
}
||||
|
||||
func (c *RebindingUDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) { |
||||
return c.writeToUDPAddrPortWithInitPconn(*c.pconnAtomic.Load(), b, addr) |
||||
} |
||||
Loading…
Reference in new issue