cmd/stunstamp: implement service to measure DERP STUN RTT (#12241)
stunstamp timestamping includes userspace and SO_TIMESTAMPING kernel timestamping where available. Measurements are written locally to a sqlite DB, exposed over an HTTP API, and written to Prometheus via the remote-write protocol.

Updates tailscale/corp#20344

Signed-off-by: Jordan Whited <jordan@tailscale.com>
parent
1fad06429e
commit
d21c00205d
@ -0,0 +1,141 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package main |
||||
|
||||
import ( |
||||
"compress/gzip" |
||||
"encoding/json" |
||||
"errors" |
||||
"net/http" |
||||
"net/url" |
||||
"strconv" |
||||
"strings" |
||||
"time" |
||||
|
||||
sq "github.com/Masterminds/squirrel" |
||||
) |
||||
|
||||
// api serves the HTTP query API over the sqlite measurement database.
type api struct {
	db  *db            // measurement database queried by /query
	mux *http.ServeMux // routes; registered in newAPI
}
||||
|
||||
func newAPI(db *db) *api { |
||||
a := &api{ |
||||
db: db, |
||||
} |
||||
mux := http.NewServeMux() |
||||
mux.HandleFunc("/query", a.query) |
||||
a.mux = mux |
||||
return a |
||||
} |
||||
|
||||
// apiResult is the JSON representation of a single RTT measurement row
// returned by the /query endpoint.
type apiResult struct {
	At         int    `json:"at"`         // time.Time.Unix()
	RegionID   int    `json:"regionID"`   // DERP region ID
	Hostname   string `json:"hostname"`   // DERP node hostname
	Af         int    `json:"af"`         // 4 or 6
	Addr       string `json:"addr"`       // probed IP address as a string
	Source     int    `json:"source"`     // timestampSourceUserspace (0) or timestampSourceKernel (1)
	StableConn bool   `json:"stableConn"` // true if measured over a recycled (stable 5-tuple) socket
	RttNS      *int   `json:"rttNS"`      // round-trip time in nanoseconds; nil signifies failure, e.g. timeout
}
||||
|
||||
func getTimeBounds(vals url.Values) (from time.Time, to time.Time, err error) { |
||||
lastForm, ok := vals["last"] |
||||
if ok && len(lastForm) > 0 { |
||||
dur, err := time.ParseDuration(lastForm[0]) |
||||
if err != nil { |
||||
return time.Time{}, time.Time{}, err |
||||
} |
||||
now := time.Now() |
||||
return now.Add(-dur), now, nil |
||||
} |
||||
|
||||
fromForm, ok := vals["from"] |
||||
if ok && len(fromForm) > 0 { |
||||
fromUnixSec, err := strconv.Atoi(fromForm[0]) |
||||
if err != nil { |
||||
return time.Time{}, time.Time{}, err |
||||
} |
||||
from = time.Unix(int64(fromUnixSec), 0) |
||||
toForm, ok := vals["to"] |
||||
if ok && len(toForm) > 0 { |
||||
toUnixSec, err := strconv.Atoi(toForm[0]) |
||||
if err != nil { |
||||
return time.Time{}, time.Time{}, err |
||||
} |
||||
to = time.Unix(int64(toUnixSec), 0) |
||||
} else { |
||||
return time.Time{}, time.Time{}, errors.New("from specified without to") |
||||
} |
||||
return from, to, nil |
||||
} |
||||
|
||||
// no time bounds specified, default to last 1h
|
||||
now := time.Now() |
||||
return now.Add(-time.Hour), now, nil |
||||
} |
||||
|
||||
// ServeHTTP implements http.Handler by delegating to the API's mux.
func (a *api) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	a.mux.ServeHTTP(w, r)
}
||||
|
||||
func (a *api) query(w http.ResponseWriter, r *http.Request) { |
||||
err := r.ParseForm() |
||||
if err != nil { |
||||
http.Error(w, err.Error(), 500) |
||||
return |
||||
} |
||||
from, to, err := getTimeBounds(r.Form) |
||||
if err != nil { |
||||
http.Error(w, err.Error(), 500) |
||||
return |
||||
} |
||||
|
||||
sb := sq.Select("at_unix", "region_id", "hostname", "af", "address", "timestamp_source", "stable_conn", "rtt_ns").From("rtt") |
||||
sb = sb.Where(sq.And{ |
||||
sq.GtOrEq{"at_unix": from.Unix()}, |
||||
sq.LtOrEq{"at_unix": to.Unix()}, |
||||
}) |
||||
query, args, err := sb.ToSql() |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
rows, err := a.db.Query(query, args...) |
||||
if err != nil { |
||||
http.Error(w, err.Error(), 500) |
||||
return |
||||
} |
||||
results := make([]apiResult, 0) |
||||
for rows.Next() { |
||||
rtt := 0 |
||||
result := apiResult{ |
||||
RttNS: &rtt, |
||||
} |
||||
err = rows.Scan(&result.At, &result.RegionID, &result.Hostname, &result.Af, &result.Addr, &result.Source, &result.StableConn, &result.RttNS) |
||||
if err != nil { |
||||
http.Error(w, err.Error(), 500) |
||||
return |
||||
} |
||||
results = append(results, result) |
||||
} |
||||
if rows.Err() != nil { |
||||
http.Error(w, rows.Err().Error(), 500) |
||||
return |
||||
} |
||||
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { |
||||
gz := gzip.NewWriter(w) |
||||
defer gz.Close() |
||||
w.Header().Set("Content-Encoding", "gzip") |
||||
err = json.NewEncoder(gz).Encode(&results) |
||||
} else { |
||||
err = json.NewEncoder(w).Encode(&results) |
||||
} |
||||
if err != nil { |
||||
http.Error(w, err.Error(), 500) |
||||
return |
||||
} |
||||
} |
||||
@ -0,0 +1,782 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
// The stunstamp binary measures STUN round-trip latency with DERPs.
|
||||
package main |
||||
|
||||
import ( |
||||
"bytes" |
||||
"cmp" |
||||
"context" |
||||
"encoding/json" |
||||
"errors" |
||||
"flag" |
||||
"fmt" |
||||
"io" |
||||
"log" |
||||
"math" |
||||
"math/rand" |
||||
"net" |
||||
"net/http" |
||||
"net/netip" |
||||
"net/url" |
||||
"os" |
||||
"os/signal" |
||||
"slices" |
||||
"sync" |
||||
"syscall" |
||||
"time" |
||||
|
||||
"github.com/golang/snappy" |
||||
"github.com/prometheus/prometheus/prompb" |
||||
"tailscale.com/logtail/backoff" |
||||
"tailscale.com/net/stun" |
||||
"tailscale.com/tailcfg" |
||||
) |
||||
|
||||
// Command-line flags controlling probing, storage, and export behavior.
var (
	flagDERPMap        = flag.String("derp-map", "https://login.tailscale.com/derpmap/default", "URL to DERP map")
	flagOut            = flag.String("out", "", "output sqlite filename")
	flagInterval       = flag.Duration("interval", time.Minute, "interval to probe at in time.ParseDuration() format")
	flagAPI            = flag.String("api", "", "listen addr for HTTP API")
	flagIPv6           = flag.Bool("ipv6", false, "probe IPv6 addresses")
	flagRetention      = flag.Duration("retention", time.Hour*24*7, "sqlite retention period in time.ParseDuration() format")
	flagRemoteWriteURL = flag.String("rw-url", "", "prometheus remote write URL")
	flagInstance       = flag.String("instance", "", "instance label value; defaults to hostname if unspecified")
)

const (
	// minInterval is the smallest accepted probing interval.
	minInterval = time.Second
	// maxBufferDuration caps the probing interval and sizes the
	// remote-write buffer channel (maxBufferDuration / interval entries).
	maxBufferDuration = time.Hour
)
||||
|
||||
func getDERPMap(ctx context.Context, url string) (*tailcfg.DERPMap, error) { |
||||
req, err := http.NewRequestWithContext(ctx, "GET", url, nil) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
resp, err := http.DefaultClient.Do(req) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer resp.Body.Close() |
||||
dm := tailcfg.DERPMap{} |
||||
err = json.NewDecoder(resp.Body).Decode(&dm) |
||||
if err != nil { |
||||
return nil, nil |
||||
} |
||||
return &dm, nil |
||||
} |
||||
|
||||
// timestampSource identifies where a packet timestamp was taken: in
// userspace around the socket calls, or by the kernel via SO_TIMESTAMPING.
type timestampSource int

const (
	timestampSourceUserspace timestampSource = iota
	timestampSourceKernel
)

// String returns a human-readable name for the timestamp source.
func (t timestampSource) String() string {
	if t == timestampSourceUserspace {
		return "userspace"
	}
	if t == timestampSourceKernel {
		return "kernel"
	}
	return "unknown"
}
||||
|
||||
// result is a single STUN RTT measurement against one DERP node address.
type result struct {
	at              time.Time       // time the probing run started
	meta            nodeMeta        // identity of the probed DERP node/address
	timestampSource timestampSource // userspace or kernel timestamping
	connStability   connStability   // whether a recycled (stable 5-tuple) socket was used
	rtt             *time.Duration  // nil signifies failure, e.g. timeout
}
||||
|
||||
func measureRTT(conn io.ReadWriteCloser, dst *net.UDPAddr, req []byte) (resp []byte, rtt time.Duration, err error) { |
||||
uconn, ok := conn.(*net.UDPConn) |
||||
if !ok { |
||||
return nil, 0, fmt.Errorf("unexpected conn type: %T", conn) |
||||
} |
||||
err = uconn.SetReadDeadline(time.Now().Add(time.Second * 2)) |
||||
if err != nil { |
||||
return nil, 0, fmt.Errorf("error setting read deadline: %w", err) |
||||
} |
||||
txAt := time.Now() |
||||
_, err = uconn.WriteToUDP(req, dst) |
||||
if err != nil { |
||||
return nil, 0, fmt.Errorf("error writing to udp socket: %w", err) |
||||
} |
||||
b := make([]byte, 1460) |
||||
n, err := uconn.Read(b) |
||||
rxAt := time.Now() |
||||
if err != nil { |
||||
return nil, 0, fmt.Errorf("error reading from udp socket: %w", err) |
||||
} |
||||
return b[:n], rxAt.Sub(txAt), nil |
||||
} |
||||
|
||||
func isTemporaryOrTimeoutErr(err error) bool { |
||||
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, context.DeadlineExceeded) { |
||||
return true |
||||
} |
||||
if err, ok := err.(interface{ Temporary() bool }); ok { |
||||
return err.Temporary() |
||||
} |
||||
return false |
||||
} |
||||
|
||||
// nodeMeta identifies a single DERP node address to probe. addr is the map
// key identity: a dual-stack node produces two nodeMeta values.
type nodeMeta struct {
	regionID   int        // DERP region ID
	regionCode string     // DERP region code from the DERP map
	hostname   string     // node hostname from the DERP map
	addr       netip.Addr // IPv4 or IPv6 address being probed
}
||||
|
||||
type measureFn func(conn io.ReadWriteCloser, dst *net.UDPAddr, req []byte) (resp []byte, rtt time.Duration, err error) |
||||
|
||||
func probe(meta nodeMeta, conn io.ReadWriteCloser, fn measureFn) (*time.Duration, error) { |
||||
ua := &net.UDPAddr{ |
||||
IP: net.IP(meta.addr.AsSlice()), |
||||
Port: 3478, |
||||
} |
||||
|
||||
var ( |
||||
resp []byte |
||||
rtt time.Duration |
||||
) |
||||
txID := stun.NewTxID() |
||||
req := stun.Request(txID) |
||||
time.Sleep(time.Millisecond * time.Duration(rand.Intn(200))) // jitter across tx
|
||||
resp, rtt, err := fn(conn, ua, req) |
||||
if err != nil { |
||||
if isTemporaryOrTimeoutErr(err) { |
||||
log.Printf("temp error measuring RTT to %s(%s): %v", meta.hostname, meta.addr, err) |
||||
return nil, nil |
||||
} |
||||
} |
||||
_, _, err = stun.ParseResponse(resp) |
||||
if err != nil { |
||||
log.Printf("invalid stun response from %s: %v", meta.hostname, err) |
||||
return nil, nil |
||||
} |
||||
return &rtt, nil |
||||
} |
||||
|
||||
func nodeMetaFromDERPMap(dm *tailcfg.DERPMap, nodeMetaByAddr map[netip.Addr]nodeMeta, ipv6 bool) (stale []nodeMeta, err error) { |
||||
// Parse the new derp map before making any state changes in nodeMetaByAddr.
|
||||
// If parse fails we just stick with the old state.
|
||||
updated := make(map[netip.Addr]nodeMeta) |
||||
for regionID, region := range dm.Regions { |
||||
for _, node := range region.Nodes { |
||||
v4, err := netip.ParseAddr(node.IPv4) |
||||
if err != nil || !v4.Is4() { |
||||
return nil, fmt.Errorf("invalid ipv4 addr for node in derp map: %v", node.Name) |
||||
} |
||||
metas := make([]nodeMeta, 0, 2) |
||||
metas = append(metas, nodeMeta{ |
||||
regionID: regionID, |
||||
regionCode: region.RegionCode, |
||||
hostname: node.HostName, |
||||
addr: v4, |
||||
}) |
||||
if ipv6 { |
||||
v6, err := netip.ParseAddr(node.IPv6) |
||||
if err != nil || !v6.Is6() { |
||||
return nil, fmt.Errorf("invalid ipv6 addr for node in derp map: %v", node.Name) |
||||
} |
||||
metas = append(metas, metas[0]) |
||||
metas[1].addr = v6 |
||||
} |
||||
for _, meta := range metas { |
||||
updated[meta.addr] = meta |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Find nodeMeta that have changed
|
||||
for addr, updatedMeta := range updated { |
||||
previousMeta, ok := nodeMetaByAddr[addr] |
||||
if ok { |
||||
if previousMeta == updatedMeta { |
||||
continue |
||||
} |
||||
stale = append(stale, previousMeta) |
||||
nodeMetaByAddr[addr] = updatedMeta |
||||
} else { |
||||
nodeMetaByAddr[addr] = updatedMeta |
||||
} |
||||
} |
||||
|
||||
// Find nodeMeta that no longer exist
|
||||
for addr, potentialStale := range nodeMetaByAddr { |
||||
_, ok := updated[addr] |
||||
if !ok { |
||||
stale = append(stale, potentialStale) |
||||
} |
||||
} |
||||
|
||||
return stale, nil |
||||
} |
||||
|
||||
func getStableConns(stableConns map[netip.Addr][2]io.ReadWriteCloser, addr netip.Addr) ([2]io.ReadWriteCloser, error) { |
||||
conns, ok := stableConns[addr] |
||||
if ok { |
||||
return conns, nil |
||||
} |
||||
if supportsKernelTS() { |
||||
kconn, err := getConnKernelTimestamp() |
||||
if err != nil { |
||||
return conns, err |
||||
} |
||||
conns[timestampSourceKernel] = kconn |
||||
} |
||||
uconn, err := net.ListenUDP("udp", &net.UDPAddr{}) |
||||
if err != nil { |
||||
return conns, err |
||||
} |
||||
conns[timestampSourceUserspace] = uconn |
||||
stableConns[addr] = conns |
||||
return conns, nil |
||||
} |
||||
|
||||
// probeNodes measures the round-trip time for STUN binding requests against the
// DERP nodes described by nodeMetaByAddr while using/updating stableConns for
// UDP sockets that should be recycled across runs. It returns the results or
// an error if one occurs.
func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Addr][2]io.ReadWriteCloser) ([]result, error) {
	wg := sync.WaitGroup{}
	results := make([]result, 0)
	resultsCh := make(chan result)
	errCh := make(chan error)
	// doneCh is closed on the first error so in-flight probes abandon their
	// sends instead of blocking forever on the unbuffered channels.
	doneCh := make(chan struct{})
	numProbes := 0
	at := time.Now()
	addrsToProbe := make(map[netip.Addr]bool)

	// doProbe runs one measurement. A nil conn means "create a fresh socket"
	// (unstable 5-tuple, closed when done); a non-nil conn is a recycled
	// stable socket owned by stableConns.
	doProbe := func(conn io.ReadWriteCloser, meta nodeMeta, source timestampSource) {
		defer wg.Done()
		r := result{}
		if conn == nil {
			var err error
			if source == timestampSourceKernel {
				conn, err = getConnKernelTimestamp()
			} else {
				conn, err = net.ListenUDP("udp", &net.UDPAddr{})
			}
			if err != nil {
				select {
				case <-doneCh:
					return
				case errCh <- err:
					return
				}
			}
			defer conn.Close()
		} else {
			r.connStability = stableConn
		}
		fn := measureRTT
		if source == timestampSourceKernel {
			fn = measureRTTKernel
		}
		rtt, err := probe(meta, conn, fn)
		if err != nil {
			select {
			case <-doneCh:
				return
			case errCh <- err:
				return
			}
		}
		r.at = at
		r.meta = meta
		r.timestampSource = source
		r.rtt = rtt
		select {
		case <-doneCh:
		case resultsCh <- r:
		}
	}

	// Launch a stable-socket and a fresh-socket probe per address, for each
	// supported timestamping source.
	for _, meta := range nodeMetaByAddr {
		addrsToProbe[meta.addr] = true
		stable, err := getStableConns(stableConns, meta.addr)
		if err != nil {
			close(doneCh)
			wg.Wait()
			return nil, err
		}

		wg.Add(2)
		numProbes += 2
		go doProbe(stable[timestampSourceUserspace], meta, timestampSourceUserspace)
		go doProbe(nil, meta, timestampSourceUserspace)
		if supportsKernelTS() {
			wg.Add(2)
			numProbes += 2
			go doProbe(stable[timestampSourceKernel], meta, timestampSourceKernel)
			go doProbe(nil, meta, timestampSourceKernel)
		}
	}

	// cleanup conns we no longer need
	for k, conns := range stableConns {
		if !addrsToProbe[k] {
			if conns[timestampSourceKernel] != nil {
				conns[timestampSourceKernel].Close()
			}
			conns[timestampSourceUserspace].Close()
			delete(stableConns, k)
		}
	}

	// Collect until every probe reports or the first error aborts the run.
	// NOTE(review): if nodeMetaByAddr is empty, numProbes is 0 and this
	// select blocks forever — callers appear to always supply a non-empty
	// map; confirm.
	for {
		select {
		case err := <-errCh:
			close(doneCh)
			wg.Wait()
			return nil, err
		case result := <-resultsCh:
			results = append(results, result)
			if len(results) == numProbes {
				return results, nil
			}
		}
	}
}
||||
|
||||
// connStability indicates whether a measurement used a socket recycled
// across probing runs (stable 5-tuple) or a freshly created one.
type connStability bool

const (
	unstableConn connStability = false
	stableConn   connStability = true
)
||||
|
||||
func timeSeriesLabels(meta nodeMeta, instance string, source timestampSource, stability connStability) []prompb.Label { |
||||
addressFamily := "ipv4" |
||||
if meta.addr.Is6() { |
||||
addressFamily = "ipv6" |
||||
} |
||||
labels := make([]prompb.Label, 0) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "job", |
||||
Value: "stunstamp-rw", |
||||
}) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "instance", |
||||
Value: instance, |
||||
}) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "region_id", |
||||
Value: fmt.Sprintf("%d", meta.regionID), |
||||
}) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "region_code", |
||||
Value: meta.regionCode, |
||||
}) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "address_family", |
||||
Value: addressFamily, |
||||
}) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "hostname", |
||||
Value: meta.hostname, |
||||
}) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "__name__", |
||||
Value: "stunstamp_derp_stun_rtt_ns", |
||||
}) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "timestamp_source", |
||||
Value: source.String(), |
||||
}) |
||||
labels = append(labels, prompb.Label{ |
||||
Name: "stable_conn", |
||||
Value: fmt.Sprintf("%v", stability), |
||||
}) |
||||
slices.SortFunc(labels, func(a, b prompb.Label) int { |
||||
// prometheus remote-write spec requires lexicographically sorted label names
|
||||
return cmp.Compare(a.Name, b.Name) |
||||
}) |
||||
return labels |
||||
} |
||||
|
||||
const (
	// staleNaN is the bit pattern prometheus uses to mark a series stale:
	// https://prometheus.io/docs/concepts/remote_write_spec/#stale-markers
	staleNaN uint64 = 0x7ff0000000000002
)
||||
|
||||
func staleMarkersFromNodeMeta(stale []nodeMeta, instance string) []prompb.TimeSeries { |
||||
staleMarkers := make([]prompb.TimeSeries, 0) |
||||
now := time.Now() |
||||
for _, s := range stale { |
||||
samples := []prompb.Sample{ |
||||
{ |
||||
Timestamp: now.UnixMilli(), |
||||
Value: math.Float64frombits(staleNaN), |
||||
}, |
||||
} |
||||
staleMarkers = append(staleMarkers, prompb.TimeSeries{ |
||||
Labels: timeSeriesLabels(s, instance, timestampSourceUserspace, unstableConn), |
||||
Samples: samples, |
||||
}) |
||||
staleMarkers = append(staleMarkers, prompb.TimeSeries{ |
||||
Labels: timeSeriesLabels(s, instance, timestampSourceUserspace, stableConn), |
||||
Samples: samples, |
||||
}) |
||||
if supportsKernelTS() { |
||||
staleMarkers = append(staleMarkers, prompb.TimeSeries{ |
||||
Labels: timeSeriesLabels(s, instance, timestampSourceKernel, unstableConn), |
||||
Samples: samples, |
||||
}) |
||||
staleMarkers = append(staleMarkers, prompb.TimeSeries{ |
||||
Labels: timeSeriesLabels(s, instance, timestampSourceKernel, stableConn), |
||||
Samples: samples, |
||||
}) |
||||
} |
||||
} |
||||
return staleMarkers |
||||
} |
||||
|
||||
func resultToPromTimeSeries(r result, instance string) prompb.TimeSeries { |
||||
labels := timeSeriesLabels(r.meta, instance, r.timestampSource, r.connStability) |
||||
samples := make([]prompb.Sample, 1) |
||||
samples[0].Timestamp = r.at.UnixMilli() |
||||
if r.rtt != nil { |
||||
samples[0].Value = float64(*r.rtt) |
||||
} else { |
||||
samples[0].Value = math.NaN() |
||||
// TODO: timeout counter
|
||||
} |
||||
ts := prompb.TimeSeries{ |
||||
Labels: labels, |
||||
Samples: samples, |
||||
} |
||||
slices.SortFunc(ts.Labels, func(a, b prompb.Label) int { |
||||
// prometheus remote-write spec requires lexicographically sorted label names
|
||||
return cmp.Compare(a.Name, b.Name) |
||||
}) |
||||
return ts |
||||
} |
||||
|
||||
// remoteWriteClient sends prometheus remote-write requests to a single URL.
type remoteWriteClient struct {
	c   *http.Client // shared client; constructed with a timeout in newRemoteWriteClient
	url string       // remote-write endpoint
}

// recoverableErr wraps errors for which a remote-write attempt should be
// retried, e.g. transport failures and 5xx/429 responses.
type recoverableErr struct {
	error
}
||||
|
||||
func newRemoteWriteClient(url string) *remoteWriteClient { |
||||
return &remoteWriteClient{ |
||||
c: &http.Client{ |
||||
Timeout: time.Second * 30, |
||||
}, |
||||
url: url, |
||||
} |
||||
} |
||||
|
||||
func (r *remoteWriteClient) write(ctx context.Context, ts []prompb.TimeSeries) error { |
||||
wr := &prompb.WriteRequest{ |
||||
Timeseries: ts, |
||||
} |
||||
b, err := wr.Marshal() |
||||
if err != nil { |
||||
return fmt.Errorf("unable to marshal write request: %w", err) |
||||
} |
||||
compressed := snappy.Encode(nil, b) |
||||
req, err := http.NewRequestWithContext(ctx, "POST", r.url, bytes.NewReader(compressed)) |
||||
if err != nil { |
||||
return fmt.Errorf("unable to create write request: %w", err) |
||||
} |
||||
req.Header.Add("Content-Encoding", "snappy") |
||||
req.Header.Set("Content-Type", "application/x-protobuf") |
||||
req.Header.Set("User-Agent", "stunstamp") |
||||
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") |
||||
resp, err := r.c.Do(req) |
||||
if err != nil { |
||||
return recoverableErr{fmt.Errorf("error performing write request: %w", err)} |
||||
} |
||||
if resp.StatusCode/100 != 2 { |
||||
err = fmt.Errorf("remote server %s returned HTTP status %d", r.url, resp.StatusCode) |
||||
} |
||||
if resp.StatusCode/100 == 5 || resp.StatusCode == http.StatusTooManyRequests { |
||||
return recoverableErr{err} |
||||
} |
||||
return err |
||||
} |
||||
|
||||
func remoteWriteTimeSeries(client *remoteWriteClient, tsCh chan []prompb.TimeSeries) { |
||||
bo := backoff.NewBackoff("remote-write", log.Printf, time.Second*30) |
||||
for ts := range tsCh { |
||||
for { |
||||
reqCtx, cancel := context.WithTimeout(context.Background(), time.Second*30) |
||||
err := client.write(reqCtx, ts) |
||||
cancel() |
||||
// we could parse the Retry-After header, but use a simple exp
|
||||
// backoff for now
|
||||
bo.BackOff(context.Background(), err) |
||||
if err == nil { |
||||
break |
||||
} |
||||
var re recoverableErr |
||||
if !errors.Is(err, &re) { |
||||
log.Printf("unrecoverable remote write error: %v", err) |
||||
break |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
func main() { |
||||
flag.Parse() |
||||
if len(*flagDERPMap) < 1 { |
||||
log.Fatal("derp-map flag is unset") |
||||
} |
||||
if len(*flagOut) < 1 { |
||||
log.Fatal("out flag is unset") |
||||
} |
||||
if *flagInterval < minInterval || *flagInterval > maxBufferDuration { |
||||
log.Fatalf("interval must be >= %s and <= %s", minInterval, maxBufferDuration) |
||||
} |
||||
if *flagRetention < *flagInterval { |
||||
log.Fatalf("retention must be >= interval") |
||||
} |
||||
if len(*flagRemoteWriteURL) < 1 { |
||||
log.Fatalf("rw-url flag is unset") |
||||
} |
||||
_, err := url.Parse(*flagRemoteWriteURL) |
||||
if err != nil { |
||||
log.Fatalf("invalid rw-url flag value: %v", err) |
||||
} |
||||
if len(*flagInstance) < 1 { |
||||
hostname, err := os.Hostname() |
||||
if err != nil { |
||||
log.Fatalf("failed to get hostname: %v", err) |
||||
} |
||||
*flagInstance = hostname |
||||
} |
||||
|
||||
sigCh := make(chan os.Signal, 1) |
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) |
||||
dmCh := make(chan *tailcfg.DERPMap) |
||||
|
||||
go func() { |
||||
bo := backoff.NewBackoff("derp-map", log.Printf, time.Second*30) |
||||
for { |
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) |
||||
dm, err := getDERPMap(ctx, *flagDERPMap) |
||||
cancel() |
||||
bo.BackOff(context.Background(), err) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
dmCh <- dm |
||||
return |
||||
} |
||||
}() |
||||
|
||||
nodeMetaByAddr := make(map[netip.Addr]nodeMeta) |
||||
select { |
||||
case <-sigCh: |
||||
return |
||||
case dm := <-dmCh: |
||||
_, err := nodeMetaFromDERPMap(dm, nodeMetaByAddr, *flagIPv6) |
||||
if err != nil { |
||||
log.Fatalf("error parsing derp map on startup: %v", err) |
||||
} |
||||
} |
||||
|
||||
db, err := newDB(*flagOut) |
||||
if err != nil { |
||||
log.Fatalf("error opening output file for writing: %v", err) |
||||
} |
||||
defer db.Close() |
||||
|
||||
_, err = db.Exec("PRAGMA journal_mode=WAL") |
||||
if err != nil { |
||||
log.Fatalf("error enabling WAL mode: %v", err) |
||||
} |
||||
|
||||
// No indices or primary key. Keep it simple for now. Reads will be full
|
||||
// scans. We can AUTOINCREMENT rowid in the future and hold an in-memory
|
||||
// index to at_unix if needed as reads are almost always going to be
|
||||
// time-bound (e.g. WHERE at_unix >= ?). At the time of authorship we have
|
||||
// ~300 data points per-interval w/o ipv6 w/kernel timestamping resulting
|
||||
// in ~2.6m rows in 24h w/a 10s probe interval.
|
||||
_, err = db.Exec(` |
||||
CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT, address TEXT, timestamp_source INT, stable_conn INT, rtt_ns INT) |
||||
`) |
||||
if err != nil { |
||||
log.Fatalf("error initializing db: %v", err) |
||||
} |
||||
|
||||
wg := sync.WaitGroup{} |
||||
httpErrCh := make(chan error, 1) |
||||
var httpServer *http.Server |
||||
if len(*flagAPI) > 0 { |
||||
api := newAPI(db) |
||||
httpServer = &http.Server{ |
||||
Addr: *flagAPI, |
||||
Handler: api, |
||||
ReadTimeout: time.Second * 60, |
||||
WriteTimeout: time.Second * 60, |
||||
} |
||||
wg.Add(1) |
||||
go func() { |
||||
err := httpServer.ListenAndServe() |
||||
httpErrCh <- err |
||||
wg.Done() |
||||
}() |
||||
} |
||||
|
||||
tsCh := make(chan []prompb.TimeSeries, maxBufferDuration / *flagInterval) |
||||
remoteWriteDoneCh := make(chan struct{}) |
||||
rwc := newRemoteWriteClient(*flagRemoteWriteURL) |
||||
go func() { |
||||
remoteWriteTimeSeries(rwc, tsCh) |
||||
close(remoteWriteDoneCh) |
||||
}() |
||||
|
||||
shutdown := func() { |
||||
if httpServer != nil { |
||||
httpServer.Close() |
||||
} |
||||
close(tsCh) |
||||
select { |
||||
case <-time.After(time.Second * 10): // give goroutine some time to flush
|
||||
case <-remoteWriteDoneCh: |
||||
} |
||||
|
||||
// send stale markers on shutdown
|
||||
staleMeta := make([]nodeMeta, 0, len(nodeMetaByAddr)) |
||||
for _, v := range nodeMetaByAddr { |
||||
staleMeta = append(staleMeta, v) |
||||
} |
||||
staleMarkers := staleMarkersFromNodeMeta(staleMeta, *flagInstance) |
||||
if len(staleMarkers) > 0 { |
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) |
||||
rwc.write(ctx, staleMarkers) |
||||
cancel() |
||||
} |
||||
|
||||
wg.Wait() |
||||
return |
||||
} |
||||
|
||||
log.Println("stunstamp started") |
||||
|
||||
// Re-using sockets means we get the same 5-tuple across runs. This results
|
||||
// in a higher probability of the packets traversing the same underlay path.
|
||||
// Comparison of stable and unstable 5-tuple results can shed light on
|
||||
// differences between paths where hashing (multipathing/load balancing)
|
||||
// comes into play.
|
||||
stableConns := make(map[netip.Addr][2]io.ReadWriteCloser) |
||||
|
||||
derpMapTicker := time.NewTicker(time.Minute * 5) |
||||
defer derpMapTicker.Stop() |
||||
probeTicker := time.NewTicker(*flagInterval) |
||||
defer probeTicker.Stop() |
||||
cleanupTicker := time.NewTicker(time.Hour) |
||||
defer cleanupTicker.Stop() |
||||
|
||||
for { |
||||
select { |
||||
case <-cleanupTicker.C: |
||||
older := time.Now().Add(-*flagRetention) |
||||
log.Printf("cleaning up measurements older than %v", older) |
||||
_, err := db.Exec("DELETE FROM rtt WHERE at_unix < ?", older.Unix()) |
||||
if err != nil { |
||||
log.Printf("error cleaning up old data: %v", err) |
||||
shutdown() |
||||
return |
||||
} |
||||
case <-probeTicker.C: |
||||
results, err := probeNodes(nodeMetaByAddr, stableConns) |
||||
if err != nil { |
||||
log.Printf("unrecoverable error while probing: %v", err) |
||||
shutdown() |
||||
return |
||||
} |
||||
ts := make([]prompb.TimeSeries, 0, len(results)) |
||||
for _, r := range results { |
||||
ts = append(ts, resultToPromTimeSeries(r, *flagInstance)) |
||||
} |
||||
select { |
||||
case tsCh <- ts: |
||||
default: |
||||
select { |
||||
case <-tsCh: |
||||
log.Println("prometheus remote-write buffer full, dropped measurements") |
||||
default: |
||||
tsCh <- ts |
||||
} |
||||
} |
||||
tx, err := db.Begin() |
||||
if err != nil { |
||||
log.Printf("error beginning sqlite tx: %v", err) |
||||
shutdown() |
||||
return |
||||
} |
||||
for _, result := range results { |
||||
af := 4 |
||||
if result.meta.addr.Is6() { |
||||
af = 6 |
||||
} |
||||
_, err = tx.Exec("INSERT INTO rtt(at_unix, region_id, hostname, af, address, timestamp_source, stable_conn, rtt_ns) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", |
||||
result.at.Unix(), result.meta.regionID, result.meta.hostname, af, result.meta.addr.String(), result.timestampSource, result.connStability, result.rtt) |
||||
if err != nil { |
||||
tx.Rollback() |
||||
log.Printf("error adding result to tx: %v", err) |
||||
shutdown() |
||||
return |
||||
} |
||||
} |
||||
err = tx.Commit() |
||||
if err != nil { |
||||
log.Printf("error committing tx: %v", err) |
||||
shutdown() |
||||
return |
||||
} |
||||
case dm := <-dmCh: |
||||
staleMeta, err := nodeMetaFromDERPMap(dm, nodeMetaByAddr, *flagIPv6) |
||||
if err != nil { |
||||
log.Printf("error parsing DERP map, continuing with stale map: %v", err) |
||||
continue |
||||
} |
||||
staleMarkers := staleMarkersFromNodeMeta(staleMeta, *flagInstance) |
||||
if len(staleMarkers) < 1 { |
||||
continue |
||||
} |
||||
select { |
||||
case tsCh <- staleMarkers: |
||||
default: |
||||
select { |
||||
case <-tsCh: |
||||
log.Println("prometheus remote-write buffer full, dropped measurements") |
||||
default: |
||||
tsCh <- staleMarkers |
||||
} |
||||
} |
||||
case <-derpMapTicker.C: |
||||
go func() { |
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) |
||||
defer cancel() |
||||
updatedDM, err := getDERPMap(ctx, *flagDERPMap) |
||||
if err != nil { |
||||
dmCh <- updatedDM |
||||
} |
||||
}() |
||||
case err := <-httpErrCh: |
||||
log.Printf("http server error: %v", err) |
||||
shutdown() |
||||
return |
||||
case <-sigCh: |
||||
shutdown() |
||||
return |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,26 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !(windows && 386)
|
||||
|
||||
package main |
||||
|
||||
import ( |
||||
"database/sql" |
||||
|
||||
_ "modernc.org/sqlite" |
||||
) |
||||
|
||||
// db wraps the sqlite database handle used to persist RTT measurements.
type db struct {
	*sql.DB
}
||||
|
||||
func newDB(path string) (*db, error) { |
||||
d, err := sql.Open("sqlite", *flagOut) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return &db{ |
||||
DB: d, |
||||
}, nil |
||||
} |
||||
@ -0,0 +1,17 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package main |
||||
|
||||
import ( |
||||
"database/sql" |
||||
"errors" |
||||
) |
||||
|
||||
// db is a stub for platforms where the sqlite driver is unsupported.
type db struct {
	*sql.DB
}

// newDB always fails on unsupported platforms.
func newDB(path string) (*db, error) {
	return nil, errors.New("unsupported platform")
}
||||
@ -0,0 +1,25 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !linux
|
||||
|
||||
package main |
||||
|
||||
import ( |
||||
"errors" |
||||
"io" |
||||
"net" |
||||
"time" |
||||
) |
||||
|
||||
// getConnKernelTimestamp is unsupported on non-Linux platforms.
func getConnKernelTimestamp() (io.ReadWriteCloser, error) {
	return nil, errors.New("unimplemented")
}

// measureRTTKernel is unsupported on non-Linux platforms.
func measureRTTKernel(conn io.ReadWriteCloser, dst *net.UDPAddr, req []byte) (resp []byte, rtt time.Duration, err error) {
	return nil, 0, errors.New("unimplemented")
}

// supportsKernelTS reports whether kernel (SO_TIMESTAMPING) timestamping is
// available; it is not on non-Linux platforms.
func supportsKernelTS() bool {
	return false
}
||||
@ -0,0 +1,126 @@ |
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package main |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"encoding/binary" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"net" |
||||
"time" |
||||
|
||||
"github.com/mdlayher/socket" |
||||
"golang.org/x/sys/unix" |
||||
) |
||||
|
||||
const (
	// flags selects software timestamp generation and reporting for
	// SO_TIMESTAMPING sockets.
	flags = unix.SOF_TIMESTAMPING_TX_SOFTWARE | // tx timestamp generation in device driver
		unix.SOF_TIMESTAMPING_RX_SOFTWARE | // rx timestamp generation in the kernel
		unix.SOF_TIMESTAMPING_SOFTWARE // report software timestamps
)
||||
|
||||
func getConnKernelTimestamp() (io.ReadWriteCloser, error) { |
||||
sconn, err := socket.Socket(unix.AF_INET6, unix.SOCK_DGRAM, unix.IPPROTO_UDP, "udp", nil) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
sa := unix.SockaddrInet6{} |
||||
err = sconn.Bind(&sa) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
err = sconn.SetsockoptInt(unix.SOL_SOCKET, unix.SO_TIMESTAMPING_NEW, flags) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return sconn, nil |
||||
} |
||||
|
||||
func parseTimestampFromCmsgs(oob []byte) (time.Time, error) { |
||||
msgs, err := unix.ParseSocketControlMessage(oob) |
||||
if err != nil { |
||||
return time.Time{}, fmt.Errorf("error parsing oob as cmsgs: %w", err) |
||||
} |
||||
for _, msg := range msgs { |
||||
if msg.Header.Level == unix.SOL_SOCKET && msg.Header.Type == unix.SO_TIMESTAMPING_NEW && len(msg.Data) >= 16 { |
||||
sec := int64(binary.NativeEndian.Uint64(msg.Data[:8])) |
||||
ns := int64(binary.NativeEndian.Uint64(msg.Data[8:16])) |
||||
return time.Unix(sec, ns), nil |
||||
} |
||||
} |
||||
return time.Time{}, errors.New("failed to parse timestamp from cmsgs") |
||||
} |
||||
|
||||
func measureRTTKernel(conn io.ReadWriteCloser, dst *net.UDPAddr, req []byte) (resp []byte, rtt time.Duration, err error) { |
||||
sconn, ok := conn.(*socket.Conn) |
||||
if !ok { |
||||
return nil, 0, fmt.Errorf("conn of unexpected type: %T", conn) |
||||
} |
||||
|
||||
var to unix.Sockaddr |
||||
to4 := dst.IP.To4() |
||||
if to4 != nil { |
||||
to = &unix.SockaddrInet4{ |
||||
Port: 3478, |
||||
} |
||||
copy(to.(*unix.SockaddrInet4).Addr[:], to4) |
||||
} else { |
||||
to = &unix.SockaddrInet6{ |
||||
Port: 3478, |
||||
} |
||||
copy(to.(*unix.SockaddrInet6).Addr[:], dst.IP) |
||||
} |
||||
|
||||
err = sconn.Sendto(context.Background(), req, 0, to) |
||||
if err != nil { |
||||
return nil, 0, fmt.Errorf("sendto error: %v", err) // don't wrap
|
||||
} |
||||
|
||||
txCtx, txCancel := context.WithTimeout(context.Background(), time.Second*2) |
||||
defer txCancel() |
||||
|
||||
buf := make([]byte, 1024) |
||||
oob := make([]byte, 1024) |
||||
var txAt time.Time |
||||
|
||||
for { |
||||
n, oobn, _, _, err := sconn.Recvmsg(txCtx, buf, oob, unix.MSG_ERRQUEUE) |
||||
if err != nil { |
||||
return nil, 0, fmt.Errorf("recvmsg (MSG_ERRQUEUE) error: %v", err) // don't wrap
|
||||
} |
||||
|
||||
buf = buf[:n] |
||||
if n < len(req) || !bytes.Equal(req, buf[len(buf)-len(req):]) { |
||||
// Spin until we find the message we sent. We get the full packet
|
||||
// looped including eth header so match against the tail.
|
||||
continue |
||||
} |
||||
txAt, err = parseTimestampFromCmsgs(oob[:oobn]) |
||||
if err != nil { |
||||
return nil, 0, fmt.Errorf("failed to get tx timestamp: %v", err) // don't wrap
|
||||
} |
||||
break |
||||
} |
||||
|
||||
rxCtx, rxCancel := context.WithTimeout(context.Background(), time.Second*2) |
||||
defer rxCancel() |
||||
n, oobn, _, _, err := sconn.Recvmsg(rxCtx, buf, oob, 0) |
||||
if err != nil { |
||||
return nil, 0, fmt.Errorf("recvmsg error: %w", err) // wrap for timeout-related error unwrapping
|
||||
} |
||||
|
||||
rxAt, err := parseTimestampFromCmsgs(oob[:oobn]) |
||||
if err != nil { |
||||
return nil, 0, fmt.Errorf("failed to get rx timestamp: %v", err) // don't wrap
|
||||
} |
||||
|
||||
return buf[:n], rxAt.Sub(txAt), nil |
||||
} |
||||
|
||||
// supportsKernelTS reports whether SO_TIMESTAMPING kernel timestamping is
// available; on Linux it always is.
func supportsKernelTS() bool {
	return true
}
||||
Loading…
Reference in new issue