logtail: optimize JSON processing (#11671)

Changes made:

* Avoid "encoding/json" for JSON processing, and instead use
"github.com/go-json-experiment/json/jsontext".
Use jsontext.Value.IsValid for validation, which is much faster.
Use jsontext.AppendQuote instead of our own JSON escaping.

* In drainPending, use a different maxLen depending on lowMem.
In lowMem mode, it is better to perform multiple uploads
than it is to construct a large body that OOMs the process.

* In drainPending, if an error is encountered draining,
construct an error message in the logtail JSON format
rather than something that is invalid JSON.

* In appendTextOrJSONLocked, use jsontext.Decoder to check
whether the input is a valid JSON object. This is faster than
the previous approach of unmarshaling into map[string]any and
then re-marshaling that data structure.
This is especially beneficial for network flow logging,
which produces relatively large JSON objects.

* In appendTextOrJSONLocked, enforce maxSize on the input.
If too large, then we may end up in a situation where the logs
can never be uploaded because it exceeds the maximum body size
that the Tailscale logs service accepts.

* Use "tailscale.com/util/truncate" to properly truncate a string
on valid UTF-8 boundaries.

* In general, remove unnecessary spaces in JSON output.

Performance:

    name       old time/op    new time/op    delta
    WriteText     776ns ± 2%     596ns ± 1%   -23.24%  (p=0.000 n=10+10)
    WriteJSON     110µs ± 0%       9µs ± 0%   -91.77%  (p=0.000 n=8+8)

    name       old alloc/op   new alloc/op   delta
    WriteText      448B ± 0%        0B       -100.00%  (p=0.000 n=10+10)
    WriteJSON    37.9kB ± 0%     0.0kB ± 0%   -99.87%  (p=0.000 n=10+10)

    name       old allocs/op  new allocs/op  delta
    WriteText      1.00 ± 0%      0.00       -100.00%  (p=0.000 n=10+10)
    WriteJSON     1.08k ± 0%     0.00k ± 0%   -99.91%  (p=0.000 n=10+10)

For text payloads, this is 1.30x faster.
For JSON payloads, this is 12.2x faster.

Updates #cleanup
Updates tailscale/corp#18514

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
This commit is contained in:
Joe Tsai
2024-04-12 12:05:36 -07:00
committed by GitHub
parent 4d5d669cd5
commit 7a77a2edf1
3 changed files with 431 additions and 281 deletions
+256 -180
View File
@@ -9,7 +9,6 @@ import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
@@ -19,11 +18,13 @@ import (
"os"
"regexp"
"runtime"
"slices"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/go-json-experiment/json/jsontext"
"tailscale.com/envknob"
"tailscale.com/net/netmon"
"tailscale.com/net/sockstats"
@@ -32,9 +33,26 @@ import (
tslogger "tailscale.com/types/logger"
"tailscale.com/types/logid"
"tailscale.com/util/set"
"tailscale.com/util/truncate"
"tailscale.com/util/zstdframe"
)
// maxSize is the maximum size that a single log entry can be.
// It is also the maximum body size that may be uploaded at a time.
const maxSize = 256 << 10
// maxTextSize is the maximum size for a text log message.
// Note that JSON log messages can be as large as maxSize.
const maxTextSize = 16 << 10
// lowMemRatio reduces maxSize and maxTextSize by this ratio in lowMem mode.
const lowMemRatio = 4
// bufferSize is the typical buffer size to retain.
// It is large enough to handle most log messages,
// but not too large to be a notable waste of memory if retained forever.
const bufferSize = 4 << 10
// DefaultHost is the default host name to upload logs to when
// Config.BaseURL isn't provided.
const DefaultHost = "log.tailscale.io"
@@ -175,7 +193,7 @@ type Logger struct {
netMonitor *netmon.Monitor
buffer Buffer
drainWake chan struct{} // signal to speed up drain
drainBuf bytes.Buffer // owned by drainPending for reuse
drainBuf []byte // owned by drainPending for reuse
flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
flushPending atomic.Bool
sentinel chan int32
@@ -194,6 +212,8 @@ type Logger struct {
writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
procSequence uint64
flushTimer tstime.TimerController // used when flushDelay is >0
writeBuf [bufferSize]byte // owned by Write for reuse
jsonDec jsontext.Decoder // owned by appendTextOrJSONLocked for reuse
shutdownStartMu sync.Mutex // guards the closing of shutdownStart
shutdownStart chan struct{} // closed when shutdown begins
@@ -290,42 +310,66 @@ func (l *Logger) drainBlock() (shuttingDown bool) {
// drainPending drains and encodes a batch of logs from the buffer for upload.
// If no logs are available, drainPending blocks until logs are available.
func (l *Logger) drainPending() (res []byte) {
buf := &l.drainBuf
buf.Reset()
buf.WriteByte('[')
entries := 0
// The returned buffer is only valid until the next call to drainPending.
func (l *Logger) drainPending() (b []byte) {
b = l.drainBuf[:0]
b = append(b, '[')
defer func() {
b = bytes.TrimRight(b, ",")
b = append(b, ']')
l.drainBuf = b
if len(b) <= len("[]") {
b = nil
}
}()
var batchDone bool
const maxLen = 256 << 10
for buf.Len() < maxLen && !batchDone {
b, err := l.buffer.TryReadLine()
if err == io.EOF {
break
} else if err != nil {
b = fmt.Appendf(nil, "reading ringbuffer: %v", err)
batchDone = true
} else if b == nil {
if entries > 0 {
break
maxLen := maxSize
if l.lowMem {
// When operating in a low memory environment, it is better to upload
// in multiple operations than it is to allocate a large body and OOM.
// Even if maxLen is less than maxSize, we can still upload an entry
// that is up to maxSize if we happen to encounter one.
maxLen /= lowMemRatio
}
for len(b) < maxLen {
line, err := l.buffer.TryReadLine()
switch {
case err == io.EOF:
return b
case err != nil:
b = append(b, '{')
b = l.appendMetadata(b, false, true, 0, 0, "reading ringbuffer: "+err.Error(), nil, 0)
b = bytes.TrimRight(b, ",")
b = append(b, '}')
return b
case line == nil:
// If we read at least some log entries, return immediately.
if len(b) > len("[") {
return b
}
// We're about to block. If we're holding on to too much memory
// in our buffer from a previous large write, let it go.
if buf.Available() > 4<<10 {
cur := buf.Bytes()
l.drainBuf = bytes.Buffer{}
buf.Write(cur)
if cap(b) > bufferSize {
b = bytes.Clone(b)
l.drainBuf = b
}
batchDone = l.drainBlock()
if shuttingDown := l.drainBlock(); shuttingDown {
return b
}
continue
}
if len(b) == 0 {
switch {
case len(line) == 0:
continue
}
if b[0] != '{' || !json.Valid(b) {
case line[0] == '{' && jsontext.Value(line).IsValid():
// This is already a valid JSON object, so just append it.
// This may exceed maxLen, but should be no larger than maxSize
// so long as logic writing into the buffer enforces the limit.
b = append(b, line...)
default:
// This is probably a log added to stderr by filch
// outside of the logtail logger. Encode it.
if !l.explainedRaw {
@@ -336,24 +380,14 @@ func (l *Logger) drainPending() (res []byte) {
l.explainedRaw = true
}
fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
// Do not add a client time, as it could have been
// been written a long time ago. Don't include instance key or ID
// either, since this came from a different instance.
b = l.encodeText(b, true, 0, 0, 0)
// Do not add a client time, as it could be really old.
// Do not include instance key or ID either,
// since this came from a different instance.
b = l.appendText(b, line, true, 0, 0, 0)
}
if entries > 0 {
buf.WriteByte(',')
}
buf.Write(b)
entries++
b = append(b, ',')
}
buf.WriteByte(']')
if buf.Len() <= len("[]") {
return nil
}
return buf.Bytes()
return b
}
// This is the goroutine that repeatedly uploads logs in the background.
@@ -573,169 +607,211 @@ func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
return n, err
}
// TODO: instead of allocating, this should probably just append
// directly into the output log buffer.
func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
now := l.clock.Now()
// Factor in JSON encoding overhead to try to only do one alloc
// in the make below (so appends don't resize the buffer).
overhead := len(`{"text": ""}\n`)
includeLogtail := !skipClientTime || procID != 0 || procSequence != 0
if includeLogtail {
overhead += len(`"logtail": {},`)
}
if !skipClientTime {
overhead += len(`"client_time": "2006-01-02T15:04:05.999999999Z07:00",`)
}
if procID != 0 {
overhead += len(`"proc_id": 4294967296,`)
}
if procSequence != 0 {
overhead += len(`"proc_seq": 9007199254740992,`)
}
// TODO: do a pass over buf and count how many backslashes will be needed?
// For now just factor in a dozen.
overhead += 12
// Put a sanity cap on buf's size.
max := 16 << 10
if l.lowMem {
max = 4 << 10
}
var nTruncated int
if len(buf) > max {
nTruncated = len(buf) - max
// TODO: this can break a UTF-8 character
// mid-encoding. We don't tend to log
// non-ASCII stuff ourselves, but e.g. client
// names might be.
buf = buf[:max]
}
b := make([]byte, 0, len(buf)+overhead)
b = append(b, '{')
if includeLogtail {
b = append(b, `"logtail": {`...)
// appendMetadata appends optional "logtail", "metrics", and "v" JSON members.
// This assumes dst is already within a JSON object.
// Each member is comma-terminated.
func (l *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, procID uint32, procSequence uint64, errDetail string, errData jsontext.Value, level int) []byte {
// Append optional logtail metadata.
if !skipClientTime || procID != 0 || procSequence != 0 || errDetail != "" || errData != nil {
dst = append(dst, `"logtail":{`...)
if !skipClientTime {
b = append(b, `"client_time": "`...)
b = now.UTC().AppendFormat(b, time.RFC3339Nano)
b = append(b, `",`...)
dst = append(dst, `"client_time":"`...)
dst = l.clock.Now().UTC().AppendFormat(dst, time.RFC3339Nano)
dst = append(dst, '"', ',')
}
if procID != 0 {
b = append(b, `"proc_id": `...)
b = strconv.AppendUint(b, uint64(procID), 10)
b = append(b, ',')
dst = append(dst, `"proc_id":`...)
dst = strconv.AppendUint(dst, uint64(procID), 10)
dst = append(dst, ',')
}
if procSequence != 0 {
b = append(b, `"proc_seq": `...)
b = strconv.AppendUint(b, procSequence, 10)
b = append(b, ',')
dst = append(dst, `"proc_seq":`...)
dst = strconv.AppendUint(dst, procSequence, 10)
dst = append(dst, ',')
}
b = bytes.TrimRight(b, ",")
b = append(b, "}, "...)
if errDetail != "" || errData != nil {
dst = append(dst, `"error":{`...)
if errDetail != "" {
dst = append(dst, `"detail":`...)
dst, _ = jsontext.AppendQuote(dst, errDetail)
dst = append(dst, ',')
}
if errData != nil {
dst = append(dst, `"bad_data":`...)
dst = append(dst, errData...)
dst = append(dst, ',')
}
dst = bytes.TrimRight(dst, ",")
dst = append(dst, '}', ',')
}
dst = bytes.TrimRight(dst, ",")
dst = append(dst, '}', ',')
}
if l.metricsDelta != nil {
// Append optional metrics metadata.
if !skipMetrics && l.metricsDelta != nil {
if d := l.metricsDelta(); d != "" {
b = append(b, `"metrics": "`...)
b = append(b, d...)
b = append(b, `",`...)
dst = append(dst, `"metrics":"`...)
dst = append(dst, d...)
dst = append(dst, '"', ',')
}
}
// Add the log level, if non-zero. Note that we only use log
// levels 1 and 2 currently. It's unlikely we'll ever make it
// past 9.
// Add the optional log level, if non-zero.
// Note that we only use log levels 1 and 2 currently.
// It's unlikely we'll ever make it past 9.
if level > 0 && level < 10 {
b = append(b, `"v":`...)
b = append(b, '0'+byte(level))
b = append(b, ',')
dst = append(dst, `"v":`...)
dst = append(dst, '0'+byte(level))
dst = append(dst, ',')
}
b = append(b, "\"text\": \""...)
for _, c := range buf {
switch c {
case '\b':
b = append(b, '\\', 'b')
case '\f':
b = append(b, '\\', 'f')
case '\n':
b = append(b, '\\', 'n')
case '\r':
b = append(b, '\\', 'r')
case '\t':
b = append(b, '\\', 't')
case '"':
b = append(b, '\\', '"')
case '\\':
b = append(b, '\\', '\\')
default:
// TODO: what about binary gibberish or non UTF-8?
b = append(b, c)
}
}
if nTruncated > 0 {
b = append(b, "…+"...)
b = strconv.AppendInt(b, int64(nTruncated), 10)
}
b = append(b, "\"}\n"...)
return b
return dst
}
func (l *Logger) encodeLocked(buf []byte, level int) []byte {
// appendText appends a raw text message in the Tailscale JSON log entry format.
func (l *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
dst = slices.Grow(dst, len(src))
dst = append(dst, '{')
dst = l.appendMetadata(dst, skipClientTime, false, procID, procSequence, "", nil, level)
if len(src) == 0 {
dst = bytes.TrimRight(dst, ",")
return append(dst, "}\n"...)
}
// Append the text string, which may be truncated.
// Invalid UTF-8 will be mangled with the Unicode replacement character.
max := maxTextSize
if l.lowMem {
max /= lowMemRatio
}
dst = append(dst, `"text":`...)
dst = appendTruncatedString(dst, src, max)
return append(dst, "}\n"...)
}
// appendTruncatedString appends a JSON string for src,
// truncating the src to be no larger than n.
func appendTruncatedString(dst, src []byte, n int) []byte {
srcLen := len(src)
src = truncate.String(src, n)
dst, _ = jsontext.AppendQuote(dst, src) // ignore error; only occurs for invalid UTF-8
if srcLen > len(src) {
dst = dst[:len(dst)-len(`"`)] // trim off preceding double-quote
dst = append(dst, "…+"...)
dst = strconv.AppendInt(dst, int64(srcLen-len(src)), 10)
dst = append(dst, '"') // re-append succeeding double-quote
}
return dst
}
func (l *Logger) AppendTextOrJSONLocked(dst, src []byte) []byte {
l.clock = tstime.StdClock{}
return l.appendTextOrJSONLocked(dst, src, 0)
}
// appendTextOrJSONLocked appends a raw text message or a raw JSON object
// in the Tailscale JSON log format.
func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
if l.includeProcSequence {
l.procSequence++
}
if buf[0] != '{' {
return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path
if len(src) == 0 || src[0] != '{' {
return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
}
now := l.clock.Now()
obj := make(map[string]any)
if err := json.Unmarshal(buf, &obj); err != nil {
for k := range obj {
delete(obj, k)
// Check whether the input is a valid JSON object and
// whether it contains the reserved "logtail" name at the top-level.
var logtailKeyOffset, logtailValOffset, logtailValLength int
validJSON := func() bool {
// TODO(dsnet): Avoid allocation of bytes.Buffer struct.
dec := &l.jsonDec
dec.Reset(bytes.NewBuffer(src))
if tok, err := dec.ReadToken(); tok.Kind() != '{' || err != nil {
return false
}
obj["text"] = string(buf)
}
if txt, isStr := obj["text"].(string); l.lowMem && isStr && len(txt) > 254 {
// TODO(crawshaw): trim to unicode code point
obj["text"] = txt[:254] + "…"
for dec.PeekKind() != '}' {
keyOffset := dec.InputOffset()
tok, err := dec.ReadToken()
if err != nil {
return false
}
isLogtail := tok.String() == "logtail"
valOffset := dec.InputOffset()
if dec.SkipValue() != nil {
return false
}
if isLogtail {
logtailKeyOffset = int(keyOffset)
logtailValOffset = int(valOffset)
logtailValLength = int(dec.InputOffset()) - logtailValOffset
}
}
if tok, err := dec.ReadToken(); tok.Kind() != '}' || err != nil {
return false
}
if _, err := dec.ReadToken(); err != io.EOF {
return false // trailing junk after JSON object
}
return true
}()
// Treat invalid JSON as a raw text message.
if !validJSON {
return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
}
hasLogtail := obj["logtail"] != nil
if hasLogtail {
obj["error_has_logtail"] = obj["logtail"]
obj["logtail"] = nil
}
if !l.skipClientTime || l.procID != 0 || l.procSequence != 0 {
logtail := map[string]any{}
if !l.skipClientTime {
logtail["client_time"] = now.UTC().Format(time.RFC3339Nano)
}
if l.procID != 0 {
logtail["proc_id"] = l.procID
}
if l.procSequence != 0 {
logtail["proc_seq"] = l.procSequence
}
obj["logtail"] = logtail
}
if level > 0 {
obj["v"] = level
// Check whether the JSON payload is too large.
// Due to logtail metadata, the formatted log entry could exceed maxSize.
// That's okay as the Tailscale log service limit is actually 2*maxSize.
// However, so long as logging applications aim to target the maxSize limit,
// there should be no trouble eventually uploading logs.
if len(src) > maxSize {
errDetail := fmt.Sprintf("entry too large: %d bytes", len(src))
errData := appendTruncatedString(nil, src, maxSize/len(`\uffff`)) // escaping could increase size
dst = append(dst, '{')
dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
dst = bytes.TrimRight(dst, ",")
return append(dst, "}\n"...)
}
b, err := json.Marshal(obj)
if err != nil {
fmt.Fprintf(l.stderr, "logtail: re-encoding JSON failed: %v\n", err)
// I know of no conditions under which this could fail.
// Report it very loudly.
panic("logtail: re-encoding JSON failed: " + err.Error())
// Check whether the reserved logtail member occurs in the log data.
// If so, it is moved to the the logtail/error member.
const jsonSeperators = ",:" // per RFC 8259, section 2
const jsonWhitespace = " \n\r\t" // per RFC 8259, section 2
var errDetail string
var errData jsontext.Value
if logtailValLength > 0 {
errDetail = "duplicate logtail member"
errData = bytes.Trim(src[logtailValOffset:][:logtailValLength], jsonSeperators+jsonWhitespace)
}
b = append(b, '\n')
return b
dst = slices.Grow(dst, len(src))
dst = append(dst, '{')
dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
if logtailValLength > 0 {
// Exclude original logtail member from the message.
dst = appendWithoutNewline(dst, src[len("{"):logtailKeyOffset])
dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
dst = appendWithoutNewline(dst, src[logtailValOffset+logtailValLength:])
} else {
dst = appendWithoutNewline(dst, src[len("{"):])
}
dst = bytes.TrimRight(dst, jsonWhitespace)
dst = dst[:len(dst)-len("}")]
dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
return append(dst, "}\n"...)
}
// appendWithoutNewline appends src to dst except that it ignores newlines
// since newlines are used to frame individual log entries.
func appendWithoutNewline(dst, src []byte) []byte {
for _, c := range src {
if c != '\n' {
dst = append(dst, c)
}
}
return dst
}
// Logf logs to l using the provided fmt-style format and optional arguments.
@@ -776,7 +852,7 @@ func (l *Logger) Write(buf []byte) (int, error) {
l.writeLock.Lock()
defer l.writeLock.Unlock()
b := l.encodeLocked(buf, level)
b := l.appendTextOrJSONLocked(l.writeBuf[:0], buf, level)
_, err := l.sendLocked(b)
return inLen, err
}