derp/derpserver: increase minimum token bucket size
And cap WaitN calls to prevent token bucket errors. Frame length is inclusive of DERP key for FrameSendPacket frames. Updates tailscale/corp#40171 Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
committed by
Jordan Whited
parent
ab74ea0a67
commit
75819aeed0
@@ -514,6 +514,15 @@ func (s *Server) SetTCPWriteTimeout(d time.Duration) {
|
|||||||
s.tcpWriteTimeout = d
|
s.tcpWriteTimeout = d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// minRateLimitTokenBucketSize represents the minimum size of a token bucket
|
||||||
|
// applied for the purposes of rate limiting a DERP connection per received DERP
|
||||||
|
// frame.
|
||||||
|
//
|
||||||
|
// Note: The DERP protocol supports frames larger than this ([math.MaxUint32] length),
|
||||||
|
// but a [derp.FrameSendPacket] cannot exceed this value, which is what we optimize
|
||||||
|
// our token bucket calls for.
|
||||||
|
const minRateLimitTokenBucketSize = derp.MaxPacketSize + derp.KeyLen
|
||||||
|
|
||||||
// RateConfig is a JSON-serializable configuration for per-client rate limits.
|
// RateConfig is a JSON-serializable configuration for per-client rate limits.
|
||||||
// Values are in bytes.
|
// Values are in bytes.
|
||||||
type RateConfig struct {
|
type RateConfig struct {
|
||||||
@@ -521,8 +530,8 @@ type RateConfig struct {
|
|||||||
// rate limit in bytes per second. A zero value disables rate-limiting.
|
// rate limit in bytes per second. A zero value disables rate-limiting.
|
||||||
PerClientRateLimitBytesPerSec uint64 `json:",omitzero"`
|
PerClientRateLimitBytesPerSec uint64 `json:",omitzero"`
|
||||||
// PerClientRateBurstBytes represents the per-client token bucket depth,
|
// PerClientRateBurstBytes represents the per-client token bucket depth,
|
||||||
// or burst, in bytes. Any value lower than [derp.MaxPacketSize]
|
// or burst, in bytes. Any value lower than [minRateLimitTokenBucketSize]
|
||||||
// will be increased to [derp.MaxPacketSize] before application.
|
// will be increased to [minRateLimitTokenBucketSize] before application.
|
||||||
PerClientRateBurstBytes uint64 `json:",omitzero"`
|
PerClientRateBurstBytes uint64 `json:",omitzero"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -556,15 +565,13 @@ func (s *Server) LoadAndApplyRateConfig(path string) error {
|
|||||||
|
|
||||||
// UpdatePerClientRateLimit sets the per-client receive rate limit in bytes per
|
// UpdatePerClientRateLimit sets the per-client receive rate limit in bytes per
|
||||||
// second and the burst size in bytes, updating all existing client connections.
|
// second and the burst size in bytes, updating all existing client connections.
|
||||||
// The burst is at least [derp.MaxPacketSize], ensuring at least a full packet
|
// The applied burst will be at least [minRateLimitTokenBucketSize]. If bytesPerSec is
|
||||||
// can be received in a burst even if the rate limit is low. If bytesPerSec is
|
// 0, rate limiting is disabled. Mesh peers are always exempt from rate limiting.
|
||||||
// 0, rate limiting is set to infinity. Mesh peers are always exempt from rate
|
|
||||||
// limiting.
|
|
||||||
func (s *Server) UpdatePerClientRateLimit(bytesPerSec, burst uint64) {
|
func (s *Server) UpdatePerClientRateLimit(bytesPerSec, burst uint64) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
s.perClientRecvBytesPerSec = bytesPerSec
|
s.perClientRecvBytesPerSec = bytesPerSec
|
||||||
s.perClientRecvBurst = max(burst, derp.MaxPacketSize)
|
s.perClientRecvBurst = max(burst, minRateLimitTokenBucketSize)
|
||||||
for _, cs := range s.clients {
|
for _, cs := range s.clients {
|
||||||
cs.ForeachClient(func(c *sclient) {
|
cs.ForeachClient(func(c *sclient) {
|
||||||
c.setRateLimit(s.perClientRecvBytesPerSec, s.perClientRecvBurst)
|
c.setRateLimit(s.perClientRecvBytesPerSec, s.perClientRecvBurst)
|
||||||
@@ -1093,10 +1100,9 @@ func (c *sclient) run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
return fmt.Errorf("client %s: readFrameHeader: %w", c.key.ShortString(), err)
|
return fmt.Errorf("client %s: readFrameHeader: %w", c.key.ShortString(), err)
|
||||||
}
|
}
|
||||||
// Rate limit by DERP frame length (fl), which excludes DERP
|
// Rate limit by DERP frame length (fl), which excludes TLS protocol and
|
||||||
// and TLS protocol overheads.
|
// DERP frame length field overheads.
|
||||||
// Note: meshed clients are exempt from rate limits.
|
// Note: meshed clients are exempt from rate limits.
|
||||||
// meshed clients are exempt from rate limits
|
|
||||||
if err := c.rateLimit(int(fl)); err != nil {
|
if err := c.rateLimit(int(fl)); err != nil {
|
||||||
return err // context canceled, connection closing
|
return err // context canceled, connection closing
|
||||||
}
|
}
|
||||||
@@ -1339,7 +1345,17 @@ func (c *sclient) setRateLimit(bytesPerSec uint64, burst uint64) {
|
|||||||
// and this is a no-op.
|
// and this is a no-op.
|
||||||
func (c *sclient) rateLimit(n int) error {
|
func (c *sclient) rateLimit(n int) error {
|
||||||
if lim := c.recvLim.Load(); lim != nil {
|
if lim := c.recvLim.Load(); lim != nil {
|
||||||
return lim.WaitN(c.ctx, n)
|
// If n exceeds the capacity of the bucket, then WaitN will return
|
||||||
|
// an error and consume zero tokens. To prevent this, clamp n to
|
||||||
|
// [minRateLimitTokenBucketSize].
|
||||||
|
//
|
||||||
|
// While we could call WaitN multiple times and/or more precisely for
|
||||||
|
// lim.Burst(), it's better to return early as a larger DERP frame is:
|
||||||
|
// 1. unexpected
|
||||||
|
// 2. only partially read off the socket (bufio)
|
||||||
|
// 3. would cause the connection to close shortly after rate limiting, anyway.
|
||||||
|
clampedN := min(n, minRateLimitTokenBucketSize)
|
||||||
|
return lim.WaitN(c.ctx, clampedN)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -959,25 +959,21 @@ func BenchmarkHyperLogLogEstimate(b *testing.B) {
|
|||||||
func TestPerClientRateLimit(t *testing.T) {
|
func TestPerClientRateLimit(t *testing.T) {
|
||||||
t.Run("throttled", func(t *testing.T) {
|
t.Run("throttled", func(t *testing.T) {
|
||||||
synctest.Test(t, func(t *testing.T) {
|
synctest.Test(t, func(t *testing.T) {
|
||||||
// 100 bytes/sec with a burst of 100 bytes.
|
|
||||||
const bytesPerSec = 100
|
|
||||||
const burst = 100
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
c := &sclient{
|
c := &sclient{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
}
|
}
|
||||||
c.recvLim.Store(rate.NewLimiter(rate.Limit(bytesPerSec), burst))
|
c.recvLim.Store(rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize))
|
||||||
|
|
||||||
// First call within burst should not block.
|
// First call within burst should not block.
|
||||||
c.rateLimit(burst)
|
c.rateLimit(minRateLimitTokenBucketSize)
|
||||||
|
|
||||||
// Next call exceeds burst, should block until tokens replenish.
|
// Next call exceeds burst, should block until tokens replenish.
|
||||||
done := make(chan error, 1)
|
done := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
done <- c.rateLimit(burst)
|
done <- c.rateLimit(minRateLimitTokenBucketSize)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// After settling, the goroutine should be blocked (no result yet).
|
// After settling, the goroutine should be blocked (no result yet).
|
||||||
@@ -988,7 +984,7 @@ func TestPerClientRateLimit(t *testing.T) {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Advance time by 1 second; 100 bytes/sec * 1s = 100 bytes = burst.
|
// Advance time by 1 second
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
synctest.Wait()
|
synctest.Wait()
|
||||||
|
|
||||||
@@ -1010,16 +1006,16 @@ func TestPerClientRateLimit(t *testing.T) {
|
|||||||
c := &sclient{
|
c := &sclient{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
}
|
}
|
||||||
c.recvLim.Store(rate.NewLimiter(rate.Limit(100), 100))
|
c.recvLim.Store(rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize))
|
||||||
|
|
||||||
// Exhaust burst.
|
// Exhaust burst.
|
||||||
if err := c.rateLimit(100); err != nil {
|
if err := c.rateLimit(minRateLimitTokenBucketSize); err != nil {
|
||||||
t.Fatalf("rateLimit: %v", err)
|
t.Fatalf("rateLimit: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
done := make(chan error, 1)
|
done := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
done <- c.rateLimit(100)
|
done <- c.rateLimit(minRateLimitTokenBucketSize)
|
||||||
}()
|
}()
|
||||||
synctest.Wait()
|
synctest.Wait()
|
||||||
|
|
||||||
@@ -1310,8 +1306,8 @@ func TestLoadAndApplyRateConfig(t *testing.T) {
|
|||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
gotBurst := s.perClientRecvBurst
|
gotBurst := s.perClientRecvBurst
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
if gotBurst != derp.MaxPacketSize {
|
if gotBurst != minRateLimitTokenBucketSize {
|
||||||
t.Errorf("burst = %d; want at least %d", gotBurst, derp.MaxPacketSize)
|
t.Errorf("burst = %d; want at least %d", gotBurst, minRateLimitTokenBucketSize)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user