net/batching: use vectored writes on Linux (#19054)

On Linux batching.Conn will now write a vector of
coalesced buffers via sendmmsg(2) instead of copying
fragments into a single buffer.

Scatter-gather I/O has been available on Linux since the
earliest days (and was reworked in 2.6.24). The kernel passes
fragments to the driver if it supports them, and otherwise
linearizes the data upon receiving it.

Removing the copy overhead from userspace yields up to a 4-5%
packet-rate and bitrate improvement on Linux with GSO enabled:
46Gb/s 4.4m pps vs 44Gb/s 4.2m pps w/32 Peer Relay client flows.

Updates tailscale/corp#36989


Change-Id: Idb2248d0964fb011f1c8f957ca555eab6a6a6964

Signed-off-by: Alex Valiushko <alexvaliushko@tailscale.com>
main
Alex Valiushko 3 weeks ago committed by GitHub
parent 18983eca66
commit 330a17b7d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 43
      net/batching/conn_linux.go
  2. 46
      net/batching/conn_linux_test.go

@ -87,6 +87,15 @@ const (
// This was initially established for Linux, but may split out to
// GOOS-specific values later. It originates as UDP_MAX_SEGMENTS in the
// kernel's TX path, and UDP_GRO_CNT_MAX for RX.
//
// As long as we use one fragment per datagram, this also serves as a
// limit for the number of fragments we can coalesce during scatter-gather writes.
//
// 64 is below the 1024 of IOV_MAX (Linux) or UIO_MAXIOV (BSD),
// and the 256 of WSABUF_MAX_COUNT (Windows).
//
// (2026-04) If we begin shipping datagrams in more than one fragment,
// an independent fragment count limit needs to be implemented.
udpSegmentMaxDatagrams = 64
)
@ -99,15 +108,24 @@ const (
// coalesceMessages iterates 'buffs', setting and coalescing them in 'msgs'
// where possible while maintaining datagram order.
//
// It aggregates message components as a list of buffers without copying,
// and expects to be used only on Linux with scatter-gather writes via sendmmsg(2).
//
// Each msgs[i].Buffers must have length one. This function panics if there
// are not enough msgs to coalesce all buffs.
//
// All msgs have their Addr field set to addr.
//
// All msgs[i].Buffers[0] are preceded by a Geneve header (geneve) if geneve.VNI.IsSet().
//
// TODO(illotum) explore MSG_ZEROCOPY for large writes (>10KB).
func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.GeneveHeader, buffs [][]byte, msgs []ipv6.Message, offset int) int {
var (
base = -1 // index of msg we are currently coalescing into
gsoSize int // segmentation size of msgs[base]
dgramCnt int // number of dgrams coalesced into msgs[base]
endBatch bool // tracking flag to start a new batch on next iteration of buffs
base = -1 // index of msg we are currently coalescing into
gsoSize int // segmentation size of msgs[base]
dgramCnt int // number of dgrams coalesced into msgs[base]
endBatch bool // tracking flag to start a new batch on next iteration of buffs
coalescedLen int // bytes coalesced into msgs[base]
)
maxPayloadLen := maxIPv4PayloadLen
if addr.IP.To4() == nil {
@ -122,19 +140,18 @@ func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.Ge
}
if i > 0 {
msgLen := len(buff)
baseLenBefore := len(msgs[base].Buffers[0])
freeBaseCap := cap(msgs[base].Buffers[0]) - baseLenBefore
if msgLen+baseLenBefore <= maxPayloadLen &&
if msgLen+coalescedLen <= maxPayloadLen &&
msgLen <= gsoSize &&
msgLen <= freeBaseCap &&
dgramCnt < udpSegmentMaxDatagrams &&
!endBatch {
msgs[base].Buffers[0] = append(msgs[base].Buffers[0], make([]byte, msgLen)...)
copy(msgs[base].Buffers[0][baseLenBefore:], buff)
// msgs[base].Buffers[0] is set to buff[i] when a new base is set.
// This appends a struct iovec element in the underlying struct msghdr (scatter-gather).
msgs[base].Buffers = append(msgs[base].Buffers, buff)
if i == len(buffs)-1 {
setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize))
}
dgramCnt++
coalescedLen += msgLen
if msgLen < gsoSize {
// A smaller than gsoSize packet on the tail is legal, but
// it must end the batch.
@ -155,6 +172,7 @@ func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.Ge
msgs[base].Buffers[0] = buff
msgs[base].Addr = addr
dgramCnt = 1
coalescedLen = len(buff)
}
return base + 1
}
@ -171,7 +189,10 @@ func (c *linuxBatchingConn) getSendBatch() *sendBatch {
func (c *linuxBatchingConn) putSendBatch(batch *sendBatch) {
for i := range batch.msgs {
batch.msgs[i] = ipv6.Message{Buffers: batch.msgs[i].Buffers, OOB: batch.msgs[i].OOB}
// Non-coalesced write paths access only batch.msgs[i].Buffers[0],
// but more buffers are appended during [linuxBatchingConn.coalesceMessages].
// Leave index zero accessible:
batch.msgs[i] = ipv6.Message{Buffers: batch.msgs[i].Buffers[:1], OOB: batch.msgs[i].OOB}
}
c.sendBatchPool.Put(batch)
}

@ -152,10 +152,12 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
geneve.VNI.Set(1)
cases := []struct {
name string
buffs [][]byte
geneve packet.GeneveHeader
wantLens []int
name string
buffs [][]byte
geneve packet.GeneveHeader
// Each wantLens slice corresponds to the Buffers of a single coalesced message,
// and each int is the expected length of the corresponding Buffer[i].
wantLens [][]int
wantGSO []int
}{
{
@ -163,7 +165,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
buffs: [][]byte{
withGeneveSpace(1, 1),
},
wantLens: []int{1},
wantLens: [][]int{{1}},
wantGSO: []int{0},
},
{
@ -172,7 +174,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(1, 1),
},
geneve: geneve,
wantLens: []int{1 + packet.GeneveFixedHeaderLength},
wantLens: [][]int{{1 + packet.GeneveFixedHeaderLength}},
wantGSO: []int{0},
},
{
@ -181,7 +183,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(1, 2),
withGeneveSpace(1, 1),
},
wantLens: []int{2},
wantLens: [][]int{{1, 1}},
wantGSO: []int{1},
},
{
@ -191,7 +193,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(1, 1),
},
geneve: geneve,
wantLens: []int{2 + (2 * packet.GeneveFixedHeaderLength)},
wantLens: [][]int{{1 + packet.GeneveFixedHeaderLength, 1 + packet.GeneveFixedHeaderLength}},
wantGSO: []int{1 + packet.GeneveFixedHeaderLength},
},
{
@ -200,7 +202,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(2, 3),
withGeneveSpace(1, 1),
},
wantLens: []int{3},
wantLens: [][]int{{2, 1}},
wantGSO: []int{2},
},
{
@ -210,7 +212,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(1, 1),
},
geneve: geneve,
wantLens: []int{3 + (2 * packet.GeneveFixedHeaderLength)},
wantLens: [][]int{{2 + packet.GeneveFixedHeaderLength, 1 + packet.GeneveFixedHeaderLength}},
wantGSO: []int{2 + packet.GeneveFixedHeaderLength},
},
{
@ -220,7 +222,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(1, 1),
withGeneveSpace(2, 2),
},
wantLens: []int{3, 2},
wantLens: [][]int{{2, 1}, {2}},
wantGSO: []int{2, 0},
},
{
@ -231,7 +233,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(2, 2),
},
geneve: geneve,
wantLens: []int{3 + (2 * packet.GeneveFixedHeaderLength), 2 + packet.GeneveFixedHeaderLength},
wantLens: [][]int{{2 + packet.GeneveFixedHeaderLength, 1 + packet.GeneveFixedHeaderLength}, {2 + packet.GeneveFixedHeaderLength}},
wantGSO: []int{2 + packet.GeneveFixedHeaderLength, 0},
},
{
@ -241,8 +243,8 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(2, 2),
withGeneveSpace(2, 2),
},
wantLens: []int{4, 2},
wantGSO: []int{2, 0},
wantLens: [][]int{{2, 2, 2}},
wantGSO: []int{2},
},
{
name: "three messages limited cap coalesce vni.isSet",
@ -252,8 +254,8 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
withGeneveSpace(2, 2),
},
geneve: geneve,
wantLens: []int{4 + (2 * packet.GeneveFixedHeaderLength), 2 + packet.GeneveFixedHeaderLength},
wantGSO: []int{2 + packet.GeneveFixedHeaderLength, 0},
wantLens: [][]int{{2 + packet.GeneveFixedHeaderLength, 2 + packet.GeneveFixedHeaderLength, 2 + packet.GeneveFixedHeaderLength}},
wantGSO: []int{2 + packet.GeneveFixedHeaderLength},
},
}
@ -276,10 +278,16 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) {
if msgs[i].Addr != addr {
t.Errorf("msgs[%d].Addr != passed addr", i)
}
gotLen := len(msgs[i].Buffers[0])
if gotLen != tt.wantLens[i] {
t.Errorf("len(msgs[%d].Buffers[0]) %d != %d", i, gotLen, tt.wantLens[i])
if len(msgs[i].Buffers) != len(tt.wantLens[i]) {
t.Fatalf("len(msgs[%d].Buffers) %d != %d", i, len(msgs[i].Buffers), len(tt.wantLens[i]))
}
for j := range tt.wantLens[i] {
gotLen := len(msgs[i].Buffers[j])
if gotLen != tt.wantLens[i][j] {
t.Errorf("len(msgs[%d].Buffers[%d]) %d != %d", i, j, gotLen, tt.wantLens[i][j])
}
}
// coalesceMessages calls setGSOSizeInControl, which uses a cmsg
// type of UDP_SEGMENT, and getGSOSizeInControl scans for a cmsg
// type of UDP_GRO. Therefore, we have to use the lower-level

Loading…
Cancel
Save