appc,ipn/ipnlocal: receive AppConnector updates via the event bus (#17411)
Add subscribers for AppConnector events Make the RouteAdvertiser interface optional We cannot yet remove it because the tests still depend on it to verify correctness. We will need to separately update the test fixtures to remove that dependency. Publish RouteInfo via the event bus, so we do not need a callback to do that. Replace it with a flag that indicates whether to treat the route info the connector has as "definitive" for filtering purposes. Update the tests to simplify the construction of AppConnector values now that a store callback is no longer required. Also fix a couple of pre-existing racy tests that were hidden by not being concurrent in the same way production is. Updates #15160 Updates #17192 Change-Id: Id39525c0f02184e88feaf0d8a3c05504850e47ee Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
+38
-23
@@ -592,6 +592,8 @@ func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus
|
||||
healthChange = healthChangeSub.Events()
|
||||
}
|
||||
changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec)
|
||||
routeUpdateSub := eventbus.Subscribe[appctype.RouteUpdate](ec)
|
||||
storeRoutesSub := eventbus.Subscribe[appctype.RouteInfo](ec)
|
||||
|
||||
var portlist <-chan PortlistServices
|
||||
if buildfeatures.HasPortList {
|
||||
@@ -612,10 +614,31 @@ func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus
|
||||
b.onHealthChange(change)
|
||||
case changeDelta := <-changeDeltaSub.Events():
|
||||
b.linkChange(&changeDelta)
|
||||
|
||||
case pl := <-portlist:
|
||||
if buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans
|
||||
b.setPortlistServices(pl)
|
||||
}
|
||||
case ru := <-routeUpdateSub.Events():
|
||||
// TODO(creachadair, 2025-10-02): It is currently possible for updates produced under
|
||||
// one profile to arrive and be applied after a switch to another profile.
|
||||
// We need to find a way to ensure that changes to the backend state are applied
|
||||
// consistently in the presnce of profile changes, which currently may not happen in
|
||||
// a single atomic step. See: https://github.com/tailscale/tailscale/issues/17414
|
||||
if err := b.AdvertiseRoute(ru.Advertise...); err != nil {
|
||||
b.logf("appc: failed to advertise routes: %v: %v", ru.Advertise, err)
|
||||
}
|
||||
if err := b.UnadvertiseRoute(ru.Unadvertise...); err != nil {
|
||||
b.logf("appc: failed to unadvertise routes: %v: %v", ru.Unadvertise, err)
|
||||
}
|
||||
case ri := <-storeRoutesSub.Events():
|
||||
// Whether or not routes should be stored can change over time.
|
||||
shouldStoreRoutes := b.ControlKnobs().AppCStoreRoutes.Load()
|
||||
if shouldStoreRoutes {
|
||||
if err := b.storeRouteInfo(ri); err != nil {
|
||||
b.logf("appc: failed to store route info: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4836,35 +4859,27 @@ func (b *LocalBackend) reconfigAppConnectorLocked(nm *netmap.NetworkMap, prefs i
|
||||
}
|
||||
}()
|
||||
|
||||
// App connectors have been disabled.
|
||||
if !prefs.AppConnector().Advertise {
|
||||
b.appConnector.Close() // clean up a previous connector (safe on nil)
|
||||
b.appConnector = nil
|
||||
return
|
||||
}
|
||||
|
||||
shouldAppCStoreRoutes := b.ControlKnobs().AppCStoreRoutes.Load()
|
||||
if b.appConnector == nil || b.appConnector.ShouldStoreRoutes() != shouldAppCStoreRoutes {
|
||||
var ri *appctype.RouteInfo
|
||||
var storeFunc func(*appctype.RouteInfo) error
|
||||
if shouldAppCStoreRoutes {
|
||||
var err error
|
||||
ri, err = b.readRouteInfoLocked()
|
||||
if err != nil {
|
||||
ri = &appctype.RouteInfo{}
|
||||
if err != ipn.ErrStateNotExist {
|
||||
b.logf("Unsuccessful Read RouteInfo: ", err)
|
||||
}
|
||||
}
|
||||
storeFunc = b.storeRouteInfo
|
||||
// We don't (yet) have an app connector configured, or the configured
|
||||
// connector has a different route persistence setting.
|
||||
shouldStoreRoutes := b.ControlKnobs().AppCStoreRoutes.Load()
|
||||
if b.appConnector == nil || (shouldStoreRoutes != b.appConnector.ShouldStoreRoutes()) {
|
||||
ri, err := b.readRouteInfoLocked()
|
||||
if err != nil && err != ipn.ErrStateNotExist {
|
||||
b.logf("Unsuccessful Read RouteInfo: %v", err)
|
||||
}
|
||||
|
||||
b.appConnector.Close() // clean up a previous connector (safe on nil)
|
||||
b.appConnector = appc.NewAppConnector(appc.Config{
|
||||
Logf: b.logf,
|
||||
EventBus: b.sys.Bus.Get(),
|
||||
RouteAdvertiser: b,
|
||||
RouteInfo: ri,
|
||||
StoreRoutesFunc: storeFunc,
|
||||
HasStoredRoutes: shouldStoreRoutes,
|
||||
})
|
||||
}
|
||||
if nm == nil {
|
||||
@@ -7008,9 +7023,9 @@ func (b *LocalBackend) ObserveDNSResponse(res []byte) error {
|
||||
// ErrDisallowedAutoRoute is returned by AdvertiseRoute when a route that is not allowed is requested.
|
||||
var ErrDisallowedAutoRoute = errors.New("route is not allowed")
|
||||
|
||||
// AdvertiseRoute implements the appc.RouteAdvertiser interface. It sets a new
|
||||
// route advertisement if one is not already present in the existing routes.
|
||||
// If the route is disallowed, ErrDisallowedAutoRoute is returned.
|
||||
// AdvertiseRoute implements the appctype.RouteAdvertiser interface. It sets a
|
||||
// new route advertisement if one is not already present in the existing
|
||||
// routes. If the route is disallowed, ErrDisallowedAutoRoute is returned.
|
||||
func (b *LocalBackend) AdvertiseRoute(ipps ...netip.Prefix) error {
|
||||
finalRoutes := b.Prefs().AdvertiseRoutes().AsSlice()
|
||||
var newRoutes []netip.Prefix
|
||||
@@ -7066,8 +7081,8 @@ func coveredRouteRangeNoDefault(finalRoutes []netip.Prefix, ipp netip.Prefix) bo
|
||||
return false
|
||||
}
|
||||
|
||||
// UnadvertiseRoute implements the appc.RouteAdvertiser interface. It removes
|
||||
// a route advertisement if one is present in the existing routes.
|
||||
// UnadvertiseRoute implements the appctype.RouteAdvertiser interface. It
|
||||
// removes a route advertisement if one is present in the existing routes.
|
||||
func (b *LocalBackend) UnadvertiseRoute(toRemove ...netip.Prefix) error {
|
||||
currentRoutes := b.Prefs().AdvertiseRoutes().AsSlice()
|
||||
finalRoutes := currentRoutes[:0]
|
||||
@@ -7095,7 +7110,7 @@ func namespaceKeyForCurrentProfile(pm *profileManager, key ipn.StateKey) ipn.Sta
|
||||
|
||||
const routeInfoStateStoreKey ipn.StateKey = "_routeInfo"
|
||||
|
||||
func (b *LocalBackend) storeRouteInfo(ri *appctype.RouteInfo) error {
|
||||
func (b *LocalBackend) storeRouteInfo(ri appctype.RouteInfo) error {
|
||||
if !buildfeatures.HasAppConnectors {
|
||||
return feature.ErrUnavailable
|
||||
}
|
||||
|
||||
+61
-27
@@ -75,8 +75,6 @@ import (
|
||||
"tailscale.com/wgengine/wgcfg"
|
||||
)
|
||||
|
||||
func fakeStoreRoutes(*appctype.RouteInfo) error { return nil }
|
||||
|
||||
func inRemove(ip netip.Addr) bool {
|
||||
for _, pfx := range removeFromDefaultRoute {
|
||||
if pfx.Contains(ip) {
|
||||
@@ -2321,14 +2319,9 @@ func TestOfferingAppConnector(t *testing.T) {
|
||||
if b.OfferingAppConnector() {
|
||||
t.Fatal("unexpected offering app connector")
|
||||
}
|
||||
rc := &appctest.RouteCollector{}
|
||||
if shouldStore {
|
||||
b.appConnector = appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc, RouteInfo: &appctype.RouteInfo{}, StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
b.appConnector = appc.NewAppConnector(appc.Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
b.appConnector = appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf, EventBus: bus, HasStoredRoutes: shouldStore,
|
||||
})
|
||||
if !b.OfferingAppConnector() {
|
||||
t.Fatal("unexpected not offering app connector")
|
||||
}
|
||||
@@ -2379,6 +2372,7 @@ func TestObserveDNSResponse(t *testing.T) {
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
b := newTestBackend(t)
|
||||
bus := b.sys.Bus.Get()
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
|
||||
// ensure no error when no app connector is configured
|
||||
if err := b.ObserveDNSResponse(dnsResponse("example.com.", "192.0.0.8")); err != nil {
|
||||
@@ -2386,28 +2380,30 @@ func TestObserveDNSResponse(t *testing.T) {
|
||||
}
|
||||
|
||||
rc := &appctest.RouteCollector{}
|
||||
if shouldStore {
|
||||
b.appConnector = appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
b.appConnector = appc.NewAppConnector(appc.Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
b.appConnector.UpdateDomains([]string{"example.com"})
|
||||
b.appConnector.Wait(context.Background())
|
||||
a := appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
HasStoredRoutes: shouldStore,
|
||||
})
|
||||
a.UpdateDomains([]string{"example.com"})
|
||||
a.Wait(t.Context())
|
||||
b.appConnector = a
|
||||
|
||||
if err := b.ObserveDNSResponse(dnsResponse("example.com.", "192.0.0.8")); err != nil {
|
||||
t.Errorf("ObserveDNSResponse: %v", err)
|
||||
}
|
||||
b.appConnector.Wait(context.Background())
|
||||
a.Wait(t.Context())
|
||||
wantRoutes := []netip.Prefix{netip.MustParsePrefix("192.0.0.8/32")}
|
||||
if !slices.Equal(rc.Routes(), wantRoutes) {
|
||||
t.Fatalf("got routes %v, want %v", rc.Routes(), wantRoutes)
|
||||
}
|
||||
|
||||
if err := eventbustest.Expect(w,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: mustPrefix("192.0.0.8/32")}),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2558,7 +2554,7 @@ func TestBackfillAppConnectorRoutes(t *testing.T) {
|
||||
|
||||
// Store the test IP in profile data, but not in Prefs.AdvertiseRoutes.
|
||||
b.ControlKnobs().AppCStoreRoutes.Store(true)
|
||||
if err := b.storeRouteInfo(&appctype.RouteInfo{
|
||||
if err := b.storeRouteInfo(appctype.RouteInfo{
|
||||
Domains: map[string][]netip.Addr{
|
||||
"example.com": {ip},
|
||||
},
|
||||
@@ -5511,10 +5507,10 @@ func TestReadWriteRouteInfo(t *testing.T) {
|
||||
b.pm.currentProfile = prof1.View()
|
||||
|
||||
// set up routeInfo
|
||||
ri1 := &appctype.RouteInfo{}
|
||||
ri1 := appctype.RouteInfo{}
|
||||
ri1.Wildcards = []string{"1"}
|
||||
|
||||
ri2 := &appctype.RouteInfo{}
|
||||
ri2 := appctype.RouteInfo{}
|
||||
ri2.Wildcards = []string{"2"}
|
||||
|
||||
// read before write
|
||||
@@ -7066,3 +7062,41 @@ func toStrings[T ~string](in []T) []string {
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
type textUpdate struct {
|
||||
Advertise []string
|
||||
Unadvertise []string
|
||||
}
|
||||
|
||||
func routeUpdateToText(u appctype.RouteUpdate) textUpdate {
|
||||
var out textUpdate
|
||||
for _, p := range u.Advertise {
|
||||
out.Advertise = append(out.Advertise, p.String())
|
||||
}
|
||||
for _, p := range u.Unadvertise {
|
||||
out.Unadvertise = append(out.Unadvertise, p.String())
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func mustPrefix(ss ...string) (out []netip.Prefix) {
|
||||
for _, s := range ss {
|
||||
out = append(out, netip.MustParsePrefix(s))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// eqUpdate generates an eventbus test filter that matches an appctype.RouteUpdate
|
||||
// message equal to want, or reports an error giving a human-readable diff.
|
||||
//
|
||||
// TODO(creachadair): This is copied from the appc test package, but we can't
|
||||
// put it into the appctest package because the appc tests depend on it and
|
||||
// that makes a cycle. Clean up those tests and put this somewhere common.
|
||||
func eqUpdate(want appctype.RouteUpdate) func(appctype.RouteUpdate) error {
|
||||
return func(got appctype.RouteUpdate) error {
|
||||
if diff := cmp.Diff(routeUpdateToText(got), routeUpdateToText(want)); diff != "" {
|
||||
return fmt.Errorf("wrong update (-got, +want):\n%s", diff)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -256,22 +256,12 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) {
|
||||
reg := new(usermetric.Registry)
|
||||
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set)
|
||||
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
||||
var a *appc.AppConnector
|
||||
if shouldStore {
|
||||
a = appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: sys.Bus.Get(),
|
||||
RouteAdvertiser: &appctest.RouteCollector{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: sys.Bus.Get(),
|
||||
RouteAdvertiser: &appctest.RouteCollector{},
|
||||
})
|
||||
}
|
||||
a := appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: sys.Bus.Get(),
|
||||
HasStoredRoutes: shouldStore,
|
||||
})
|
||||
t.Cleanup(a.Close)
|
||||
sys.Set(pm.Store())
|
||||
sys.Set(eng)
|
||||
|
||||
@@ -329,11 +319,11 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) {
|
||||
|
||||
func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) {
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
ctx := context.Background()
|
||||
var h peerAPIHandler
|
||||
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
||||
|
||||
sys := tsd.NewSystemWithBus(eventbustest.NewBus(t))
|
||||
bw := eventbustest.NewWatcher(t, sys.Bus.Get())
|
||||
|
||||
rc := &appctest.RouteCollector{}
|
||||
ht := health.NewTracker(sys.Bus.Get())
|
||||
@@ -341,18 +331,13 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) {
|
||||
|
||||
reg := new(usermetric.Registry)
|
||||
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set)
|
||||
var a *appc.AppConnector
|
||||
if shouldStore {
|
||||
a = appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: sys.Bus.Get(),
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = appc.NewAppConnector(appc.Config{Logf: t.Logf, EventBus: sys.Bus.Get(), RouteAdvertiser: rc})
|
||||
}
|
||||
a := appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: sys.Bus.Get(),
|
||||
RouteAdvertiser: rc,
|
||||
HasStoredRoutes: shouldStore,
|
||||
})
|
||||
t.Cleanup(a.Close)
|
||||
sys.Set(pm.Store())
|
||||
sys.Set(eng)
|
||||
|
||||
@@ -362,7 +347,7 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) {
|
||||
|
||||
h.ps = &peerAPIServer{b: b}
|
||||
h.ps.b.appConnector.UpdateDomains([]string{"example.com"})
|
||||
h.ps.b.appConnector.Wait(ctx)
|
||||
a.Wait(t.Context())
|
||||
|
||||
h.ps.resolver = &fakeResolver{build: func(b *dnsmessage.Builder) {
|
||||
b.AResource(
|
||||
@@ -392,12 +377,18 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) {
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("unexpected status code: %v", w.Code)
|
||||
}
|
||||
h.ps.b.appConnector.Wait(ctx)
|
||||
a.Wait(t.Context())
|
||||
|
||||
wantRoutes := []netip.Prefix{netip.MustParsePrefix("192.0.0.8/32")}
|
||||
if !slices.Equal(rc.Routes(), wantRoutes) {
|
||||
t.Errorf("got %v; want %v", rc.Routes(), wantRoutes)
|
||||
}
|
||||
|
||||
if err := eventbustest.Expect(bw,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: mustPrefix("192.0.0.8/32")}),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -408,24 +399,20 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) {
|
||||
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
||||
|
||||
sys := tsd.NewSystemWithBus(eventbustest.NewBus(t))
|
||||
bw := eventbustest.NewWatcher(t, sys.Bus.Get())
|
||||
|
||||
ht := health.NewTracker(sys.Bus.Get())
|
||||
reg := new(usermetric.Registry)
|
||||
rc := &appctest.RouteCollector{}
|
||||
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set)
|
||||
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
||||
var a *appc.AppConnector
|
||||
if shouldStore {
|
||||
a = appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: sys.Bus.Get(),
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = appc.NewAppConnector(appc.Config{Logf: t.Logf, EventBus: sys.Bus.Get(), RouteAdvertiser: rc})
|
||||
}
|
||||
a := appc.NewAppConnector(appc.Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: sys.Bus.Get(),
|
||||
RouteAdvertiser: rc,
|
||||
HasStoredRoutes: shouldStore,
|
||||
})
|
||||
t.Cleanup(a.Close)
|
||||
sys.Set(pm.Store())
|
||||
sys.Set(eng)
|
||||
|
||||
@@ -482,6 +469,12 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) {
|
||||
if !slices.Equal(rc.Routes(), wantRoutes) {
|
||||
t.Errorf("got %v; want %v", rc.Routes(), wantRoutes)
|
||||
}
|
||||
|
||||
if err := eventbustest.Expect(bw,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: mustPrefix("192.0.0.8/32")}),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user