derp/derpserver: remove global rate limiter
Which can be unfair around varying packet sizes. Updates tailscale/corp#40962 Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
committed by
Jordan Whited
parent
29122506be
commit
ce76f44df2
@@ -172,7 +172,6 @@ type Server struct {
|
||||
meshUpdateBatchSize *metrics.Histogram
|
||||
meshUpdateLoopCount *metrics.Histogram
|
||||
bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
|
||||
rateLimitGlobalWaited expvar.Int // number of times global rate limit caused a wait
|
||||
rateLimitPerClientWaited expvar.Int // number of times per-client rate limit caused a wait
|
||||
// TODO(illotum): add metrics for rate limited wait time, consider total seconds vs a histogram.
|
||||
|
||||
@@ -206,8 +205,7 @@ type Server struct {
|
||||
peerGoneWatchers map[key.NodePublic]set.HandleSet[func(key.NodePublic)]
|
||||
// maps from netip.AddrPort to a client's public key
|
||||
keyOfAddr map[netip.AddrPort]key.NodePublic
|
||||
rateConfig RateConfig // server-global and per-client DERP frame rate limiting config
|
||||
recvLim *xrate.Limiter // server-global DERP frame receive limiter
|
||||
rateConfig RateConfig // per-client DERP frame rate limiting config
|
||||
}
|
||||
|
||||
// clientSet represents 1 or more *sclients.
|
||||
@@ -519,25 +517,13 @@ const minRateLimitTokenBucketSize = derp.MaxPacketSize + derp.KeyLen
|
||||
// in bytes.
|
||||
type RateConfig struct {
|
||||
// PerClientRateLimitBytesPerSec represents the per-client
|
||||
// rate limit in bytes per second. A zero value disables per-client rate limiting,
|
||||
// but global (GlobalRate...) configuration may still apply.
|
||||
// rate limit in bytes per second. A zero value disables all rate limiting.
|
||||
PerClientRateLimitBytesPerSec uint64 `json:",omitzero"`
|
||||
// PerClientRateBurstBytes represents the per-client token bucket depth,
|
||||
// or burst, in bytes. Any value lower than [minRateLimitTokenBucketSize]
|
||||
// will be increased to [minRateLimitTokenBucketSize] before application. Only
|
||||
// relevant if PerClientRateLimitBytesPerSec is nonzero.
|
||||
PerClientRateBurstBytes uint64 `json:",omitzero"`
|
||||
// GlobalRateLimitBytesPerSec represents the global rate limit in bytes per
|
||||
// second. A zero value disables global rate limiting, but per-client (PerClient...)
|
||||
// configuration may still apply. If GlobalRateLimitBytesPerSec is nonzero and less than
|
||||
// PerClientRateLimitBytesPerSec, then GlobalRateLimitBytesPerSec will be set
|
||||
// equal to PerClientRateLimitBytesPerSec before application.
|
||||
GlobalRateLimitBytesPerSec uint64 `json:",omitzero"`
|
||||
// GlobalRateBurstBytes represents the global token bucket depth, or burst,
|
||||
// in bytes. Any value lower than [minRateLimitTokenBucketSize] will be increased to
|
||||
// [minRateLimitTokenBucketSize] before application. Only relevant if
|
||||
// GlobalRateLimitBytesPerSec is nonzero.
|
||||
GlobalRateBurstBytes uint64 `json:",omitzero"`
|
||||
}
|
||||
|
||||
// LoadRateConfig reads and JSON-unmarshals a [RateConfig] from the file at path.
|
||||
@@ -564,36 +550,28 @@ func (s *Server) LoadAndApplyRateConfig(path string) error {
|
||||
return err
|
||||
}
|
||||
applied := s.UpdateRateLimits(rc)
|
||||
s.logf("rate config applied: global-rate=%d bytes/sec global-burst=%d bytes client-rate=%d bytes/sec, client-burst=%d bytes",
|
||||
applied.GlobalRateLimitBytesPerSec, applied.GlobalRateBurstBytes, applied.PerClientRateLimitBytesPerSec, applied.PerClientRateBurstBytes)
|
||||
s.logf("rate config applied: client-rate=%d bytes/sec, client-burst=%d bytes",
|
||||
applied.PerClientRateLimitBytesPerSec, applied.PerClientRateBurstBytes)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateRateLimits sets the receive rate limits, updating all existing client
|
||||
// connections. It returns the applied config, which may differ from rc. If both
|
||||
// the per-client and global rate limits are 0, rate limiting is disabled. Mesh
|
||||
// peers are always exempt from rate limiting.
|
||||
// connections. It returns the applied config, which may differ from rc. If the
|
||||
// per-client rate limits is 0, rate limiting is disabled. Mesh peers are always
|
||||
// exempt from rate limiting.
|
||||
func (s *Server) UpdateRateLimits(rc RateConfig) (applied RateConfig) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if rc.PerClientRateLimitBytesPerSec == 0 && rc.GlobalRateLimitBytesPerSec == 0 {
|
||||
// if per-client and global are disabled, all rate limiting is disabled
|
||||
if rc.PerClientRateLimitBytesPerSec == 0 {
|
||||
// all rate limiting is disabled
|
||||
rc = RateConfig{}
|
||||
}
|
||||
if rc.PerClientRateLimitBytesPerSec != 0 {
|
||||
rc.PerClientRateBurstBytes = max(rc.PerClientRateBurstBytes, minRateLimitTokenBucketSize)
|
||||
}
|
||||
if rc.GlobalRateLimitBytesPerSec != 0 {
|
||||
rc.GlobalRateLimitBytesPerSec = max(rc.GlobalRateLimitBytesPerSec, rc.PerClientRateLimitBytesPerSec)
|
||||
rc.GlobalRateBurstBytes = max(rc.GlobalRateBurstBytes, minRateLimitTokenBucketSize)
|
||||
s.recvLim = xrate.NewLimiter(xrate.Limit(rc.GlobalRateLimitBytesPerSec), int(rc.GlobalRateBurstBytes))
|
||||
} else {
|
||||
s.recvLim = nil
|
||||
rc.PerClientRateBurstBytes = max(rc.PerClientRateBurstBytes, minRateLimitTokenBucketSize)
|
||||
}
|
||||
s.rateConfig = rc
|
||||
for _, cs := range s.clients {
|
||||
cs.ForeachClient(func(c *sclient) {
|
||||
c.setRateLimit(rc.PerClientRateLimitBytesPerSec, rc.PerClientRateBurstBytes, s.recvLim)
|
||||
c.setRateLimit(rc.PerClientRateLimitBytesPerSec, rc.PerClientRateBurstBytes)
|
||||
})
|
||||
}
|
||||
return rc
|
||||
@@ -761,7 +739,7 @@ func (s *Server) registerClient(c *sclient) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
c.setRateLimit(s.rateConfig.PerClientRateLimitBytesPerSec, s.rateConfig.PerClientRateBurstBytes, s.recvLim)
|
||||
c.setRateLimit(s.rateConfig.PerClientRateLimitBytesPerSec, s.rateConfig.PerClientRateBurstBytes)
|
||||
|
||||
cs, ok := s.clients[c.key]
|
||||
if !ok {
|
||||
@@ -1336,22 +1314,15 @@ func (c *sclient) handleFrameSendPacket(_ derp.FrameType, fl uint32) error {
|
||||
return c.sendPkt(dst, p)
|
||||
}
|
||||
|
||||
// setRateLimit updates the receive rate limiter. When bytesPerSec is 0 and parent is nil, or the
|
||||
// setRateLimit updates the receive rate limiter. When bytesPerSec is 0, or the
|
||||
// client is a mesh peer, the limiter is set to nil so that [sclient.rateLimit] is a no-op.
|
||||
func (c *sclient) setRateLimit(bytesPerSec, burst uint64, parent *xrate.Limiter) {
|
||||
if c.canMesh || (bytesPerSec == 0 && parent == nil) {
|
||||
func (c *sclient) setRateLimit(bytesPerSec, burst uint64) {
|
||||
if c.canMesh || bytesPerSec == 0 {
|
||||
c.recvLim.Store(nil)
|
||||
return
|
||||
}
|
||||
var child *xrate.Limiter
|
||||
if bytesPerSec != 0 {
|
||||
child = xrate.NewLimiter(xrate.Limit(bytesPerSec), int(burst))
|
||||
}
|
||||
lim := &parentChildTokenBuckets{
|
||||
parent: parent,
|
||||
child: child,
|
||||
}
|
||||
c.recvLim.Store(lim)
|
||||
limiter := xrate.NewLimiter(xrate.Limit(bytesPerSec), int(burst))
|
||||
c.recvLim.Store(limiter)
|
||||
}
|
||||
|
||||
// rateLimitWait is a reimplementation of [xrate.Limiter.WaitN] via [xrate.Limiter.ReserveN].
|
||||
@@ -1378,9 +1349,6 @@ func rateLimitWait(ctx context.Context, lim *xrate.Limiter, n int, now time.Time
|
||||
}
|
||||
|
||||
// rateLimit applies the receive rate limit.
|
||||
// Per-client rate limiting is applied before global.
|
||||
// The former lets us differentiate classes of service,
|
||||
// the latter sets the overall pace of reading.
|
||||
// By limiting here we prevent reading from the buffered reader
|
||||
// [sclient.br] if the limit has been exceeded. Any reads done here provide space
|
||||
// within the buffered reader to fill back in with data from
|
||||
@@ -1409,26 +1377,12 @@ func (c *sclient) rateLimit(n int) error {
|
||||
durationWaited time.Duration
|
||||
err error
|
||||
)
|
||||
if lim.child != nil {
|
||||
durationWaited, err = rateLimitWait(c.ctx, lim.child, clampedN, now, newTimer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if durationWaited > 0 {
|
||||
c.s.rateLimitPerClientWaited.Add(1)
|
||||
}
|
||||
durationWaited, err = rateLimitWait(c.ctx, lim, clampedN, now, newTimer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if lim.parent != nil {
|
||||
if durationWaited > 0 {
|
||||
now = c.s.clock.Now() // update 'now' if we already waited
|
||||
}
|
||||
durationWaited, err = rateLimitWait(c.ctx, lim.parent, clampedN, now, newTimer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if durationWaited > 0 {
|
||||
c.s.rateLimitGlobalWaited.Add(1)
|
||||
}
|
||||
if durationWaited > 0 {
|
||||
c.s.rateLimitPerClientWaited.Add(1)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -1867,21 +1821,14 @@ type sclient struct {
|
||||
peerGoneLim *rate.Limiter
|
||||
|
||||
// recvLim is the receive rate limiter. When rate limiting is enabled for a
|
||||
// non-mesh client, it points to a [parentChildTokenBuckets]. When rate limiting
|
||||
// non-mesh client, it points to a [xrate.Limiter]. When rate limiting
|
||||
// is disabled or the client is a mesh peer, it is nil and [sclient.rateLimit]
|
||||
// is a no-op. Updated atomically by [sclient.setRateLimit] so that
|
||||
// [sclient.rateLimit] can load it without holding [Server.mu].
|
||||
recvLim atomic.Pointer[parentChildTokenBuckets]
|
||||
}
|
||||
|
||||
// parentChildTokenBuckets contains a parent and child token bucket for the
|
||||
// purpose of applying in a hierarchical topology.
|
||||
//
|
||||
// TODO: consider porting the required APIs from [xrate.Limiter] to [rate.Limiter],
|
||||
// which is already optimized to use [mono.Time].
|
||||
type parentChildTokenBuckets struct {
|
||||
parent *xrate.Limiter // parent may be nil
|
||||
child *xrate.Limiter // child may be nil
|
||||
//
|
||||
// TODO: consider porting the required APIs from [xrate.Limiter] to [rate.Limiter],
|
||||
// which is already optimized to use [mono.Time].
|
||||
recvLim atomic.Pointer[xrate.Limiter]
|
||||
}
|
||||
|
||||
func (c *sclient) presentFlags() derp.PeerPresentFlags {
|
||||
@@ -2475,14 +2422,7 @@ func (s *Server) ExpVar(rateLimitEnabled bool) expvar.Var {
|
||||
m.Set("rate_limit_per_client_burst_bytes", s.expVarFunc(func() any {
|
||||
return s.rateConfig.PerClientRateBurstBytes
|
||||
}))
|
||||
m.Set("rate_limit_global_bytes_per_second", s.expVarFunc(func() any {
|
||||
return s.rateConfig.GlobalRateLimitBytesPerSec
|
||||
}))
|
||||
m.Set("rate_limit_global_burst_bytes", s.expVarFunc(func() any {
|
||||
return s.rateConfig.GlobalRateBurstBytes
|
||||
}))
|
||||
m.Set("rate_limit_per_client_waited", &s.rateLimitPerClientWaited)
|
||||
m.Set("rate_limit_global_waited", &s.rateLimitGlobalWaited)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user