@ -52,14 +52,12 @@ var (
// linuxBatchingConn is a UDP socket that provides batched i/o. It implements
// [Conn].
type linuxBatchingConn struct {
pc * net . UDPConn
xpc xnetBatchReaderWriter
rxOffload bool // supports UDP GRO or similar
txOffload atomic . Bool // supports UDP GSO or similar
setGSOSizeInControl func ( control * [ ] byte , gsoSize uint16 ) // typically setGSOSizeInControl(); swappable for testing
getGSOSizeFromControl func ( control [ ] byte ) ( int , error ) // typically getGSOSizeFromControl(); swappable for testing
sendBatchPool sync . Pool
rxqOverflowsMetric * clientmetric . Metric
pc * net . UDPConn
xpc xnetBatchReaderWriter
rxOffload bool // supports UDP GRO or similar
txOffload atomic . Bool // supports UDP GSO or similar
sendBatchPool sync . Pool
rxqOverflowsMetric * clientmetric . Metric
// readOpMu guards read operations that must perform accounting against
// rxqOverflows in single-threaded fashion. There are no concurrent usages
@ -134,7 +132,7 @@ func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.Ge
msgs [ base ] . Buffers [ 0 ] = append ( msgs [ base ] . Buffers [ 0 ] , make ( [ ] byte , msgLen ) ... )
copy ( msgs [ base ] . Buffers [ 0 ] [ baseLenBefore : ] , buff )
if i == len ( buffs ) - 1 {
c . setGSOSizeInControl ( & msgs [ base ] . OOB , uint16 ( gsoSize ) )
setGSOSizeInControl ( & msgs [ base ] . OOB , uint16 ( gsoSize ) )
}
dgramCnt ++
if msgLen < gsoSize {
@ -146,7 +144,7 @@ func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.Ge
}
}
if dgramCnt > 1 {
c . setGSOSizeInControl ( & msgs [ base ] . OOB , uint16 ( gsoSize ) )
setGSOSizeInControl ( & msgs [ base ] . OOB , uint16 ( gsoSize ) )
}
// Reset prior to incrementing base since we are preparing to start a
// new potential batch.
@ -262,7 +260,7 @@ func (c *linuxBatchingConn) splitCoalescedMessages(msgs []ipv6.Message, firstMsg
end = msg . N
numToSplit = 1
)
gsoSize , err = c . getGSOSizeFromControl ( msg . OOB [ : msg . NN ] )
gsoSize , err = getGSOSizeFromControl ( msg . OOB [ : msg . NN ] )
if err != nil {
return n , err
}
@ -426,9 +424,10 @@ func tryEnableUDPOffload(pconn nettype.PacketConn) (hasTX bool, hasRX bool) {
return hasTX , hasRX
}
// getGSOSizeFromControl returns the GSO size found in control. If no GSO size
// is found or the len(control) < unix.SizeofCmsghdr, this function returns 0.
// A non-nil error will be returned if control is malformed.
// getGSOSizeFromControl returns the GSO size found in control associated with a
// cmsg type of UDP_GRO, which the kernel populates in the read direction. If no
// GSO size is found or the len(control) < unix.SizeofCmsghdr, this function
// returns 0. A non-nil error will be returned if control is malformed.
func getGSOSizeFromControl ( control [ ] byte ) ( int , error ) {
data , err := getDataFromControl ( control , unix . SOL_UDP , unix . UDP_GRO , 2 )
if err != nil {
@ -441,7 +440,9 @@ func getGSOSizeFromControl(control []byte) (int, error) {
}
// setGSOSizeInControl sets a socket control message in control containing
// gsoSize. If len(control) < controlMessageSize control's len will be set to 0.
// gsoSize with an associated cmsg type of UDP_SEGMENT, which we are responsible
// for populating prior to writing towards the kernel. If len(control) < controlMessageSize
// control's len will be set to 0.
func setGSOSizeInControl ( control * [ ] byte , gsoSize uint16 ) {
* control = ( * control ) [ : 0 ]
if cap ( * control ) < int ( unsafe . Sizeof ( unix . Cmsghdr { } ) ) {
@ -513,9 +514,7 @@ func TryUpgradeToConn(pconn nettype.PacketConn, network string, batchSize int, r
return pconn
}
b := & linuxBatchingConn {
pc : uc ,
getGSOSizeFromControl : getGSOSizeFromControl ,
setGSOSizeInControl : setGSOSizeInControl ,
pc : uc ,
sendBatchPool : sync . Pool {
New : func ( ) any {
ua := & net . UDPAddr {