k8s-operator,kube: allowing k8s api request events to be enabled via grants (#18393)

Updates #35796

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
Tom Meadows
2026-01-16 13:29:12 +00:00
committed by GitHub
parent 54d77898da
commit 1cc6f3282e
6 changed files with 118 additions and 50 deletions
+2
View File
@@ -221,6 +221,8 @@ OmitEmptyUnsupportedInV2 tailscale.com/kube/kubeapi.Event.Count
OmitEmptyUnsupportedInV2 tailscale.com/kube/kubeapi.ObjectMeta.Generation OmitEmptyUnsupportedInV2 tailscale.com/kube/kubeapi.ObjectMeta.Generation
OmitEmptyUnsupportedInV2 tailscale.com/kube/kubeapi.Status.Code OmitEmptyUnsupportedInV2 tailscale.com/kube/kubeapi.Status.Code
OmitEmptyUnsupportedInV2 tailscale.com/kube/kubetypes.KubernetesCapRule.EnforceRecorder OmitEmptyUnsupportedInV2 tailscale.com/kube/kubetypes.KubernetesCapRule.EnforceRecorder
OmitEmptyUnsupportedInV2 tailscale.com/kube/kubetypes.KubernetesCapRule.EnableEvents
OmitEmptyUnsupportedInV2 tailscale.com/kube/kubetypes.KubernetesCapRule.EnableSessionRecordings
OmitEmptyUnsupportedInV2 tailscale.com/log/sockstatlog.event.IsCellularInterface OmitEmptyUnsupportedInV2 tailscale.com/log/sockstatlog.event.IsCellularInterface
OmitEmptyUnsupportedInV2 tailscale.com/sessionrecording.CastHeader.SrcNodeUserID OmitEmptyUnsupportedInV2 tailscale.com/sessionrecording.CastHeader.SrcNodeUserID
OmitEmptyUnsupportedInV2 tailscale.com/sessionrecording.Source.NodeUserID OmitEmptyUnsupportedInV2 tailscale.com/sessionrecording.Source.NodeUserID
+85 -41
View File
@@ -46,6 +46,10 @@ var (
whoIsKey = ctxkey.New("", (*apitype.WhoIsResponse)(nil)) whoIsKey = ctxkey.New("", (*apitype.WhoIsResponse)(nil))
) )
const (
eventsEnabledVar = "TS_EXPERIMENTAL_KUBE_API_EVENTS"
)
// NewAPIServerProxy creates a new APIServerProxy that's ready to start once Run // NewAPIServerProxy creates a new APIServerProxy that's ready to start once Run
// is called. No network traffic will flow until Run is called. // is called. No network traffic will flow until Run is called.
// //
@@ -97,7 +101,7 @@ func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsn
upstreamURL: u, upstreamURL: u,
ts: ts, ts: ts,
sendEventFunc: sessionrecording.SendEvent, sendEventFunc: sessionrecording.SendEvent,
eventsEnabled: envknob.Bool("TS_EXPERIMENTAL_KUBE_API_EVENTS"), eventsEnabled: envknob.Bool(eventsEnabledVar),
} }
ap.rp = &httputil.ReverseProxy{ ap.rp = &httputil.ReverseProxy{
Rewrite: func(pr *httputil.ProxyRequest) { Rewrite: func(pr *httputil.ProxyRequest) {
@@ -128,6 +132,10 @@ func (ap *APIServerProxy) Run(ctx context.Context) error {
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
} }
if ap.eventsEnabled {
ap.log.Warnf("DEPRECATED: %q environment variable is deprecated, and will be removed in v1.96. See documentation for more detail.", eventsEnabledVar)
}
mode := "noauth" mode := "noauth"
if ap.authMode { if ap.authMode {
mode = "auth" mode = "auth"
@@ -196,6 +204,7 @@ type APIServerProxy struct {
sendEventFunc func(ap netip.AddrPort, event io.Reader, dial netx.DialFunc) error sendEventFunc func(ap netip.AddrPort, event io.Reader, dial netx.DialFunc) error
// Flag used to enable sending API requests as events to tsrecorder. // Flag used to enable sending API requests as events to tsrecorder.
// Deprecated: events are now set via ACLs (see https://tailscale.com/kb/1246/tailscale-ssh-session-recording#turn-on-session-recording-in-your-tailnet-policy-file)
eventsEnabled bool eventsEnabled bool
} }
@@ -207,13 +216,34 @@ func (ap *APIServerProxy) serveDefault(w http.ResponseWriter, r *http.Request) {
return return
} }
if err = ap.recordRequestAsEvent(r, who); err != nil { c, err := determineRecorderConfig(who)
msg := fmt.Sprintf("error recording Kubernetes API request: %v", err) if err != nil {
ap.log.Errorf(msg) ap.log.Errorf("error trying to determine whether the kubernetes api request %q needs to be recorded: %v", r.URL.String(), err)
http.Error(w, msg, http.StatusBadGateway)
return return
} }
if c.failOpen && len(c.recorderAddresses) == 0 { // will not record
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
return
}
ksr.CounterKubernetesAPIRequestEventsAttempted.Add(1) // at this point we know that users intended for this request to be recorded
if !c.failOpen && len(c.recorderAddresses) == 0 {
msg := fmt.Sprintf("forbidden: api request %q must be recorded, but no recorders are available.", r.URL.String())
ap.log.Error(msg)
http.Error(w, msg, http.StatusForbidden)
return
}
// NOTE: (ChaosInTheCRD) ap.eventsEnabled deprecated, remove in v1.96
if c.enableEvents || ap.eventsEnabled {
if err = ap.recordRequestAsEvent(r, who, c.recorderAddresses, c.failOpen); err != nil {
msg := fmt.Sprintf("error recording Kubernetes API request: %v", err)
ap.log.Errorf(msg)
http.Error(w, msg, http.StatusBadGateway)
return
}
}
counterNumRequestsProxied.Add(1) counterNumRequestsProxied.Add(1)
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who))) ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
@@ -256,35 +286,45 @@ func (ap *APIServerProxy) sessionForProto(w http.ResponseWriter, r *http.Request
return return
} }
if err = ap.recordRequestAsEvent(r, who); err != nil {
msg := fmt.Sprintf("error recording Kubernetes API request: %v", err)
ap.log.Errorf(msg)
http.Error(w, msg, http.StatusBadGateway)
return
}
counterNumRequestsProxied.Add(1) counterNumRequestsProxied.Add(1)
failOpen, addrs, err := determineRecorderConfig(who) c, err := determineRecorderConfig(who)
if err != nil { if err != nil {
ap.log.Errorf("error trying to determine whether the 'kubectl %s' session needs to be recorded: %v", sessionType, err) ap.log.Errorf("error trying to determine whether the 'kubectl %s' session needs to be recorded: %v", sessionType, err)
return return
} }
if failOpen && len(addrs) == 0 { // will not record
if c.failOpen && len(c.recorderAddresses) == 0 { // will not record
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who))) ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
return return
} }
ksr.CounterSessionRecordingsAttempted.Add(1) // at this point we know that users intended for this session to be recorded ksr.CounterKubernetesAPIRequestEventsAttempted.Add(1) // at this point we know that users intended for this request to be recorded
if !failOpen && len(addrs) == 0 { if !c.failOpen && len(c.recorderAddresses) == 0 {
msg := fmt.Sprintf("forbidden: 'kubectl %s' session must be recorded, but no recorders are available.", sessionType) msg := fmt.Sprintf("forbidden: 'kubectl %s' session must be recorded, but no recorders are available.", sessionType)
ap.log.Error(msg) ap.log.Error(msg)
http.Error(w, msg, http.StatusForbidden) http.Error(w, msg, http.StatusForbidden)
return return
} }
// NOTE: (ChaosInTheCRD) ap.eventsEnabled deprecated, remove in v1.96
if c.enableEvents || ap.eventsEnabled {
if err = ap.recordRequestAsEvent(r, who, c.recorderAddresses, c.failOpen); err != nil {
msg := fmt.Sprintf("error recording Kubernetes API request: %v", err)
ap.log.Errorf(msg)
http.Error(w, msg, http.StatusBadGateway)
return
}
}
if !c.enableRecordings {
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
return
}
ksr.CounterSessionRecordingsAttempted.Add(1) // at this point we know that users intended for this session to be recorded
wantsHeader := upgradeHeaderForProto[proto] wantsHeader := upgradeHeaderForProto[proto]
if h := r.Header.Get(upgradeHeaderKey); h != wantsHeader { if h := r.Header.Get(upgradeHeaderKey); h != wantsHeader {
msg := fmt.Sprintf("[unexpected] unable to verify that streaming protocol is %s, wants Upgrade header %q, got: %q", proto, wantsHeader, h) msg := fmt.Sprintf("[unexpected] unable to verify that streaming protocol is %s, wants Upgrade header %q, got: %q", proto, wantsHeader, h)
if failOpen { if c.failOpen {
msg = msg + "; failure mode is 'fail open'; continuing session without recording." msg = msg + "; failure mode is 'fail open'; continuing session without recording."
ap.log.Warn(msg) ap.log.Warn(msg)
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who))) ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
@@ -303,8 +343,8 @@ func (ap *APIServerProxy) sessionForProto(w http.ResponseWriter, r *http.Request
SessionType: sessionType, SessionType: sessionType,
TS: ap.ts, TS: ap.ts,
Who: who, Who: who,
Addrs: addrs, Addrs: c.recorderAddresses,
FailOpen: failOpen, FailOpen: c.failOpen,
Pod: r.PathValue(podNameKey), Pod: r.PathValue(podNameKey),
Namespace: r.PathValue(namespaceNameKey), Namespace: r.PathValue(namespaceNameKey),
Log: ap.log, Log: ap.log,
@@ -314,21 +354,9 @@ func (ap *APIServerProxy) sessionForProto(w http.ResponseWriter, r *http.Request
ap.rp.ServeHTTP(h, r.WithContext(whoIsKey.WithValue(r.Context(), who))) ap.rp.ServeHTTP(h, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
} }
func (ap *APIServerProxy) recordRequestAsEvent(req *http.Request, who *apitype.WhoIsResponse) error { func (ap *APIServerProxy) recordRequestAsEvent(req *http.Request, who *apitype.WhoIsResponse, addrs []netip.AddrPort, failOpen bool) error {
if !ap.eventsEnabled {
return nil
}
failOpen, addrs, err := determineRecorderConfig(who)
if err != nil {
return fmt.Errorf("error trying to determine whether the kubernetes api request needs to be recorded: %w", err)
}
if len(addrs) == 0 { if len(addrs) == 0 {
if failOpen { return fmt.Errorf("no recorder addresses specified")
return nil
} else {
return fmt.Errorf("forbidden: kubernetes api request must be recorded, but no recorders are available")
}
} }
factory := &request.RequestInfoFactory{ factory := &request.RequestInfoFactory{
@@ -537,20 +565,30 @@ func addImpersonationHeaders(r *http.Request, log *zap.SugaredLogger) error {
return nil return nil
} }
type recorderConfig struct {
failOpen bool
enableEvents bool
enableRecordings bool
recorderAddresses []netip.AddrPort
}
// determineRecorderConfig determines recorder config from requester's peer // determineRecorderConfig determines recorder config from requester's peer
// capabilities. Determines whether a 'kubectl exec' session from this requester // capabilities. Determines whether a 'kubectl exec' session from this requester
// needs to be recorded and what recorders the recording should be sent to. // needs to be recorded and what recorders the recording should be sent to.
func determineRecorderConfig(who *apitype.WhoIsResponse) (failOpen bool, recorderAddresses []netip.AddrPort, _ error) { func determineRecorderConfig(who *apitype.WhoIsResponse) (c recorderConfig, _ error) {
if who == nil { if who == nil {
return false, nil, errors.New("[unexpected] cannot determine caller") return c, errors.New("[unexpected] cannot determine caller")
} }
failOpen = true
c.failOpen = true
c.enableEvents = false
c.enableRecordings = true
rules, err := tailcfg.UnmarshalCapJSON[kubetypes.KubernetesCapRule](who.CapMap, tailcfg.PeerCapabilityKubernetes) rules, err := tailcfg.UnmarshalCapJSON[kubetypes.KubernetesCapRule](who.CapMap, tailcfg.PeerCapabilityKubernetes)
if err != nil { if err != nil {
return failOpen, nil, fmt.Errorf("failed to unmarshal Kubernetes capability: %w", err) return c, fmt.Errorf("failed to unmarshal Kubernetes capability: %w", err)
} }
if len(rules) == 0 { if len(rules) == 0 {
return failOpen, nil, nil return c, nil
} }
for _, rule := range rules { for _, rule := range rules {
@@ -559,13 +597,19 @@ func determineRecorderConfig(who *apitype.WhoIsResponse) (failOpen bool, recorde
// recorders behind those addrs are online - else we // recorders behind those addrs are online - else we
// spend 30s trying to reach a recorder whose tailscale // spend 30s trying to reach a recorder whose tailscale
// status is offline. // status is offline.
recorderAddresses = append(recorderAddresses, rule.RecorderAddrs...) c.recorderAddresses = append(c.recorderAddresses, rule.RecorderAddrs...)
} }
if rule.EnforceRecorder { if rule.EnforceRecorder {
failOpen = false c.failOpen = false
}
if rule.EnableEvents {
c.enableEvents = true
}
if rule.EnableSessionRecordings {
c.enableRecordings = true
} }
} }
return failOpen, recorderAddresses, nil return c, nil
} }
var upgradeHeaderForProto = map[ksr.Protocol]string{ var upgradeHeaderForProto = map[ksr.Protocol]string{
+15 -3
View File
@@ -61,7 +61,6 @@ func TestRecordRequestAsEvent(t *testing.T) {
log: zl.Sugar(), log: zl.Sugar(),
ts: &tsnet.Server{}, ts: &tsnet.Server{},
sendEventFunc: sender.Send, sendEventFunc: sender.Send,
eventsEnabled: true,
} }
defaultWho := &apitype.WhoIsResponse{ defaultWho := &apitype.WhoIsResponse{
@@ -76,7 +75,7 @@ func TestRecordRequestAsEvent(t *testing.T) {
CapMap: tailcfg.PeerCapMap{ CapMap: tailcfg.PeerCapMap{
tailcfg.PeerCapabilityKubernetes: []tailcfg.RawMessage{ tailcfg.PeerCapabilityKubernetes: []tailcfg.RawMessage{
tailcfg.RawMessage(`{"recorderAddrs":["127.0.0.1:1234"]}`), tailcfg.RawMessage(`{"recorderAddrs":["127.0.0.1:1234"]}`),
tailcfg.RawMessage(`{"enforceRecorder": true}`), tailcfg.RawMessage(`{"enforceRecorder": true, "enableEvents": true}`),
}, },
}, },
} }
@@ -310,6 +309,7 @@ func TestRecordRequestAsEvent(t *testing.T) {
CapMap: tailcfg.PeerCapMap{ CapMap: tailcfg.PeerCapMap{
tailcfg.PeerCapabilityKubernetes: []tailcfg.RawMessage{ tailcfg.PeerCapabilityKubernetes: []tailcfg.RawMessage{
tailcfg.RawMessage(`{"recorderAddrs":["127.0.0.1:1234", "127.0.0.1:5678"]}`), tailcfg.RawMessage(`{"recorderAddrs":["127.0.0.1:1234", "127.0.0.1:5678"]}`),
tailcfg.RawMessage(`{"enforceRecorder": true, "enableEvents": true}`),
}, },
}, },
}, },
@@ -398,6 +398,7 @@ func TestRecordRequestAsEvent(t *testing.T) {
}, },
setupSender: func() { sender.Reset() }, setupSender: func() { sender.Reset() },
wantNumCalls: 0, wantNumCalls: 0,
wantErr: true,
}, },
{ {
name: "error-sending", name: "error-sending",
@@ -510,8 +511,19 @@ func TestRecordRequestAsEvent(t *testing.T) {
tt.setupSender() tt.setupSender()
req := tt.req() req := tt.req()
err := ap.recordRequestAsEvent(req, tt.who)
c, err := determineRecorderConfig(tt.who)
if err != nil {
t.Fatalf("error trying to determine whether the kubernetes api request %q needs to be recorded: %v", req.URL.String(), err)
return
}
if !c.enableEvents && tt.wantEvent != nil {
t.Errorf("expected event but events not enabled in CapMap. Want: %#v", tt.wantEvent)
return
}
err = ap.recordRequestAsEvent(req, tt.who, c.recorderAddresses, c.failOpen)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Fatalf("recordRequestAsEvent() error = %v, wantErr %v", err, tt.wantErr) t.Fatalf("recordRequestAsEvent() error = %v, wantErr %v", err, tt.wantErr)
} }
+5 -5
View File
@@ -166,15 +166,15 @@ func Test_determineRecorderConfig(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
gotFailOpen, gotRecorderAddresses, err := determineRecorderConfig(tt.who) c, err := determineRecorderConfig(tt.who)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
if gotFailOpen != tt.wantFailOpen { if c.failOpen != tt.wantFailOpen {
t.Errorf("determineRecorderConfig() gotFailOpen = %v, want %v", gotFailOpen, tt.wantFailOpen) t.Errorf("determineRecorderConfig() gotFailOpen = %v, want %v", c.failOpen, tt.wantFailOpen)
} }
if !reflect.DeepEqual(gotRecorderAddresses, tt.wantRecorderAddresses) { if !reflect.DeepEqual(c.recorderAddresses, tt.wantRecorderAddresses) {
t.Errorf("determineRecorderConfig() gotRecorderAddresses = %v, want %v", gotRecorderAddresses, tt.wantRecorderAddresses) t.Errorf("determineRecorderConfig() gotRecorderAddresses = %v, want %v", c.recorderAddresses, tt.wantRecorderAddresses)
} }
}) })
} }
@@ -52,6 +52,8 @@ var (
// CounterSessionRecordingsAttempted counts the number of session recording attempts. // CounterSessionRecordingsAttempted counts the number of session recording attempts.
CounterSessionRecordingsAttempted = clientmetric.NewCounter("k8s_auth_proxy_session_recordings_attempted") CounterSessionRecordingsAttempted = clientmetric.NewCounter("k8s_auth_proxy_session_recordings_attempted")
CounterKubernetesAPIRequestEventsAttempted = clientmetric.NewCounter("k8s_auth_proxy_api_request_event_recording_attempted")
// counterSessionRecordingsUploaded counts the number of successfully uploaded session recordings. // counterSessionRecordingsUploaded counts the number of successfully uploaded session recordings.
counterSessionRecordingsUploaded = clientmetric.NewCounter("k8s_auth_proxy_session_recordings_uploaded") counterSessionRecordingsUploaded = clientmetric.NewCounter("k8s_auth_proxy_session_recordings_uploaded")
) )
+9 -1
View File
@@ -38,8 +38,16 @@ type KubernetesCapRule struct {
// Default is to fail open. // Default is to fail open.
// The field name matches `EnforceRecorder` field with equal semantics for Tailscale SSH // The field name matches `EnforceRecorder` field with equal semantics for Tailscale SSH
// session recorder. // session recorder.
// https://tailscale.com/kb/1246/tailscale-ssh-session-recording#turn-on-session-recording-in-acls // https://tailscale.com/kb/1246/tailscale-ssh-session-recording#turn-on-session-recording-in-your-tailnet-policy-file
EnforceRecorder bool `json:"enforceRecorder,omitempty"` EnforceRecorder bool `json:"enforceRecorder,omitempty"`
// EnableEvents defines whether kubectl API request events (beta)
// should be recorded or not.
// https://tailscale.com/kb/1246/tailscale-ssh-session-recording#turn-on-session-recording-in-your-tailnet-policy-file
EnableEvents bool `json:"enableEvents,omitempty"`
// EnableSessionRecordings defines whether kubectl sessions
// (e.g., exec, attach) should be recorded or not.
// https://tailscale.com/kb/1246/tailscale-ssh-session-recording#turn-on-session-recording-in-your-tailnet-policy-file
EnableSessionRecordings bool `json:"enableSessionRecordings,omitempty"`
} }
// ImpersonateRule defines how a request from the tailnet identity matching // ImpersonateRule defines how a request from the tailnet identity matching