diff --git a/net/batching/conn_linux.go b/net/batching/conn_linux.go index 036fa1318..ea11f439a 100644 --- a/net/batching/conn_linux.go +++ b/net/batching/conn_linux.go @@ -87,6 +87,15 @@ const ( // This was initially established for Linux, but may split out to // GOOS-specific values later. It originates as UDP_MAX_SEGMENTS in the // kernel's TX path, and UDP_GRO_CNT_MAX for RX. + // + // As long as we use one fragment per datagram, this also serves as a + // limit for the number of fragments we can coalesce during scatter-gather writes. + // + // 64 is below the 1024 of IOV_MAX (Linux) or UIO_MAXIOV (BSD), + // and the 256 of WSABUF_MAX_COUNT (Windows). + // + // (2026-04) If we begin shipping datagrams in more than one fragment, + // an independent fragment count limit needs to be implemented. udpSegmentMaxDatagrams = 64 ) @@ -99,15 +108,24 @@ const ( // coalesceMessages iterates 'buffs', setting and coalescing them in 'msgs' // where possible while maintaining datagram order. // +// It aggregates message components as a list of buffers without copying, +// and expects to be used only on Linux with scatter-gather writes via sendmmsg(2). +// +// All msgs[i].Buffers len must be one. Will panic if there is not enough msgs +// to coalesce all buffs. +// // All msgs have their Addr field set to addr. // // All msgs[i].Buffers[0] are preceded by a Geneve header (geneve) if geneve.VNI.IsSet(). +// +// TODO(illotum) explore MSG_ZEROCOPY for large writes (>10KB). func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.GeneveHeader, buffs [][]byte, msgs []ipv6.Message, offset int) int { var ( - base = -1 // index of msg we are currently coalescing into - gsoSize int // segmentation size of msgs[base] - dgramCnt int // number of dgrams coalesced into msgs[base] - endBatch bool // tracking flag to start a new batch on next iteration of buffs + base = -1 // index of msg we are currently coalescing into + gsoSize int // segmentation size of msgs[base] + dgramCnt int // number of dgrams coalesced into msgs[base] + endBatch bool // tracking flag to start a new batch on next iteration of buffs + coalescedLen int // bytes coalesced into msgs[base] ) maxPayloadLen := maxIPv4PayloadLen if addr.IP.To4() == nil { @@ -122,19 +140,18 @@ func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.Ge } if i > 0 { msgLen := len(buff) - baseLenBefore := len(msgs[base].Buffers[0]) - freeBaseCap := cap(msgs[base].Buffers[0]) - baseLenBefore - if msgLen+baseLenBefore <= maxPayloadLen && + if msgLen+coalescedLen <= maxPayloadLen && msgLen <= gsoSize && - msgLen <= freeBaseCap && dgramCnt < udpSegmentMaxDatagrams && !endBatch { - msgs[base].Buffers[0] = append(msgs[base].Buffers[0], make([]byte, msgLen)...) - copy(msgs[base].Buffers[0][baseLenBefore:], buff) + // msgs[base].Buffers[0] is set to buff[i] when a new base is set. + // This appends a struct iovec element in the underlying struct msghdr (scatter-gather). + msgs[base].Buffers = append(msgs[base].Buffers, buff) if i == len(buffs)-1 { setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize)) } dgramCnt++ + coalescedLen += msgLen if msgLen < gsoSize { // A smaller than gsoSize packet on the tail is legal, but // it must end the batch. @@ -155,6 +172,7 @@ func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.Ge msgs[base].Buffers[0] = buff msgs[base].Addr = addr dgramCnt = 1 + coalescedLen = len(buff) } return base + 1 } @@ -171,7 +189,10 @@ func (c *linuxBatchingConn) getSendBatch() *sendBatch { func (c *linuxBatchingConn) putSendBatch(batch *sendBatch) { for i := range batch.msgs { - batch.msgs[i] = ipv6.Message{Buffers: batch.msgs[i].Buffers, OOB: batch.msgs[i].OOB} + // Non coalesced write paths access only batch.msgs[i].Buffers[0], + // but we append more during [linuxBatchingConn.coalesceMessages]. + // Leave index zero accessible: + batch.msgs[i] = ipv6.Message{Buffers: batch.msgs[i].Buffers[:1], OOB: batch.msgs[i].OOB} } c.sendBatchPool.Put(batch) } diff --git a/net/batching/conn_linux_test.go b/net/batching/conn_linux_test.go index 62bdb9d81..8f211cf36 100644 --- a/net/batching/conn_linux_test.go +++ b/net/batching/conn_linux_test.go @@ -152,10 +152,12 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { geneve.VNI.Set(1) cases := []struct { - name string - buffs [][]byte - geneve packet.GeneveHeader - wantLens []int + name string + buffs [][]byte + geneve packet.GeneveHeader + // Each wantLens slice corresponds to the Buffers of a single coalesced message, + // and each int is the expected length of the corresponding Buffer[i]. + wantLens [][]int wantGSO []int }{ { @@ -163,7 +165,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { buffs: [][]byte{ withGeneveSpace(1, 1), }, - wantLens: []int{1}, + wantLens: [][]int{{1}}, wantGSO: []int{0}, }, { @@ -172,7 +174,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(1, 1), }, geneve: geneve, - wantLens: []int{1 + packet.GeneveFixedHeaderLength}, + wantLens: [][]int{{1 + packet.GeneveFixedHeaderLength}}, wantGSO: []int{0}, }, { @@ -181,7 +183,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(1, 2), withGeneveSpace(1, 1), }, - wantLens: []int{2}, + wantLens: [][]int{{1, 1}}, wantGSO: []int{1}, }, { @@ -191,7 +193,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(1, 1), }, geneve: geneve, - wantLens: []int{2 + (2 * packet.GeneveFixedHeaderLength)}, + wantLens: [][]int{{1 + packet.GeneveFixedHeaderLength, 1 + packet.GeneveFixedHeaderLength}}, wantGSO: []int{1 + packet.GeneveFixedHeaderLength}, }, { @@ -200,7 +202,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(2, 3), withGeneveSpace(1, 1), }, - wantLens: []int{3}, + wantLens: [][]int{{2, 1}}, wantGSO: []int{2}, }, { @@ -210,7 +212,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(1, 1), }, geneve: geneve, - wantLens: []int{3 + (2 * packet.GeneveFixedHeaderLength)}, + wantLens: [][]int{{2 + packet.GeneveFixedHeaderLength, 1 + packet.GeneveFixedHeaderLength}}, wantGSO: []int{2 + packet.GeneveFixedHeaderLength}, }, { @@ -220,7 +222,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(1, 1), withGeneveSpace(2, 2), }, - wantLens: []int{3, 2}, + wantLens: [][]int{{2, 1}, {2}}, wantGSO: []int{2, 0}, }, { @@ -231,7 +233,7 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(2, 2), }, geneve: geneve, - wantLens: []int{3 + (2 * packet.GeneveFixedHeaderLength), 2 + packet.GeneveFixedHeaderLength}, + wantLens: [][]int{{2 + packet.GeneveFixedHeaderLength, 1 + packet.GeneveFixedHeaderLength}, {2 + packet.GeneveFixedHeaderLength}}, wantGSO: []int{2 + packet.GeneveFixedHeaderLength, 0}, }, { @@ -241,8 +243,8 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(2, 2), withGeneveSpace(2, 2), }, - wantLens: []int{4, 2}, - wantGSO: []int{2, 0}, + wantLens: [][]int{{2, 2, 2}}, + wantGSO: []int{2}, }, { name: "three messages limited cap coalesce vni.isSet", @@ -252,8 +254,8 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { withGeneveSpace(2, 2), }, geneve: geneve, - wantLens: []int{4 + (2 * packet.GeneveFixedHeaderLength), 2 + packet.GeneveFixedHeaderLength}, - wantGSO: []int{2 + packet.GeneveFixedHeaderLength, 0}, + wantLens: [][]int{{2 + packet.GeneveFixedHeaderLength, 2 + packet.GeneveFixedHeaderLength, 2 + packet.GeneveFixedHeaderLength}}, + wantGSO: []int{2 + packet.GeneveFixedHeaderLength}, }, } @@ -276,10 +278,16 @@ func Test_linuxBatchingConn_coalesceMessages(t *testing.T) { if msgs[i].Addr != addr { t.Errorf("msgs[%d].Addr != passed addr", i) } - gotLen := len(msgs[i].Buffers[0]) - if gotLen != tt.wantLens[i] { - t.Errorf("len(msgs[%d].Buffers[0]) %d != %d", i, gotLen, tt.wantLens[i]) + if len(msgs[i].Buffers) != len(tt.wantLens[i]) { + t.Fatalf("len(msgs[%d].Buffers) %d != %d", i, len(msgs[i].Buffers), len(tt.wantLens[i])) + } + for j := range tt.wantLens[i] { + gotLen := len(msgs[i].Buffers[j]) + if gotLen != tt.wantLens[i][j] { + t.Errorf("len(msgs[%d].Buffers[%d]) %d != %d", i, j, gotLen, tt.wantLens[i][j]) + } } + // coalesceMessages calls setGSOSizeInControl, which uses a cmsg // type of UDP_SEGMENT, and getGSOSizeInControl scans for a cmsg // type of UDP_GRO. Therefore, we have to use the lower-level