derp, magicsock: send new "peer gone" frames when previous sender disconnects

Updates #150 (not yet enabled by default in magicsock)

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick
2020-03-21 18:24:28 -07:00
committed by Brad Fitzpatrick
parent e60b433831
commit 1ab5b31c4b
5 changed files with 174 additions and 6 deletions
+7
View File
@@ -74,6 +74,13 @@ const (
frameRecvPacket = frameType(0x05) // v0/1: packet bytes, v2: 32B src pub key + packet bytes
frameKeepAlive = frameType(0x06) // no payload, no-op (to be replaced with ping/pong)
frameNotePreferred = frameType(0x07) // 1 byte payload: 0x01 or 0x00 for whether this is client's home node
// framePeerGone is sent from server to client to signal that
// a previous sender is no longer connected. That is, if A
// sent to B, and then if A disconnects, the server sends
// framePeerGone to B so B can forget that a reverse path
// exists on that connection to get back to A.
framePeerGone = frameType(0x08) // 32B pub key of peer that's gone
)
var bin = binary.BigEndian
+16
View File
@@ -204,6 +204,13 @@ type ReceivedPacket struct {
func (ReceivedPacket) msg() {}
// PeerGoneMessage is a ReceivedMessage that indicates that the client
// identified by the underlying public key had previously sent you a
// packet but has now disconnected from the server.
type PeerGoneMessage key.Public
func (PeerGoneMessage) msg() {}
// Recv reads a message from the DERP server.
// The provided buffer must be large enough to receive a complete packet,
// which in practice are are 1.5-4 KB, but can be up to 64 KB.
@@ -232,6 +239,15 @@ func (c *Client) Recv(b []byte) (m ReceivedMessage, err error) {
// TODO: eventually we'll have server->client pings that
// require ack pongs.
continue
case framePeerGone:
if n < keyLen {
c.logf("[unexpected] dropping short peerGone frame from DERP server")
continue
}
var pg PeerGoneMessage
copy(pg[:], b[:keyLen])
return pg, nil
case frameRecvPacket:
var rp ReceivedPacket
if c.protoVersion < protocolSrcAddrs {
+53 -3
View File
@@ -58,6 +58,7 @@ type Server struct {
packetsDroppedQueueHead *expvar.Int // queue full, drop head packet
packetsDroppedQueueTail *expvar.Int // queue full, drop tail packet
packetsDroppedWrite *expvar.Int // error writing to dst conn
peerGoneFrames expvar.Int // number of peer gone frames sent
accepts expvar.Int
curClients expvar.Int
curHomeClients expvar.Int // ones with preferred
@@ -151,8 +152,9 @@ func (s *Server) isClosed() bool {
func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
closed := make(chan struct{})
s.accepts.Add(1)
s.mu.Lock()
s.accepts.Add(1) // while holding s.mu for connNum read on next line
connNum := s.accepts.Value() // expvar sadly doesn't return new value on Add(1)
s.netConns[nc] = closed
s.mu.Unlock()
@@ -165,7 +167,7 @@ func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
s.mu.Unlock()
}()
if err := s.accept(nc, brw, remoteAddr); err != nil && !s.isClosed() {
if err := s.accept(nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() {
s.logf("derp: %s: %v", remoteAddr, err)
}
}
@@ -202,9 +204,17 @@ func (s *Server) unregisterClient(c *sclient) {
if c.preferred {
s.curHomeClients.Add(-1)
}
// Find still-connected peers to notify that we've gone away
// so they can drop their route entries to us. (issue 150)
for pubKey, connNum := range c.sentTo {
if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum {
go peer.requestPeerGoneWrite(c.key)
}
}
}
func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error {
func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error {
br, bw := brw.Reader, brw.Writer
nc.SetDeadline(time.Now().Add(10 * time.Second))
if err := s.sendServerKey(bw); err != nil {
@@ -226,6 +236,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
defer cancel()
c := &sclient{
connNum: connNum,
s: s,
key: clientKey,
nc: nc,
@@ -236,6 +247,8 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
remoteAddr: remoteAddr,
connectedAt: time.Now(),
sendQueue: make(chan pkt, perClientSendQueueDepth),
peerGone: make(chan key.Public),
sentTo: make(map[key.Public]int64),
}
if clientInfo != nil {
c.info = *clientInfo
@@ -330,6 +343,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return nil
}
// Track that we've sent to this peer, so if/when we
// disconnect first, the server can inform all our old
// recipients that we're gone. (Issue 150 optimization)
c.sentTo[dstKey] = dst.connNum
p := pkt{
bs: contents,
}
@@ -378,6 +396,16 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return nil
}
// requestPeerGoneWrite sends a request to write a "peer gone" frame
// that the provided peer has disconnected. It blocks until either the
// write request is scheduled, or the client has closed.
func (c *sclient) requestPeerGoneWrite(peer key.Public) {
select {
case c.peerGone <- peer:
case <-c.done:
}
}
func (s *Server) verifyClient(clientKey key.Public, info *clientInfo) error {
// TODO(crawshaw): implement policy constraints on who can use the DERP server
// TODO(bradfitz): ... and at what rate.
@@ -483,6 +511,7 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.Publi
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct {
// Static after construction.
connNum int64 // process-wide unique counter, incremented each Accept
s *Server
nc Conn
key key.Public
@@ -491,11 +520,15 @@ type sclient struct {
done <-chan struct{} // closed when connection closes
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
sendQueue chan pkt // packets queued to this client; never closed
peerGone chan key.Public // write request that a previous sender has disconnected
// Owned by run, not thread-safe.
br *bufio.Reader
connectedAt time.Time
preferred bool
// sentTo tracks all the peers this client has ever sent a packet to, and at which
// connection number.
sentTo map[key.Public]int64 // recipient => rcpt's latest sclient.connNum
// Owned by sender, not thread-safe.
bw *bufio.Writer
@@ -577,6 +610,9 @@ func (c *sclient) sendLoop(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case peer := <-c.peerGone:
werr = c.sendPeerGone(peer)
continue
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
continue
@@ -595,6 +631,8 @@ func (c *sclient) sendLoop(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case peer := <-c.peerGone:
werr = c.sendPeerGone(peer)
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
case <-keepAliveTick.C:
@@ -613,6 +651,17 @@ func (c *sclient) sendKeepAlive() error {
return writeFrameHeader(c.bw, frameKeepAlive, 0)
}
// sendPeerGone sends a peerGone frame, without flushing.
func (c *sclient) sendPeerGone(peer key.Public) error {
c.s.peerGoneFrames.Add(1)
c.setWriteDeadline()
if err := writeFrameHeader(c.bw, framePeerGone, keyLen); err != nil {
return err
}
_, err := c.bw.Write(peer[:])
return err
}
// sendPacket writes contents to the client in a RecvPacket frame. If
// srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses
// DERPv2. The bytes of contents are only valid until this function
@@ -678,5 +727,6 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("unknown_frames", &s.unknownFrames)
m.Set("home_moves_in", &s.homeMovesIn)
m.Set("home_moves_out", &s.homeMovesOut)
m.Set("peer_gone_frames", &s.peerGoneFrames)
return m
}
+20
View File
@@ -9,6 +9,7 @@ import (
"context"
crand "crypto/rand"
"errors"
"expvar"
"fmt"
"io"
"net"
@@ -80,6 +81,8 @@ func TestSendRecv(t *testing.T) {
t.Logf("Connected client %d.", i)
}
var peerGoneCount expvar.Int
t.Logf("Starting read loops")
for i := 0; i < numClients; i++ {
go func(i int) {
@@ -94,6 +97,8 @@ func TestSendRecv(t *testing.T) {
default:
t.Errorf("unexpected message type %T", m)
continue
case PeerGoneMessage:
peerGoneCount.Add(1)
case ReceivedPacket:
if m.Source.IsZero() {
t.Errorf("zero Source address in ReceivedPacket")
@@ -138,6 +143,18 @@ func TestSendRecv(t *testing.T) {
t.Errorf("total/home=%v/%v; want %v/%v", gotTotal, gotHome, total, home)
}
wantClosedPeers := func(want int64) {
t.Helper()
var got int64
dl := time.Now().Add(5 * time.Second)
for time.Now().Before(dl) {
if got = peerGoneCount.Value(); got == want {
return
}
}
t.Errorf("peer gone count = %v; want %v", got, want)
}
msg1 := []byte("hello 0->1\n")
if err := clients[0].Send(clientKeys[1], msg1); err != nil {
t.Fatal(err)
@@ -167,15 +184,18 @@ func TestSendRecv(t *testing.T) {
wantActive(3, 1)
connsOut[1].Close()
wantActive(2, 0)
wantClosedPeers(1)
clients[2].NotePreferred(true)
wantActive(2, 1)
clients[2].NotePreferred(false)
wantActive(2, 0)
connsOut[2].Close()
wantActive(1, 0)
wantClosedPeers(1)
t.Logf("passed")
s.Close()
}
func TestSendFreeze(t *testing.T) {