cmd/k8s-operator: migrate to tailscale-client-go-v2 (#19010)
This commit modifies the kubernetes operator to use the `tailscale-client-go-v2` package instead of the internal tailscale client it was previously using. This now gives us the ability to expand out custom resources and features as they become available via the API module. The tailnet reconciler has also been modified to manage clients as tailnets are created and removed, providing each subsequent reconciler with a single `ClientProvider` that obtains a tailscale client for the respective tailnet by name, or the operator's default when presented with a blank string. Fixes: https://github.com/tailscale/corp/issues/38418 Signed-off-by: David Bond <davidsbond93@gmail.com>
This commit is contained in:
@@ -4,7 +4,6 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/tls"
|
||||
@@ -53,12 +52,13 @@ import (
|
||||
"sigs.k8s.io/kind/pkg/cluster/nodeutils"
|
||||
"sigs.k8s.io/kind/pkg/cmd"
|
||||
|
||||
"tailscale.com/internal/client/tailscale"
|
||||
"tailscale.com/client/tailscale/v2"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/store/mem"
|
||||
tsoperator "tailscale.com/k8s-operator"
|
||||
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
|
||||
"tailscale.com/tsnet"
|
||||
"tailscale.com/util/must"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -106,7 +106,8 @@ func runTests(m *testing.M) (int, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := os.MkdirAll(tmp, 0755); err != nil {
|
||||
|
||||
if err = os.MkdirAll(tmp, 0755); err != nil {
|
||||
return 0, fmt.Errorf("failed to create temp dir: %w", err)
|
||||
}
|
||||
|
||||
@@ -122,10 +123,12 @@ func runTests(m *testing.M) (int, error) {
|
||||
kindProvider = cluster.NewProvider(
|
||||
cluster.ProviderWithLogger(cmd.NewLogger()),
|
||||
)
|
||||
|
||||
clusters, err := kindProvider.List()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to list kind clusters: %w", err)
|
||||
}
|
||||
|
||||
if !slices.Contains(clusters, kindClusterName) {
|
||||
if err := kindProvider.Create(kindClusterName,
|
||||
cluster.CreateWithWaitForReady(5*time.Minute),
|
||||
@@ -147,6 +150,7 @@ func runTests(m *testing.M) (int, error) {
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error loading kubeconfig: %w", err)
|
||||
}
|
||||
|
||||
kubeClient, err = client.NewWithWatch(restCfg, client.Options{Scheme: tsapi.GlobalScheme})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error creating Kubernetes client: %w", err)
|
||||
@@ -157,24 +161,28 @@ func runTests(m *testing.M) (int, error) {
|
||||
clientID, clientSecret string // OAuth client for the operator to use.
|
||||
caPaths []string // Extra CA cert file paths to add to images.
|
||||
|
||||
certsDir string = filepath.Join(tmp, "certs") // Directory containing extra CA certs to add to images.
|
||||
certsDir = filepath.Join(tmp, "certs") // Directory containing extra CA certs to add to images.
|
||||
)
|
||||
if *fDevcontrol {
|
||||
// Deploy pebble and get its certs.
|
||||
if err := applyPebbleResources(ctx, kubeClient); err != nil {
|
||||
if err = applyPebbleResources(ctx, kubeClient); err != nil {
|
||||
return 0, fmt.Errorf("failed to apply pebble resources: %w", err)
|
||||
}
|
||||
|
||||
pebblePod, err := waitForPodReady(ctx, logger, kubeClient, ns, client.MatchingLabels{"app": "pebble"})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("pebble pod not ready: %w", err)
|
||||
}
|
||||
if err := forwardLocalPortToPod(ctx, logger, restCfg, ns, pebblePod, 15000); err != nil {
|
||||
|
||||
if err = forwardLocalPortToPod(ctx, logger, restCfg, ns, pebblePod, 15000); err != nil {
|
||||
return 0, fmt.Errorf("failed to set up port forwarding to pebble: %w", err)
|
||||
}
|
||||
|
||||
testCAs = x509.NewCertPool()
|
||||
if ok := testCAs.AppendCertsFromPEM(pebbleMiniCACert); !ok {
|
||||
return 0, fmt.Errorf("failed to parse pebble minica cert")
|
||||
}
|
||||
|
||||
var pebbleCAChain []byte
|
||||
for _, path := range []string{"/intermediates/0", "/roots/0"} {
|
||||
pem, err := pebbleGet(ctx, 15000, path)
|
||||
@@ -183,20 +191,25 @@ func runTests(m *testing.M) (int, error) {
|
||||
}
|
||||
pebbleCAChain = append(pebbleCAChain, pem...)
|
||||
}
|
||||
|
||||
if ok := testCAs.AppendCertsFromPEM(pebbleCAChain); !ok {
|
||||
return 0, fmt.Errorf("failed to parse pebble ca chain cert")
|
||||
}
|
||||
if err := os.MkdirAll(certsDir, 0755); err != nil {
|
||||
|
||||
if err = os.MkdirAll(certsDir, 0755); err != nil {
|
||||
return 0, fmt.Errorf("failed to create certs dir: %w", err)
|
||||
}
|
||||
|
||||
pebbleCAChainPath := filepath.Join(certsDir, "pebble-ca-chain.crt")
|
||||
if err := os.WriteFile(pebbleCAChainPath, pebbleCAChain, 0644); err != nil {
|
||||
if err = os.WriteFile(pebbleCAChainPath, pebbleCAChain, 0644); err != nil {
|
||||
return 0, fmt.Errorf("failed to write pebble CA chain: %w", err)
|
||||
}
|
||||
|
||||
pebbleMiniCACertPath := filepath.Join(certsDir, "pebble.minica.crt")
|
||||
if err := os.WriteFile(pebbleMiniCACertPath, pebbleMiniCACert, 0644); err != nil {
|
||||
if err = os.WriteFile(pebbleMiniCACertPath, pebbleMiniCACert, 0644); err != nil {
|
||||
return 0, fmt.Errorf("failed to write pebble minica: %w", err)
|
||||
}
|
||||
|
||||
caPaths = []string{pebbleCAChainPath, pebbleMiniCACertPath}
|
||||
if !*fSkipCleanup {
|
||||
defer os.RemoveAll(certsDir)
|
||||
@@ -210,13 +223,15 @@ func runTests(m *testing.M) (int, error) {
|
||||
// For Pods -> devcontrol (tailscale clients joining the tailnet):
|
||||
// * Create ssh-server Deployment in cluster.
|
||||
// * Create reverse ssh tunnel that goes from ssh-server port 31544 to localhost:31544.
|
||||
if err := forwardLocalPortToPod(ctx, logger, restCfg, ns, pebblePod, 8055); err != nil {
|
||||
if err = forwardLocalPortToPod(ctx, logger, restCfg, ns, pebblePod, 8055); err != nil {
|
||||
return 0, fmt.Errorf("failed to set up port forwarding to pebble: %w", err)
|
||||
}
|
||||
|
||||
privateKey, publicKey, err := readOrGenerateSSHKey(tmp)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to read or generate SSH key: %w", err)
|
||||
}
|
||||
|
||||
if !*fSkipCleanup {
|
||||
defer os.Remove(privateKeyPath)
|
||||
}
|
||||
@@ -225,6 +240,7 @@ func runTests(m *testing.M) (int, error) {
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to set up cluster->devcontrol connection: %w", err)
|
||||
}
|
||||
|
||||
if !*fSkipCleanup {
|
||||
defer func() {
|
||||
if err := cleanupSSHResources(context.Background(), kubeClient); err != nil {
|
||||
@@ -245,7 +261,7 @@ func runTests(m *testing.M) (int, error) {
|
||||
var apiKeyData struct {
|
||||
APIKey string `json:"apiKey"`
|
||||
}
|
||||
if err := json.Unmarshal(b, &apiKeyData); err != nil {
|
||||
if err = json.Unmarshal(b, &apiKeyData); err != nil {
|
||||
return 0, fmt.Errorf("failed to parse api-key.json: %w", err)
|
||||
}
|
||||
if apiKeyData.APIKey == "" {
|
||||
@@ -253,48 +269,27 @@ func runTests(m *testing.M) (int, error) {
|
||||
}
|
||||
|
||||
// Finish setting up tsClient.
|
||||
tsClient = tailscale.NewClient("-", tailscale.APIKey(apiKeyData.APIKey))
|
||||
tsClient.BaseURL = "http://localhost:31544"
|
||||
tsClient = &tailscale.Client{
|
||||
APIKey: apiKeyData.APIKey,
|
||||
BaseURL: must.Get(url.Parse("http://localhost:31544")),
|
||||
}
|
||||
|
||||
// Set ACLs and create OAuth client.
|
||||
req, _ := http.NewRequest("POST", tsClient.BuildTailnetURL("acl"), bytes.NewReader(requiredACLs))
|
||||
resp, err := tsClient.Do(req)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to set ACLs: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return 0, fmt.Errorf("HTTP %d setting ACLs: %s", resp.StatusCode, string(b))
|
||||
if err = tsClient.PolicyFile().Set(ctx, string(requiredACLs), ""); err != nil {
|
||||
return 0, fmt.Errorf("failed to set policy file: %w", err)
|
||||
}
|
||||
|
||||
logger.Infof("ACLs configured")
|
||||
|
||||
reqBody, err := json.Marshal(map[string]any{
|
||||
"keyType": "client",
|
||||
"scopes": []string{"auth_keys", "devices:core", "services"},
|
||||
"tags": []string{"tag:k8s-operator"},
|
||||
"description": "k8s-operator client for e2e tests",
|
||||
key, err := tsClient.Keys().CreateOAuthClient(ctx, tailscale.CreateOAuthClientRequest{
|
||||
Scopes: []string{"auth_keys", "devices:core", "services"},
|
||||
Tags: []string{"tag:k8s-operator"},
|
||||
Description: "k8s-operator client for e2e tests",
|
||||
})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to marshal OAuth client creation request: %w", err)
|
||||
}
|
||||
req, _ = http.NewRequest("POST", tsClient.BuildTailnetURL("keys"), bytes.NewReader(reqBody))
|
||||
resp, err = tsClient.Do(req)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to create OAuth client: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return 0, fmt.Errorf("HTTP %d creating OAuth client: %s", resp.StatusCode, string(b))
|
||||
}
|
||||
var key struct {
|
||||
ID string `json:"id"`
|
||||
Key string `json:"key"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&key); err != nil {
|
||||
return 0, fmt.Errorf("failed to decode OAuth client creation response: %w", err)
|
||||
}
|
||||
|
||||
clientID = key.ID
|
||||
clientSecret = key.Key
|
||||
} else {
|
||||
@@ -320,7 +315,9 @@ func runTests(m *testing.M) (int, error) {
|
||||
}
|
||||
// An access token will last for an hour which is plenty of time for
|
||||
// the tests to run. No need for token refresh logic.
|
||||
tsClient = tailscale.NewClient("-", tailscale.APIKey(tk.AccessToken))
|
||||
tsClient = &tailscale.Client{
|
||||
APIKey: tk.AccessToken,
|
||||
}
|
||||
}
|
||||
|
||||
var ossTag string
|
||||
@@ -447,18 +444,18 @@ func runTests(m *testing.M) (int, error) {
|
||||
caps.Devices.Create.Ephemeral = true
|
||||
caps.Devices.Create.Tags = []string{"tag:k8s"}
|
||||
|
||||
authKey, authKeyMeta, err := tsClient.CreateKey(ctx, caps)
|
||||
authKey, err := tsClient.Keys().CreateAuthKey(ctx, tailscale.CreateKeyRequest{Capabilities: caps})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer tsClient.DeleteKey(context.Background(), authKeyMeta.ID)
|
||||
defer tsClient.Keys().Delete(context.Background(), authKey.ID)
|
||||
|
||||
tnClient = &tsnet.Server{
|
||||
ControlURL: tsClient.BaseURL,
|
||||
ControlURL: tsClient.BaseURL.String(),
|
||||
Hostname: "test-proxy",
|
||||
Ephemeral: true,
|
||||
Store: &mem.Store{},
|
||||
AuthKey: authKey,
|
||||
AuthKey: authKey.Key,
|
||||
}
|
||||
_, err = tnClient.Up(ctx)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user