appc,feature/conn25: conn25: send address assignments to connector

After we intercept a DNS response and assign magic and transit addresses
we must communicate the assignment to our connector so that it can
direct traffic when it arrives.

Use the recently added peerapi endpoint to send the addresses.

Updates tailscale/corp#34258
Signed-off-by: Fran Bull <fran@tailscale.com>
This commit is contained in:
Fran Bull
2026-02-27 14:31:43 -08:00
committed by franbull
parent 6a19995f13
commit a4614d7d17
10 changed files with 486 additions and 37 deletions
+149 -18
View File
@@ -8,8 +8,12 @@
package conn25
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/netip"
"slices"
@@ -17,6 +21,7 @@ import (
"go4.org/netipx"
"golang.org/x/net/dns/dnsmessage"
"tailscale.com/appc"
"tailscale.com/feature"
"tailscale.com/ipn/ipnext"
"tailscale.com/ipn/ipnlocal"
@@ -33,16 +38,30 @@ import (
// It is also the [extension] name and the log prefix.
const featureName = "conn25"
const maxBodyBytes = 1024 * 1024
// jsonDecode decodes all of a io.ReadCloser (eg an http.Request Body) into one pointer with best practices.
// It limits the size of bytes it will read.
// It either decodes all of the bytes into the pointer, or errors (unlike json.Decoder.Decode).
// It closes the ReadCloser after reading.
func jsonDecode(target any, rc io.ReadCloser) error {
defer rc.Close()
respBs, err := io.ReadAll(io.LimitReader(rc, maxBodyBytes+1))
if err != nil {
return err
}
err = json.Unmarshal(respBs, &target)
return err
}
func init() {
feature.Register(featureName)
newExtension := func(logf logger.Logf, sb ipnext.SafeBackend) (ipnext.Extension, error) {
e := &extension{
ipnext.RegisterExtension(featureName, func(logf logger.Logf, sb ipnext.SafeBackend) (ipnext.Extension, error) {
return &extension{
conn25: newConn25(logger.WithPrefix(logf, "conn25: ")),
backend: sb,
}
return e, nil
}
ipnext.RegisterExtension(featureName, newExtension)
}, nil
})
ipnlocal.RegisterPeerAPIHandler("/v0/connector/transit-ip", handleConnectorTransitIP)
}
@@ -61,6 +80,9 @@ type extension struct {
conn25 *Conn25 // safe for concurrent access and only set at creation
backend ipnext.SafeBackend // safe for concurrent access and only set at creation
host ipnext.Host // set in Init, read-only after
ctxCancel context.CancelCauseFunc // cancels sendLoop goroutine
mu sync.Mutex // protects the fields below
isDNSHookRegistered bool
}
@@ -72,17 +94,32 @@ func (e *extension) Name() string {
// Init implements [ipnext.Extension].
func (e *extension) Init(host ipnext.Host) error {
//Init only once
e.mu.Lock()
defer e.mu.Unlock()
if e.ctxCancel != nil {
return nil
}
e.host = host
host.Hooks().OnSelfChange.Add(e.onSelfChange)
ctx, cancel := context.WithCancelCause(context.Background())
e.ctxCancel = cancel
go e.sendLoop(ctx)
return nil
}
// Shutdown implements [ipnlocal.Extension].
func (e *extension) Shutdown() error {
if e.ctxCancel != nil {
e.ctxCancel(errors.New("extension shutdown"))
}
if e.conn25 != nil {
close(e.conn25.client.addrsCh)
}
return nil
}
func (e *extension) handleConnectorTransitIP(h ipnlocal.PeerAPIHandler, w http.ResponseWriter, r *http.Request) {
const maxBodyBytes = 1024 * 1024
defer r.Body.Close()
if r.Method != "POST" {
http.Error(w, "Method should be POST", http.StatusMethodNotAllowed)
@@ -172,7 +209,10 @@ func (c *Conn25) isConfigured() bool {
func newConn25(logf logger.Logf) *Conn25 {
c := &Conn25{
client: &client{logf: logf},
client: &client{
logf: logf,
addrsCh: make(chan addrs, 64),
},
connector: &connector{logf: logf},
}
return c
@@ -310,7 +350,8 @@ const AppConnectorsExperimentalAttrName = "tailscale.com/app-connectors-experime
type config struct {
isConfigured bool
apps []appctype.Conn25Attr
appsByDomain map[dnsname.FQDN][]string
appsByName map[string]appctype.Conn25Attr
appNamesByDomain map[dnsname.FQDN][]string
selfRoutedDomains set.Set[dnsname.FQDN]
}
@@ -326,7 +367,8 @@ func configFromNodeView(n tailcfg.NodeView) (config, error) {
cfg := config{
isConfigured: true,
apps: apps,
appsByDomain: map[dnsname.FQDN][]string{},
appsByName: map[string]appctype.Conn25Attr{},
appNamesByDomain: map[dnsname.FQDN][]string{},
selfRoutedDomains: set.Set[dnsname.FQDN]{},
}
for _, app := range apps {
@@ -336,11 +378,12 @@ func configFromNodeView(n tailcfg.NodeView) (config, error) {
if err != nil {
return config{}, err
}
mak.Set(&cfg.appsByDomain, fqdn, append(cfg.appsByDomain[fqdn], app.Name))
mak.Set(&cfg.appNamesByDomain, fqdn, append(cfg.appNamesByDomain[fqdn], app.Name))
if selfMatchesTags {
cfg.selfRoutedDomains.Add(fqdn)
}
}
mak.Set(&cfg.appsByName, app.Name, app)
}
return cfg, nil
}
@@ -350,7 +393,8 @@ func configFromNodeView(n tailcfg.NodeView) (config, error) {
// connectors.
// It's safe for concurrent use.
type client struct {
logf logger.Logf
logf logger.Logf
addrsCh chan addrs
mu sync.Mutex // protects the fields below
magicIPPool *ippool
@@ -402,7 +446,7 @@ func (c *client) reconfig(newCfg config) error {
func (c *client) isConnectorDomain(domain dnsname.FQDN) bool {
c.mu.Lock()
defer c.mu.Unlock()
appNames, ok := c.config.appsByDomain[domain]
appNames, ok := c.config.appNamesByDomain[domain]
return ok && len(appNames) > 0
}
@@ -416,7 +460,7 @@ func (c *client) reserveAddresses(domain dnsname.FQDN, dst netip.Addr) (addrs, e
if existing, ok := c.assignments.lookupByDomainDst(domain, dst); ok {
return existing, nil
}
appNames, _ := c.config.appsByDomain[domain]
appNames, _ := c.config.appNamesByDomain[domain]
// only reserve for first app
app := appNames[0]
mip, err := c.magicIPPool.next()
@@ -437,12 +481,100 @@ func (c *client) reserveAddresses(domain dnsname.FQDN, dst netip.Addr) (addrs, e
if err := c.assignments.insert(as); err != nil {
return addrs{}, err
}
err = c.enqueueAddressAssignment(as)
if err != nil {
return addrs{}, err
}
return as, nil
}
func (c *client) enqueueAddressAssignment(addrs addrs) {
// TODO(fran) 2026-02-03 asynchronously send peerapi req to connector to
// allocate these addresses for us.
func (e *extension) sendLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case as := <-e.conn25.client.addrsCh:
if err := e.sendAddressAssignment(ctx, as); err != nil {
e.conn25.client.logf("error sending transit IP assignment (app: %s, mip: %v, src: %v): %v", as.app, as.magic, as.dst, err)
}
}
}
}
func (c *client) enqueueAddressAssignment(addrs addrs) error {
select {
// TODO(fran) investigate the value of waiting for multiple addresses and sending them
// in one ConnectorTransitIPRequest
case c.addrsCh <- addrs:
return nil
default:
c.logf("address assignment queue full, dropping transit assignment for %v", addrs.domain)
return errors.New("queue full")
}
}
func makePeerAPIReq(ctx context.Context, httpClient *http.Client, urlBase string, as addrs) error {
url := urlBase + "/v0/connector/transit-ip"
reqBody := ConnectorTransitIPRequest{
TransitIPs: []TransitIPRequest{{
TransitIP: as.transit,
DestinationIP: as.dst,
App: as.app,
}},
}
bs, err := json.Marshal(reqBody)
if err != nil {
return fmt.Errorf("marshalling request: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(bs))
if err != nil {
return fmt.Errorf("creating request: %w", err)
}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("sending request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("connector returned HTTP %d", resp.StatusCode)
}
var respBody ConnectorTransitIPResponse
err = jsonDecode(&respBody, resp.Body)
if err != nil {
return fmt.Errorf("decoding response: %w", err)
}
if len(respBody.TransitIPs) > 0 && respBody.TransitIPs[0].Code != OK {
return fmt.Errorf("connector error: %s", respBody.TransitIPs[0].Message)
}
return nil
}
func (e *extension) sendAddressAssignment(ctx context.Context, as addrs) error {
app, ok := e.conn25.client.config.appsByName[as.app]
if !ok {
e.conn25.client.logf("App not found for app: %s (domain: %s)", as.app, as.domain)
return errors.New("app not found")
}
nb := e.host.NodeBackend()
peers := appc.PickConnector(nb, app)
var urlBase string
for _, p := range peers {
urlBase = nb.PeerAPIBase(p)
if urlBase != "" {
break
}
}
if urlBase == "" {
return errors.New("no connector peer found to handle address assignment")
}
client := e.backend.Sys().Dialer.Get().PeerAPIHTTPClient()
return makePeerAPIReq(ctx, client, urlBase, as)
}
func (c *client) mapDNSResponse(buf []byte) []byte {
@@ -501,7 +633,6 @@ func (c *client) mapDNSResponse(buf []byte) []byte {
c.logf("assigned connector addresses unexpectedly empty: %v", err)
return buf
}
c.enqueueAddressAssignment(addrs)
default:
if err := p.SkipAnswer(); err != nil {
c.logf("error parsing dns response: %v", err)