From f5b9a90254c7418eb32848e9f1c14222bf89d626 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Tue, 25 Nov 2025 00:29:31 +0000 Subject: [PATCH 01/17] Better error message when nics aren't programmed --- cns/restserver/internalapi.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 9f855ec64f..68f6794e54 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -175,15 +175,20 @@ func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMo defer service.Unlock() start := time.Now() programmedNCCount, err := service.syncHostNCVersion(ctx, channelMode) + // even if we get an error, we want to write the CNI conflist if we have any NC programmed to any version if programmedNCCount > 0 { // This will only be done once per lifetime of the CNS process. This function is threadsafe and will panic // if it fails, so it is safe to call in a non-preemptable goroutine. go service.MustGenerateCNIConflistOnce() + } else { + logger.Printf("No NCs programmed on this host yet, skipping CNI conflist generation") } + if err != nil { logger.Errorf("sync host error %v", err) } + syncHostNCVersionCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc() syncHostNCVersionLatency.WithLabelValues(strconv.FormatBool(err == nil)).Observe(time.Since(start).Seconds()) } @@ -302,7 +307,7 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo // if we didn't empty out the needs update set, NMA has not programmed all the NCs we are expecting, and we // need to return an error indicating that if len(outdatedNCs) > 0 { - return len(programmedNCs), errors.Errorf("unable to update some NCs: %v, missing or bad response from NMA or IMDS", outdatedNCs) + return len(programmedNCs), errors.Errorf("Have outdated NCs: %v, Current Programmed nics from NMA/IMDS %v", outdatedNCs, programmedNCs) } return len(programmedNCs), nil From b8e906bb5dafd5d906d1d227d1cad816e919e95a Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Tue, 25 Nov 2025 01:15:17 +0000 Subject: [PATCH 02/17] don't go ready till we actually write out a conflist/syncnc --- cns/healthserver/healthz.go | 1 + cns/restserver/internalapi.go | 147 ----------------------- cns/restserver/restserver.go | 15 +-- cns/restserver/synchostnc.go | 213 ++++++++++++++++++++++++++++++++++ cns/service/main.go | 39 +------ 5 files changed, 220 insertions(+), 195 deletions(-) create mode 100644 cns/restserver/synchostnc.go diff --git a/cns/healthserver/healthz.go b/cns/healthserver/healthz.go index bc1b90f444..97ad2ae093 100644 --- a/cns/healthserver/healthz.go +++ b/cns/healthserver/healthz.go @@ -70,6 +70,7 @@ func NewHealthzHandlerWithChecks(cfg *Config) (http.Handler, error) { return nil } } + return &healthz.Handler{ Checks: checks, }, nil diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 68f6794e54..5a089f58cc 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -13,9 +13,7 @@ import ( "net/http/httptest" "net/netip" "reflect" - "strconv" "strings" - "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" @@ -168,151 +166,6 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, return } -// SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. -// If NMAgent NC version got updated, CNS will refresh the pending programming IP status. -func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) { - service.Lock() - defer service.Unlock() - start := time.Now() - programmedNCCount, err := service.syncHostNCVersion(ctx, channelMode) - - // even if we get an error, we want to write the CNI conflist if we have any NC programmed to any version - if programmedNCCount > 0 { - // This will only be done once per lifetime of the CNS process. This function is threadsafe and will panic - // if it fails, so it is safe to call in a non-preemptable goroutine. - go service.MustGenerateCNIConflistOnce() - } else { - logger.Printf("No NCs programmed on this host yet, skipping CNI conflist generation") - } - - if err != nil { - logger.Errorf("sync host error %v", err) - } - - syncHostNCVersionCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc() - syncHostNCVersionLatency.WithLabelValues(strconv.FormatBool(err == nil)).Observe(time.Since(start).Seconds()) -} - -var errNonExistentContainerStatus = errors.New("nonExistantContainerstatus") - -// syncHostVersion updates the CNS state with the latest programmed versions of NCs attached to the VM. If any NC in local CNS state -// does not match the version that DNC claims to have published, this function will call NMAgent and list the latest programmed versions of -// all NCs and update the CNS state accordingly. This function returns the the total number of NCs on this VM that have been programmed to -// some version, NOT the number of NCs that are up-to-date. -func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMode string) (int, error) { - outdatedNCs := map[string]struct{}{} - programmedNCs := map[string]struct{}{} - for idx := range service.state.ContainerStatus { - // Will open a separate PR to convert all the NC version related variable to int. Change from string to int is a pain. - localNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].HostVersion) - if err != nil { - logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", service.state.ContainerStatus[idx].HostVersion, err) - continue - } - dncNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version) - if err != nil { - logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version, err) - continue - } - // host NC version is the NC version from NMAgent, if it's smaller than NC version from DNC, then append it to indicate it needs update. - if localNCVersion < dncNCVersion { - outdatedNCs[service.state.ContainerStatus[idx].ID] = struct{}{} - } else if localNCVersion > dncNCVersion { - logger.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", localNCVersion, dncNCVersion) - } - - if localNCVersion > -1 { - programmedNCs[service.state.ContainerStatus[idx].ID] = struct{}{} - } - } - if len(outdatedNCs) == 0 { - return len(programmedNCs), nil - } - - ncVersionListResp, err := service.nma.GetNCVersionList(ctx) - if err != nil { - return len(programmedNCs), errors.Wrap(err, "failed to get nc version list from nmagent") - } - - // Get IMDS NC versions for delegated NIC scenarios - imdsNCVersions, err := service.GetIMDSNCs(ctx) - if err != nil { - // If any of the NMA API check calls, imds calls fails assume that nma build doesn't have the latest changes and create empty map - imdsNCVersions = make(map[string]string) - } - - nmaNCs := map[string]string{} - for _, nc := range ncVersionListResp.Containers { - nmaNCs[strings.ToLower(nc.NetworkContainerID)] = nc.Version - } - - // Consolidate both nc's from NMA and IMDS calls - nmaProgrammedNCs := make(map[string]string) - for ncID, version := range nmaNCs { - nmaProgrammedNCs[ncID] = version - } - for ncID, version := range imdsNCVersions { - if _, exists := nmaProgrammedNCs[ncID]; !exists { - nmaProgrammedNCs[strings.ToLower(ncID)] = version - } else { - //nolint:staticcheck // SA1019: suppress deprecated logger.Warnf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated - logger.Warnf("NC %s exists in both NMA and IMDS responses, which is not expected", ncID) - } - } - hasNC.Set(float64(len(nmaProgrammedNCs))) - for ncID := range outdatedNCs { - nmaProgrammedNCVersionStr, ok := nmaProgrammedNCs[ncID] - if !ok { - // Neither NMA nor IMDS has this NC that we need programmed yet, bail out - continue - } - nmaProgrammedNCVersion, err := strconv.Atoi(nmaProgrammedNCVersionStr) - if err != nil { - logger.Errorf("failed to parse container version of %s: %s", ncID, err) - continue - } - // Check whether it exist in service state and get the related nc info - ncInfo, exist := service.state.ContainerStatus[ncID] - if !exist { - // if we marked this NC as needs update, but it no longer exists in internal state when we reach - // this point, our internal state has changed unexpectedly and we should bail out and try again. - return len(programmedNCs), errors.Wrapf(errNonExistentContainerStatus, "can't find NC with ID %s in service state, stop updating this host NC version", ncID) - } - // if the NC still exists in state and is programmed to some version (doesn't have to be latest), add it to our set of NCs that have been programmed - if nmaProgrammedNCVersion > -1 { - programmedNCs[ncID] = struct{}{} - } - - localNCVersion, err := strconv.Atoi(ncInfo.HostVersion) - if err != nil { - logger.Errorf("failed to parse host nc version string %s: %s", ncInfo.HostVersion, err) - continue - } - if localNCVersion > nmaProgrammedNCVersion { - //nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated - logger.Errorf("NC version from consolidated sources is decreasing: have %d, got %d", localNCVersion, nmaProgrammedNCVersion) - continue - } - if channelMode == cns.CRD { - service.MarkIpsAsAvailableUntransacted(ncInfo.ID, nmaProgrammedNCVersion) - } - //nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated - logger.Printf("Updating NC %s host version from %s to %s", ncID, ncInfo.HostVersion, nmaProgrammedNCVersionStr) - ncInfo.HostVersion = nmaProgrammedNCVersionStr - logger.Printf("Updated NC %s host version to %s", ncID, ncInfo.HostVersion) - service.state.ContainerStatus[ncID] = ncInfo - // if we successfully updated the NC, pop it from the needs update set. - delete(outdatedNCs, ncID) - } - // if we didn't empty out the needs update set, NMA has not programmed all the NCs we are expecting, and we - // need to return an error indicating that - if len(outdatedNCs) > 0 { - return len(programmedNCs), errors.Errorf("Have outdated NCs: %v, Current Programmed nics from NMA/IMDS %v", outdatedNCs, programmedNCs) - } - - return len(programmedNCs), nil -} - func (service *HTTPRestService) ReconcileIPAssignment(podInfoByIP map[string]cns.PodInfo, ncReqs []*cns.CreateNetworkContainerRequest) types.ResponseCode { // index all the secondary IP configs for all the nc reqs, for easier lookup later on. allSecIPsIdx := make(map[string]*cns.CreateNetworkContainerRequest) diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index f14c69d12c..4a5e8140cf 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -102,6 +102,7 @@ type HTTPRestService struct { PnpIDByMacAddress map[string]string imdsClient imdsClient nodesubnetIPFetcher *nodesubnet.IPFetcher + ncSynced chan struct{} } type CNIConflistGenerator interface { @@ -382,20 +383,6 @@ func (service *HTTPRestService) Stop() { logger.Printf("[Azure CNS] Service stopped.") } -// MustGenerateCNIConflistOnce will generate the CNI conflist once if the service was initialized with -// a conflist generator. If not, this is a no-op. -func (service *HTTPRestService) MustGenerateCNIConflistOnce() { - service.generateCNIConflistOnce.Do(func() { - if err := service.cniConflistGenerator.Generate(); err != nil { - panic("unable to generate cni conflist with error: " + err.Error()) - } - - if err := service.cniConflistGenerator.Close(); err != nil { - panic("unable to close the cni conflist output stream: " + err.Error()) - } - }) -} - func (service *HTTPRestService) AttachIPConfigsHandlerMiddleware(middleware cns.IPConfigsHandlerMiddleware) { service.IPConfigsHandlerMiddleware = middleware } diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go new file mode 100644 index 0000000000..3d75161bdc --- /dev/null +++ b/cns/restserver/synchostnc.go @@ -0,0 +1,213 @@ +// Copyright 2017 Microsoft. All rights reserved. +// MIT License + +package restserver + +import ( + "context" + "strconv" + "strings" + "time" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/configuration" + "github.com/Azure/azure-container-networking/cns/logger" + "github.com/pkg/errors" +) + +//TODO make this file a sub pacakge? + +func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) { + //do we need a sync.once to protect this? should we error if this is called twice? + service.ncSynced = make(chan struct{}) + go func() { + logger.Printf("Starting SyncHostNCVersion loop.") + // Periodically poll vfp programmed NC version from NMAgent + tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) + for { + select { + case <-tickerChannel: + timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) + service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) + cancel() + case <-ctx.Done(): + logger.Printf("Stopping SyncHostNCVersion loop.") + return + } + } + }() +} + +// SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. +// If NMAgent NC version got updated, CNS will refresh the pending programming IP status. +func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) { + service.Lock() + defer service.Unlock() + start := time.Now() + programmedNCCount, err := service.syncHostNCVersion(ctx, channelMode) + + // even if we get an error, we want to write the CNI conflist if we have any NC programmed to any version + if programmedNCCount > 0 { + // This will only be done once per lifetime of the CNS process. This function is threadsafe and will panic + // if it fails, so it is safe to call in a non-preemptable goroutine. + go service.MustGenerateCNIConflistOnce() + } else { + logger.Printf("No NCs programmed on this host yet, skipping CNI conflist generation") + } + + if err != nil { + logger.Errorf("sync host error %v", err) + } + + syncHostNCVersionCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc() + syncHostNCVersionLatency.WithLabelValues(strconv.FormatBool(err == nil)).Observe(time.Since(start).Seconds()) +} + +var errNonExistentContainerStatus = errors.New("nonExistantContainerstatus") + +// syncHostVersion updates the CNS state with the latest programmed versions of NCs attached to the VM. If any NC in local CNS state +// does not match the version that DNC claims to have published, this function will call NMAgent and list the latest programmed versions of +// all NCs and update the CNS state accordingly. This function returns the the total number of NCs on this VM that have been programmed to +// some version, NOT the number of NCs that are up-to-date. +func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMode string) (int, error) { + outdatedNCs := map[string]struct{}{} + programmedNCs := map[string]struct{}{} + for idx := range service.state.ContainerStatus { + // Will open a separate PR to convert all the NC version related variable to int. Change from string to int is a pain. + localNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].HostVersion) + if err != nil { + logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", service.state.ContainerStatus[idx].HostVersion, err) + continue + } + dncNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version) + if err != nil { + logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version, err) + continue + } + // host NC version is the NC version from NMAgent, if it's smaller than NC version from DNC, then append it to indicate it needs update. + if localNCVersion < dncNCVersion { + outdatedNCs[service.state.ContainerStatus[idx].ID] = struct{}{} + } else if localNCVersion > dncNCVersion { + logger.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", localNCVersion, dncNCVersion) + } + + if localNCVersion > -1 { + programmedNCs[service.state.ContainerStatus[idx].ID] = struct{}{} + } + } + if len(outdatedNCs) == 0 { + return len(programmedNCs), nil + } + + ncVersionListResp, err := service.nma.GetNCVersionList(ctx) + if err != nil { + return len(programmedNCs), errors.Wrap(err, "failed to get nc version list from nmagent") + } + + // Get IMDS NC versions for delegated NIC scenarios + imdsNCVersions, err := service.GetIMDSNCs(ctx) + if err != nil { + // If any of the NMA API check calls, imds calls fails assume that nma build doesn't have the latest changes and create empty map + imdsNCVersions = make(map[string]string) + } + + nmaNCs := map[string]string{} + for _, nc := range ncVersionListResp.Containers { + nmaNCs[strings.ToLower(nc.NetworkContainerID)] = nc.Version + } + + // Consolidate both nc's from NMA and IMDS calls + nmaProgrammedNCs := make(map[string]string) + for ncID, version := range nmaNCs { + nmaProgrammedNCs[ncID] = version + } + for ncID, version := range imdsNCVersions { + if _, exists := nmaProgrammedNCs[ncID]; !exists { + nmaProgrammedNCs[strings.ToLower(ncID)] = version + } else { + //nolint:staticcheck // SA1019: suppress deprecated logger.Warnf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated + logger.Warnf("NC %s exists in both NMA and IMDS responses, which is not expected", ncID) + } + } + hasNC.Set(float64(len(nmaProgrammedNCs))) + for ncID := range outdatedNCs { + nmaProgrammedNCVersionStr, ok := nmaProgrammedNCs[ncID] + if !ok { + // Neither NMA nor IMDS has this NC that we need programmed yet, bail out + continue + } + nmaProgrammedNCVersion, err := strconv.Atoi(nmaProgrammedNCVersionStr) + if err != nil { + logger.Errorf("failed to parse container version of %s: %s", ncID, err) + continue + } + // Check whether it exist in service state and get the related nc info + ncInfo, exist := service.state.ContainerStatus[ncID] + if !exist { + // if we marked this NC as needs update, but it no longer exists in internal state when we reach + // this point, our internal state has changed unexpectedly and we should bail out and try again. + return len(programmedNCs), errors.Wrapf(errNonExistentContainerStatus, "can't find NC with ID %s in service state, stop updating this host NC version", ncID) + } + // if the NC still exists in state and is programmed to some version (doesn't have to be latest), add it to our set of NCs that have been programmed + if nmaProgrammedNCVersion > -1 { + programmedNCs[ncID] = struct{}{} + } + + localNCVersion, err := strconv.Atoi(ncInfo.HostVersion) + if err != nil { + logger.Errorf("failed to parse host nc version string %s: %s", ncInfo.HostVersion, err) + continue + } + if localNCVersion > nmaProgrammedNCVersion { + //nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated + logger.Errorf("NC version from consolidated sources is decreasing: have %d, got %d", localNCVersion, nmaProgrammedNCVersion) + continue + } + if channelMode == cns.CRD { + service.MarkIpsAsAvailableUntransacted(ncInfo.ID, nmaProgrammedNCVersion) + } + //nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated + logger.Printf("Updating NC %s host version from %s to %s", ncID, ncInfo.HostVersion, nmaProgrammedNCVersionStr) + ncInfo.HostVersion = nmaProgrammedNCVersionStr + logger.Printf("Updated NC %s host version to %s", ncID, ncInfo.HostVersion) + service.state.ContainerStatus[ncID] = ncInfo + // if we successfully updated the NC, pop it from the needs update set. + delete(outdatedNCs, ncID) + } + // if we didn't empty out the needs update set, NMA has not programmed all the NCs we are expecting, and we + // need to return an error indicating that + if len(outdatedNCs) > 0 { + return len(programmedNCs), errors.Errorf("Have outdated NCs: %v, Current Programmed nics from NMA/IMDS %v", outdatedNCs, programmedNCs) + } + + return len(programmedNCs), nil +} + +// MustGenerateCNIConflistOnce will generate the CNI conflist once if the service was initialized with +// a conflist generator. If not, this is a no-op. +func (service *HTTPRestService) MustGenerateCNIConflistOnce() { + service.generateCNIConflistOnce.Do(func() { + close(service.ncSynced) + if err := service.cniConflistGenerator.Generate(); err != nil { + panic("unable to generate cni conflist with error: " + err.Error()) + } + + if err := service.cniConflistGenerator.Close(); err != nil { + panic("unable to close the cni conflist output stream: " + err.Error()) + } + }) +} + +func (service *HTTPRestService) WaitForNCSynced(ctx context.Context) error { + //sync loop never set up get out of here. + if service.ncSynced == nil { + return nil + } + + select { + case <-service.ncSynced: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/cns/service/main.go b/cns/service/main.go index d7b9a526d5..bc29f3a3c8 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1118,8 +1118,10 @@ func main() { httpRemoteRestService.StartNodeSubnet(rootCtx) } + //we wrote out the conflist right? + httpRemoteRestService.WaitForNCSynced(rootCtx) // mark the service as "ready" - close(readyCh) + close(readyCh) //will still block if root ctx is canceled? // block until process exiting <-rootCtx.Done() @@ -1283,22 +1285,7 @@ func InitializeMultiTenantController(ctx context.Context, httpRestService cns.HT time.Sleep(time.Millisecond * 500) } - // TODO: do we need this to be running? - logger.Printf("Starting SyncHostNCVersion") - go func() { - // Periodically poll vfp programmed NC version from NMAgent - tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) - for { - select { - case <-tickerChannel: - timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) - httpRestServiceImpl.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) - cancel() - case <-ctx.Done(): - return - } - } - }() + httpRestServiceImpl.StartSyncHostNCVersionLoop(ctx, cnsconfig) return nil } @@ -1643,23 +1630,7 @@ func InitializeCRDState(ctx context.Context, z *zap.Logger, httpRestService cns. break } - go func() { - logger.Printf("Starting SyncHostNCVersion loop.") - // Periodically poll vfp programmed NC version from NMAgent - tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) - for { - select { - case <-tickerChannel: - timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) - httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) - cancel() - case <-ctx.Done(): - logger.Printf("Stopping SyncHostNCVersion loop.") - return - } - } - }() - logger.Printf("Initialized SyncHostNCVersion loop.") + httpRestServiceImplementation.StartSyncHostNCVersionLoop(ctx, *cnsconfig) return nil } From a5e2b3456dccbf33c99be83d46e48a247f94a9f7 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Tue, 25 Nov 2025 05:36:59 +0000 Subject: [PATCH 03/17] fix tests --- cns/restserver/internalapi_test.go | 4 ++-- cns/restserver/nodesubnet.go | 3 ++- cns/restserver/synchostnc.go | 8 +++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index 440e3b4e61..29bd5b2e30 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -446,7 +446,7 @@ func TestSyncHostNCVersionErrorMissingNC(t *testing.T) { } // Check that the error message contains the expected text - expectedErrorText := "unable to update some NCs" + expectedErrorText := "Have outdated NCs" if !strings.Contains(err.Error(), expectedErrorText) { t.Errorf("Expected error to contain '%s', but got: %v", expectedErrorText, err) } @@ -607,7 +607,7 @@ func TestSyncHostNCVersionIMDSAPIVersionNotSupported(t *testing.T) { } // Verify the error is about being unable to update NCs - expectedErrorText := "unable to update some NCs" + expectedErrorText := "Have outdated NCs" if !strings.Contains(err.Error(), expectedErrorText) { t.Errorf("Expected error to contain '%s', but got: %v", expectedErrorText, err) } diff --git a/cns/restserver/nodesubnet.go b/cns/restserver/nodesubnet.go index 177d4266bc..6651c47ca0 100644 --- a/cns/restserver/nodesubnet.go +++ b/cns/restserver/nodesubnet.go @@ -31,7 +31,8 @@ func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr // saved NC successfully. UpdateIPsForNodeSubnet is called only when IPs are fetched from NMAgent. // We now have IPs to serve IPAM requests. Generate conflist to indicate CNS is ready - service.MustGenerateCNIConflistOnce() + service.ncSynced = make(chan struct{}) // in case this is called multiple times + service.mustGenerateCNIConflistOnce() return nil } diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index 3d75161bdc..baa74d3388 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -50,7 +50,7 @@ func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMo if programmedNCCount > 0 { // This will only be done once per lifetime of the CNS process. This function is threadsafe and will panic // if it fails, so it is safe to call in a non-preemptable goroutine. - go service.MustGenerateCNIConflistOnce() + go service.mustGenerateCNIConflistOnce() } else { logger.Printf("No NCs programmed on this host yet, skipping CNI conflist generation") } @@ -185,9 +185,11 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo // MustGenerateCNIConflistOnce will generate the CNI conflist once if the service was initialized with // a conflist generator. If not, this is a no-op. -func (service *HTTPRestService) MustGenerateCNIConflistOnce() { +func (service *HTTPRestService) mustGenerateCNIConflistOnce() { service.generateCNIConflistOnce.Do(func() { - close(service.ncSynced) + if service.ncSynced != nil { + close(service.ncSynced) + } if err := service.cniConflistGenerator.Generate(); err != nil { panic("unable to generate cni conflist with error: " + err.Error()) } From 3371a9565322b4406215756d5bfcdfde3e66f760 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Tue, 25 Nov 2025 05:46:24 +0000 Subject: [PATCH 04/17] wait doesn't actually need an erro --- cns/restserver/synchostnc.go | 8 ++++---- cns/service/main.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index baa74d3388..36a9add815 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -200,16 +200,16 @@ func (service *HTTPRestService) mustGenerateCNIConflistOnce() { }) } -func (service *HTTPRestService) WaitForNCSynced(ctx context.Context) error { +func (service *HTTPRestService) WaitForConfList(ctx context.Context) { //sync loop never set up get out of here. if service.ncSynced == nil { - return nil + return } select { case <-service.ncSynced: - return nil + return case <-ctx.Done(): - return ctx.Err() + return } } diff --git a/cns/service/main.go b/cns/service/main.go index bc29f3a3c8..da33e76ece 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1119,7 +1119,7 @@ func main() { } //we wrote out the conflist right? - httpRemoteRestService.WaitForNCSynced(rootCtx) + httpRemoteRestService.WaitForConfList(rootCtx) // mark the service as "ready" close(readyCh) //will still block if root ctx is canceled? // block until process exiting From 713c47c457d347bc8df007f5c03230fba9e8a256 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Tue, 25 Nov 2025 06:12:34 +0000 Subject: [PATCH 05/17] guard our done channel --- cns/restserver/restserver.go | 5 ++++- cns/restserver/synchostnc.go | 25 ++++++++++++++++++------- cns/service/main.go | 8 ++++++-- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 4a5e8140cf..1dafc03182 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -102,7 +102,9 @@ type HTTPRestService struct { PnpIDByMacAddress map[string]string imdsClient imdsClient nodesubnetIPFetcher *nodesubnet.IPFetcher - ncSynced chan struct{} + //put in ncstate struct? + ncSynced chan struct{} + ncSyncLoop bool } type CNIConflistGenerator interface { @@ -253,6 +255,7 @@ func NewHTTPRestService(config *common.ServiceConfig, wscli interfaceGetter, wsp homeAzMonitor: homeAzMonitor, cniConflistGenerator: gen, imdsClient: imdsClient, + ncSynced: make(chan struct{}), }, nil } diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index 36a9add815..6182bc68a5 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -17,9 +17,13 @@ import ( //TODO make this file a sub pacakge? -func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) { - //do we need a sync.once to protect this? should we error if this is called twice? - service.ncSynced = make(chan struct{}) +func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) error { + service.Lock() //could use a seperate lock or atomic bool. + defer service.Unlock() + if service.ncSyncLoop { + return errors.New("SyncHostNCVersion loop already started") + } + service.ncSyncLoop = true go func() { logger.Printf("Starting SyncHostNCVersion loop.") // Periodically poll vfp programmed NC version from NMAgent @@ -36,6 +40,7 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, } } }() + return nil } // SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. @@ -187,9 +192,11 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo // a conflist generator. If not, this is a no-op. func (service *HTTPRestService) mustGenerateCNIConflistOnce() { service.generateCNIConflistOnce.Do(func() { - if service.ncSynced != nil { + service.Lock() //lock inside a do scary? + if service.ncSyncLoop { close(service.ncSynced) } + service.Unlock() if err := service.cniConflistGenerator.Generate(); err != nil { panic("unable to generate cni conflist with error: " + err.Error()) } @@ -202,9 +209,13 @@ func (service *HTTPRestService) mustGenerateCNIConflistOnce() { func (service *HTTPRestService) WaitForConfList(ctx context.Context) { //sync loop never set up get out of here. - if service.ncSynced == nil { - return - } + func() { + service.Lock() + defer service.Unlock() + if !service.ncSyncLoop { + return + } + }() select { case <-service.ncSynced: diff --git a/cns/service/main.go b/cns/service/main.go index da33e76ece..52f4a83360 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1285,7 +1285,9 @@ func InitializeMultiTenantController(ctx context.Context, httpRestService cns.HT time.Sleep(time.Millisecond * 500) } - httpRestServiceImpl.StartSyncHostNCVersionLoop(ctx, cnsconfig) + if err := httpRestServiceImpl.StartSyncHostNCVersionLoop(ctx, cnsconfig); err != nil { + return err + } return nil } @@ -1630,7 +1632,9 @@ func InitializeCRDState(ctx context.Context, z *zap.Logger, httpRestService cns. break } - httpRestServiceImplementation.StartSyncHostNCVersionLoop(ctx, *cnsconfig) + if err := httpRestServiceImplementation.StartSyncHostNCVersionLoop(ctx, *cnsconfig); err != nil { + return err + } return nil } From 84fa79feb4b27a8626f4bf3764ba4ec3a23ce878 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Tue, 25 Nov 2025 06:31:18 +0000 Subject: [PATCH 06/17] sync immediatly and newticker --- cns/restserver/restserver.go | 3 ++- cns/restserver/synchostnc.go | 30 +++++++++++++----------------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 1dafc03182..e1f030525e 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/pprof" "sync" + "sync/atomic" "time" "github.com/Azure/azure-container-networking/cns" @@ -104,7 +105,7 @@ type HTTPRestService struct { nodesubnetIPFetcher *nodesubnet.IPFetcher //put in ncstate struct? ncSynced chan struct{} - ncSyncLoop bool + ncSyncLoop atomic.Bool } type CNIConflistGenerator interface { diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index 6182bc68a5..3d8b463a35 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -18,20 +18,21 @@ import ( //TODO make this file a sub pacakge? func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) error { - service.Lock() //could use a seperate lock or atomic bool. - defer service.Unlock() - if service.ncSyncLoop { + if !service.ncSyncLoop.CompareAndSwap(false, true) { return errors.New("SyncHostNCVersion loop already started") } - service.ncSyncLoop = true go func() { logger.Printf("Starting SyncHostNCVersion loop.") // Periodically poll vfp programmed NC version from NMAgent - tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) + ticker := time.NewTicker(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) + timeout := time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond for { + timedCtx, cancel := context.WithTimeout(ctx, timeout) + service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) + cancel() select { - case <-tickerChannel: - timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) + case <-ticker.C: + timedCtx, cancel := context.WithTimeout(ctx, timeout) service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) cancel() case <-ctx.Done(): @@ -43,6 +44,7 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, return nil } +// TODO: lowercase/unexport this function drive everything through // SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. // If NMAgent NC version got updated, CNS will refresh the pending programming IP status. func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) { @@ -192,11 +194,9 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo // a conflist generator. If not, this is a no-op. func (service *HTTPRestService) mustGenerateCNIConflistOnce() { service.generateCNIConflistOnce.Do(func() { - service.Lock() //lock inside a do scary? - if service.ncSyncLoop { + if service.ncSyncLoop.Load() { close(service.ncSynced) } - service.Unlock() if err := service.cniConflistGenerator.Generate(); err != nil { panic("unable to generate cni conflist with error: " + err.Error()) } @@ -209,13 +209,9 @@ func (service *HTTPRestService) mustGenerateCNIConflistOnce() { func (service *HTTPRestService) WaitForConfList(ctx context.Context) { //sync loop never set up get out of here. - func() { - service.Lock() - defer service.Unlock() - if !service.ncSyncLoop { - return - } - }() + if !service.ncSyncLoop.Load() { + return + } select { case <-service.ncSynced: From bc1401fc80f52a409cf147bddf10a4ec4fd0c750 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Tue, 25 Nov 2025 08:40:05 -0800 Subject: [PATCH 07/17] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Paul Miller --- cns/restserver/synchostnc.go | 2 +- cns/service/main.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index 3d8b463a35..ce51617a0a 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -208,7 +208,7 @@ func (service *HTTPRestService) mustGenerateCNIConflistOnce() { } func (service *HTTPRestService) WaitForConfList(ctx context.Context) { - //sync loop never set up get out of here. + // Sync loop never set up, get out of here. if !service.ncSyncLoop.Load() { return } diff --git a/cns/service/main.go b/cns/service/main.go index 52f4a83360..fa13a62b26 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1118,10 +1118,10 @@ func main() { httpRemoteRestService.StartNodeSubnet(rootCtx) } - //we wrote out the conflist right? + // Wait for NC sync to complete before marking service as ready. httpRemoteRestService.WaitForConfList(rootCtx) // mark the service as "ready" - close(readyCh) //will still block if root ctx is canceled? + close(readyCh) // This will not block, even if rootCtx is canceled. // block until process exiting <-rootCtx.Done() From 6c3c9f59d15b2259e671b827a4e6627f9b020300 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 01:25:11 +0000 Subject: [PATCH 08/17] final cleanup --- cns/healthserver/healthz.go | 1 - cns/restserver/nodesubnet.go | 4 +--- cns/restserver/synchostnc.go | 2 +- cns/service/main.go | 2 +- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/cns/healthserver/healthz.go b/cns/healthserver/healthz.go index 97ad2ae093..bc1b90f444 100644 --- a/cns/healthserver/healthz.go +++ b/cns/healthserver/healthz.go @@ -70,7 +70,6 @@ func NewHealthzHandlerWithChecks(cfg *Config) (http.Handler, error) { return nil } } - return &healthz.Handler{ Checks: checks, }, nil diff --git a/cns/restserver/nodesubnet.go b/cns/restserver/nodesubnet.go index 6651c47ca0..e215d38c86 100644 --- a/cns/restserver/nodesubnet.go +++ b/cns/restserver/nodesubnet.go @@ -29,9 +29,7 @@ func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr logger.Debugf("IP change processed successfully") - // saved NC successfully. UpdateIPsForNodeSubnet is called only when IPs are fetched from NMAgent. - // We now have IPs to serve IPAM requests. Generate conflist to indicate CNS is ready - service.ncSynced = make(chan struct{}) // in case this is called multiple times + //encapsulation viloation. Better if this called something equivalent to StartSyncHostNCVersionLoop service.mustGenerateCNIConflistOnce() return nil } diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index ce51617a0a..030c36775f 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -15,7 +15,7 @@ import ( "github.com/pkg/errors" ) -//TODO make this file a sub pacakge? +// TODO: make this file a sub pacakge? func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) error { if !service.ncSyncLoop.CompareAndSwap(false, true) { diff --git a/cns/service/main.go b/cns/service/main.go index fa13a62b26..a2f61109a9 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1121,7 +1121,7 @@ func main() { // Wait for NC sync to complete before marking service as ready. httpRemoteRestService.WaitForConfList(rootCtx) // mark the service as "ready" - close(readyCh) // This will not block, even if rootCtx is canceled. + close(readyCh) // block until process exiting <-rootCtx.Done() From dcb45f87e48db92e9687b004307e834f23ff8473 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 06:14:17 +0000 Subject: [PATCH 09/17] node subnet needs to handle this too --- cns/restserver/helper_for_nodesubnet_test.go | 3 ++- cns/restserver/nodesubnet.go | 6 +++++- cns/restserver/nodesubnet_test.go | 4 +++- cns/service/main.go | 5 ++++- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cns/restserver/helper_for_nodesubnet_test.go b/cns/restserver/helper_for_nodesubnet_test.go index 5719757429..81be6f00d7 100644 --- a/cns/restserver/helper_for_nodesubnet_test.go +++ b/cns/restserver/helper_for_nodesubnet_test.go @@ -79,7 +79,8 @@ func GetRestServiceObjectForNodeSubnetTest(t *testing.T, generator CNIConflistGe return interfaces, nil }, }, - wscli: &fakes.WireserverClientFake{}, + wscli: &fakes.WireserverClientFake{}, + ncSynced: make(chan struct{}), } } diff --git a/cns/restserver/nodesubnet.go b/cns/restserver/nodesubnet.go index e215d38c86..8f16125f91 100644 --- a/cns/restserver/nodesubnet.go +++ b/cns/restserver/nodesubnet.go @@ -58,6 +58,10 @@ func (service *HTTPRestService) InitializeNodeSubnet(ctx context.Context, podInf // StartNodeSubnet starts the IP fetcher for NodeSubnet. This will cause secondary IPs to be fetched periodically. // After the first successful fetch, conflist will be generated to indicate CNS is ready. -func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) { +func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) error { + if !service.ncSyncLoop.CompareAndSwap(false, true) { + return errors.New("SyncHostNCVersion loop already started") + } service.nodesubnetIPFetcher.Start(ctx) + return nil } diff --git a/cns/restserver/nodesubnet_test.go b/cns/restserver/nodesubnet_test.go index 361f5d005b..181eacd344 100644 --- a/cns/restserver/nodesubnet_test.go +++ b/cns/restserver/nodesubnet_test.go @@ -96,7 +96,9 @@ func TestNodeSubnet(t *testing.T) { checkIPassignment(t, service, expectedIPs) - service.StartNodeSubnet(ctx) + if err := service.StartNodeSubnet(ctx); err != nil { + t.Fatalf("StartNodeSubnet returned an error: %v", err) + } if service.GetNodesubnetIPFetcher() == nil { t.Fatal("NodeSubnetIPFetcher is not initialized") diff --git a/cns/service/main.go b/cns/service/main.go index a2f61109a9..4a5024e389 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1115,7 +1115,10 @@ func main() { // at this point, rest service is running. We can now start serving new requests. So call StartNodeSubnet, which // will fetch secondary IPs and generate conflist. Do not move this all before rest service start - this will cause // CNI to start sending requests, and if the service doesn't start successfully, the requests will fail. - httpRemoteRestService.StartNodeSubnet(rootCtx) + if err := httpRemoteRestService.StartNodeSubnet(rootCtx); err != nil { + logger.Errorf("Failed to start NodeSubnet: %v", err) + return + } } // Wait for NC sync to complete before marking service as ready. From 5044c81baec7a278cb78c2204197e1a5d20c8c7d Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 06:58:42 +0000 Subject: [PATCH 10/17] try and encapsulate a little more --- cns/restserver/helper_for_nodesubnet_test.go | 4 +- cns/restserver/nodesubnet.go | 4 +- cns/restserver/restserver.go | 12 ++-- cns/restserver/synchostnc.go | 69 ++++++++++++++------ cns/service/main.go | 2 +- 5 files changed, 62 insertions(+), 29 deletions(-) diff --git a/cns/restserver/helper_for_nodesubnet_test.go b/cns/restserver/helper_for_nodesubnet_test.go index 81be6f00d7..e7aa459e7f 100644 --- a/cns/restserver/helper_for_nodesubnet_test.go +++ b/cns/restserver/helper_for_nodesubnet_test.go @@ -79,8 +79,8 @@ func GetRestServiceObjectForNodeSubnetTest(t *testing.T, generator CNIConflistGe return interfaces, nil }, }, - wscli: &fakes.WireserverClientFake{}, - ncSynced: make(chan struct{}), + wscli: &fakes.WireserverClientFake{}, + ncSyncState: &NetworkContainerSyncState{}, } } diff --git a/cns/restserver/nodesubnet.go b/cns/restserver/nodesubnet.go index 8f16125f91..d96beef4dc 100644 --- a/cns/restserver/nodesubnet.go +++ b/cns/restserver/nodesubnet.go @@ -59,8 +59,8 @@ func (service *HTTPRestService) InitializeNodeSubnet(ctx context.Context, podInf // StartNodeSubnet starts the IP fetcher for NodeSubnet. This will cause secondary IPs to be fetched periodically. // After the first successful fetch, conflist will be generated to indicate CNS is ready. func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) error { - if !service.ncSyncLoop.CompareAndSwap(false, true) { - return errors.New("SyncHostNCVersion loop already started") + if err := service.ncSyncState.Start(); err != nil { + return err } service.nodesubnetIPFetcher.Start(ctx) return nil diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index e1f030525e..e0373ad77f 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -6,7 +6,6 @@ import ( "net/http" "net/http/pprof" "sync" - "sync/atomic" "time" "github.com/Azure/azure-container-networking/cns" @@ -103,9 +102,7 @@ type HTTPRestService struct { PnpIDByMacAddress map[string]string imdsClient imdsClient nodesubnetIPFetcher *nodesubnet.IPFetcher - //put in ncstate struct? - ncSynced chan struct{} - ncSyncLoop atomic.Bool + ncSyncState *NetworkContainerSyncState } type CNIConflistGenerator interface { @@ -256,7 +253,7 @@ func NewHTTPRestService(config *common.ServiceConfig, wscli interfaceGetter, wsp homeAzMonitor: homeAzMonitor, cniConflistGenerator: gen, imdsClient: imdsClient, - ncSynced: make(chan struct{}), + ncSyncState: &NetworkContainerSyncState{}, }, nil } @@ -390,3 +387,8 @@ func (service *HTTPRestService) Stop() { func (service *HTTPRestService) AttachIPConfigsHandlerMiddleware(middleware cns.IPConfigsHandlerMiddleware) { service.IPConfigsHandlerMiddleware = middleware } + +// GetNetworkContainerSyncState returns the NetworkContainerSyncState for external use. +func (service *HTTPRestService) Wait(ctx context.Context) { + service.ncSyncState.WaitForConfList(ctx) +} diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index 030c36775f..cc0653b78d 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -7,6 +7,7 @@ import ( "context" "strconv" "strings" + "sync/atomic" "time" "github.com/Azure/azure-container-networking/cns" @@ -17,9 +18,54 @@ import ( // TODO: make this file a sub pacakge? +// NetworkContainerSyncState manages the synchronization state for network container operations. +type NetworkContainerSyncState struct { + ncSynced chan struct{} + ncSyncLoop atomic.Bool +} + +// Start attempts to start the sync loop. Returns an error if already started. +func (n *NetworkContainerSyncState) Start() error { + if n == nil { + return errors.New("NetworkContainerSyncState is nil") + } + if !n.ncSyncLoop.CompareAndSwap(false, true) { + return errors.New("sync loop already started") + } + n.ncSynced = make(chan struct{}) + return nil +} + +// NotifyReady closes the ncSynced channel to signal readiness. +func (n *NetworkContainerSyncState) NotifyReady() { + if n == nil || !n.ncSyncLoop.Load() { + return //nobody ever set this up just move on. + } + close(n.ncSynced) +} + +// WaitForConfList waits for the CNI conflist to be ready or for the context to be done. +func (n *NetworkContainerSyncState) WaitForConfList(ctx context.Context) { + if n == nil { + return //do nothing if we never got intiialized. + } + + // Sync loop never set up, get out of here. + if n.ncSyncLoop.Load() { + return + } + + select { + case <-n.ncSynced: + return + case <-ctx.Done(): + return + } +} + func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) error { - if !service.ncSyncLoop.CompareAndSwap(false, true) { - return errors.New("SyncHostNCVersion loop already started") + if err := service.ncSyncState.Start(); err != nil { + return err } go func() { logger.Printf("Starting SyncHostNCVersion loop.") @@ -194,9 +240,7 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo // a conflist generator. If not, this is a no-op. func (service *HTTPRestService) mustGenerateCNIConflistOnce() { service.generateCNIConflistOnce.Do(func() { - if service.ncSyncLoop.Load() { - close(service.ncSynced) - } + if err := service.cniConflistGenerator.Generate(); err != nil { panic("unable to generate cni conflist with error: " + err.Error()) } @@ -204,19 +248,6 @@ func (service *HTTPRestService) mustGenerateCNIConflistOnce() { if err := service.cniConflistGenerator.Close(); err != nil { panic("unable to close the cni conflist output stream: " + err.Error()) } + service.ncSyncState.NotifyReady() }) } - -func (service *HTTPRestService) WaitForConfList(ctx context.Context) { - // Sync loop never set up, get out of here. - if !service.ncSyncLoop.Load() { - return - } - - select { - case <-service.ncSynced: - return - case <-ctx.Done(): - return - } -} diff --git a/cns/service/main.go b/cns/service/main.go index 4a5024e389..59f1ae4a54 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1122,7 +1122,7 @@ func main() { } // Wait for NC sync to complete before marking service as ready. - httpRemoteRestService.WaitForConfList(rootCtx) + httpRemoteRestService.Wait(rootCtx) // mark the service as "ready" close(readyCh) // block until process exiting From 6c82f7a2bff38041bf8d4157c00f4fcfbfc6b77d Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 17:21:19 +0000 Subject: [PATCH 11/17] use a wait group and simplify down --- cns/restserver/helper_for_nodesubnet_test.go | 3 +- cns/restserver/restserver.go | 7 ++- cns/restserver/synchostnc.go | 51 +++++++++----------- 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/cns/restserver/helper_for_nodesubnet_test.go b/cns/restserver/helper_for_nodesubnet_test.go index e7aa459e7f..5719757429 100644 --- a/cns/restserver/helper_for_nodesubnet_test.go +++ b/cns/restserver/helper_for_nodesubnet_test.go @@ -79,8 +79,7 @@ func GetRestServiceObjectForNodeSubnetTest(t *testing.T, generator CNIConflistGe return interfaces, nil }, }, - wscli: &fakes.WireserverClientFake{}, - ncSyncState: &NetworkContainerSyncState{}, + wscli: &fakes.WireserverClientFake{}, } } diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index e0373ad77f..807a523a81 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -102,7 +102,7 @@ type HTTPRestService struct { PnpIDByMacAddress map[string]string imdsClient imdsClient nodesubnetIPFetcher *nodesubnet.IPFetcher - ncSyncState *NetworkContainerSyncState + ncSyncState networkContainerSyncState } type CNIConflistGenerator interface { @@ -253,7 +253,6 @@ func NewHTTPRestService(config *common.ServiceConfig, wscli interfaceGetter, wsp homeAzMonitor: homeAzMonitor, cniConflistGenerator: gen, imdsClient: imdsClient, - ncSyncState: &NetworkContainerSyncState{}, }, nil } @@ -388,7 +387,7 @@ func (service *HTTPRestService) AttachIPConfigsHandlerMiddleware(middleware cns. service.IPConfigsHandlerMiddleware = middleware } -// GetNetworkContainerSyncState returns the NetworkContainerSyncState for external use. +// Wait waits for the NetworkContainerSyncState to be ready or for the context to be done. func (service *HTTPRestService) Wait(ctx context.Context) { - service.ncSyncState.WaitForConfList(ctx) + service.ncSyncState.Wait(ctx) } diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index cc0653b78d..e2c41cff72 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -7,6 +7,7 @@ import ( "context" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -18,48 +19,42 @@ import ( // TODO: make this file a sub pacakge? -// NetworkContainerSyncState manages the synchronization state for network container operations. -type NetworkContainerSyncState struct { - ncSynced chan struct{} - ncSyncLoop atomic.Bool +// NetworkContainerSyncState manages waiting on conflist to ge +// Basically a wait group that can only be added once and waits with a context +// meant to be used uninitialized and then started once the sync loop begins. +type networkContainerSyncState struct { + wg sync.WaitGroup + started atomic.Bool } // Start attempts to start the sync loop. Returns an error if already started. -func (n *NetworkContainerSyncState) Start() error { - if n == nil { - return errors.New("NetworkContainerSyncState is nil") - } - if !n.ncSyncLoop.CompareAndSwap(false, true) { +func (n *networkContainerSyncState) Start() error { + if !n.started.CompareAndSwap(false, true) { return errors.New("sync loop already started") } - n.ncSynced = make(chan struct{}) + n.wg.Add(1) return nil } -// NotifyReady closes the ncSynced channel to signal readiness. -func (n *NetworkContainerSyncState) NotifyReady() { - if n == nil || !n.ncSyncLoop.Load() { +// NotifyReady called once +func (n *networkContainerSyncState) NotifyReady() { + if !n.started.Load() { return //nobody ever set this up just move on. } - close(n.ncSynced) + n.wg.Done() } -// WaitForConfList waits for the CNI conflist to be ready or for the context to be done. -func (n *NetworkContainerSyncState) WaitForConfList(ctx context.Context) { - if n == nil { - return //do nothing if we never got intiialized. - } - - // Sync loop never set up, get out of here. - if n.ncSyncLoop.Load() { - return - } +// Wait waits for the CNI conflist to be ready or for the context to be done. +func (n *networkContainerSyncState) Wait(ctx context.Context) { + done := make(chan struct{}) + go func() { + n.wg.Wait() + close(done) + }() select { - case <-n.ncSynced: - return + case <-done: case <-ctx.Done(): - return } } @@ -90,7 +85,7 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, return nil } -// TODO: lowercase/unexport this function drive everything through +// TODO: lowercase/unexport this function drive everything through StartSyncHostNCVersionLoop? // SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. // If NMAgent NC version got updated, CNS will refresh the pending programming IP status. func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) { From d0029fb50aafbf7fe843b14cfbb908ae5dd41f16 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 18:18:44 +0000 Subject: [PATCH 12/17] move conflist genation to wait --- cns/restserver/internalapi_test.go | 42 +++++++++++++++----------- cns/restserver/nodesubnet.go | 3 +- cns/restserver/nodesubnet_test.go | 8 +---- cns/restserver/restserver.go | 14 +++++++-- cns/restserver/synchostnc.go | 47 ++++++++++-------------------- 5 files changed, 54 insertions(+), 60 deletions(-) diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index 29bd5b2e30..df9640fc89 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -1345,6 +1345,11 @@ func (m *mockCNIConflistGenerator) getGeneratedCount() int { return m.generatedCount } +var fastcnsconf = configuration.CNSConfig{ + SyncHostNCVersionIntervalMs: 100, + ChannelMode: cns.CRD, +} + // TestCNIConflistGenerationNewNC tests that discovering a new programmed NC in CNS state will trigger CNI conflist generation func TestCNIConflistGenerationNewNC(t *testing.T) { ncID := "some-new-nc" //nolint:goconst // value not shared across tests, can change without issue @@ -1380,9 +1385,10 @@ func TestCNIConflistGenerationNewNC(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } @@ -1421,9 +1427,10 @@ func TestCNIConflistGenerationExistingNC(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } @@ -1463,12 +1470,11 @@ func TestCNIConflistGenerationNewNCTwice(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) - - service.SyncHostNCVersion(context.Background(), cns.CRD) // CNI conflist gen happens in goroutine so sleep for a second to let it run time.Sleep(time.Second) assert.Equal(t, 1, mockgen.getGeneratedCount()) // should still be one @@ -1502,9 +1508,10 @@ func TestCNIConflistNotGenerated(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + service.Wait(ctx) assert.Equal(t, 0, mockgen.getGeneratedCount()) } @@ -1545,9 +1552,10 @@ func TestCNIConflistGenerationOnNMAError(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } diff --git a/cns/restserver/nodesubnet.go b/cns/restserver/nodesubnet.go index d96beef4dc..6d0c73c2d9 100644 --- a/cns/restserver/nodesubnet.go +++ b/cns/restserver/nodesubnet.go @@ -29,8 +29,7 @@ func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr logger.Debugf("IP change processed successfully") - //encapsulation viloation. Better if this called something equivalent to StartSyncHostNCVersionLoop - service.mustGenerateCNIConflistOnce() + service.ncSyncState.NotifyReady() return nil } diff --git a/cns/restserver/nodesubnet_test.go b/cns/restserver/nodesubnet_test.go index 181eacd344..2e935fe88f 100644 --- a/cns/restserver/nodesubnet_test.go +++ b/cns/restserver/nodesubnet_test.go @@ -104,13 +104,7 @@ func TestNodeSubnet(t *testing.T) { t.Fatal("NodeSubnetIPFetcher is not initialized") } - select { - case <-ctx.Done(): - t.Errorf("test context done - %s", ctx.Err()) - return - case <-mockCNIConflistGenerator.GenerateCalled: - break - } + service.Wait(ctx) expectedIPs["10.0.0.45"] = types.Available checkIPassignment(t, service, expectedIPs) diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 807a523a81..8c1250354c 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -97,7 +97,6 @@ type HTTPRestService struct { EndpointState map[string]*EndpointInfo // key : container id EndpointStateStore store.KeyValueStore cniConflistGenerator CNIConflistGenerator - generateCNIConflistOnce sync.Once IPConfigsHandlerMiddleware cns.IPConfigsHandlerMiddleware PnpIDByMacAddress map[string]string imdsClient imdsClient @@ -387,7 +386,18 @@ func (service *HTTPRestService) AttachIPConfigsHandlerMiddleware(middleware cns. service.IPConfigsHandlerMiddleware = middleware } -// Wait waits for the NetworkContainerSyncState to be ready or for the context to be done. +// Wait waits for nc sync state then writes out the conflist. func (service *HTTPRestService) Wait(ctx context.Context) { service.ncSyncState.Wait(ctx) + if ctx.Err() != nil { + logger.Printf("Context done before writing out conflist: %v", ctx.Err()) + return + } + if err := service.cniConflistGenerator.Generate(); err != nil { + panic("unable to generate cni conflist with error: " + err.Error()) + } + + if err := service.cniConflistGenerator.Close(); err != nil { + panic("unable to close the cni conflist output stream: " + err.Error()) + } } diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index e2c41cff72..08afdca6ba 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -27,7 +27,7 @@ type networkContainerSyncState struct { started atomic.Bool } -// Start attempts to start the sync loop. Returns an error if already started. +// Start is like add except it only allows being called once. func (n *networkContainerSyncState) Start() error { if !n.started.CompareAndSwap(false, true) { return errors.New("sync loop already started") @@ -36,7 +36,7 @@ func (n *networkContainerSyncState) Start() error { return nil } -// NotifyReady called once +// NotifyReady is like Done but will ignore if Start was never called. func (n *networkContainerSyncState) NotifyReady() { if !n.started.Load() { return //nobody ever set this up just move on. @@ -48,7 +48,7 @@ func (n *networkContainerSyncState) NotifyReady() { func (n *networkContainerSyncState) Wait(ctx context.Context) { done := make(chan struct{}) go func() { - n.wg.Wait() + n.wg.Wait() //still fine to wait even if never started will just return immediately close(done) }() @@ -58,23 +58,29 @@ func (n *networkContainerSyncState) Wait(ctx context.Context) { } } +// StartSyncHostNCVersionLoop loops until NCs are programmed and conflist is written The node subnet equivalent isStartNodeSubnet func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) error { if err := service.ncSyncState.Start(); err != nil { return err } go func() { + var one sync.Once logger.Printf("Starting SyncHostNCVersion loop.") // Periodically poll vfp programmed NC version from NMAgent ticker := time.NewTicker(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) timeout := time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond for { timedCtx, cancel := context.WithTimeout(ctx, timeout) - service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) + if service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) { + one.Do(service.ncSyncState.NotifyReady) + } cancel() select { case <-ticker.C: timedCtx, cancel := context.WithTimeout(ctx, timeout) - service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) + if service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) { + one.Do(service.ncSyncState.NotifyReady) + } cancel() case <-ctx.Done(): logger.Printf("Stopping SyncHostNCVersion loop.") @@ -88,27 +94,20 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, // TODO: lowercase/unexport this function drive everything through StartSyncHostNCVersionLoop? // SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. // If NMAgent NC version got updated, CNS will refresh the pending programming IP status. -func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) { +// returns true if soemthing was progammeed +func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) bool { service.Lock() defer service.Unlock() start := time.Now() programmedNCCount, err := service.syncHostNCVersion(ctx, channelMode) - - // even if we get an error, we want to write the CNI conflist if we have any NC programmed to any version - if programmedNCCount > 0 { - // This will only be done once per lifetime of the CNS process. This function is threadsafe and will panic - // if it fails, so it is safe to call in a non-preemptable goroutine. - go service.mustGenerateCNIConflistOnce() - } else { - logger.Printf("No NCs programmed on this host yet, skipping CNI conflist generation") - } - if err != nil { logger.Errorf("sync host error %v", err) } syncHostNCVersionCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc() + //does not include time to write out conflist. syncHostNCVersionLatency.WithLabelValues(strconv.FormatBool(err == nil)).Observe(time.Since(start).Seconds()) + return programmedNCCount > 0 } var errNonExistentContainerStatus = errors.New("nonExistantContainerstatus") @@ -230,19 +229,3 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo return len(programmedNCs), nil } - -// MustGenerateCNIConflistOnce will generate the CNI conflist once if the service was initialized with -// a conflist generator. If not, this is a no-op. -func (service *HTTPRestService) mustGenerateCNIConflistOnce() { - service.generateCNIConflistOnce.Do(func() { - - if err := service.cniConflistGenerator.Generate(); err != nil { - panic("unable to generate cni conflist with error: " + err.Error()) - } - - if err := service.cniConflistGenerator.Close(); err != nil { - panic("unable to close the cni conflist output stream: " + err.Error()) - } - service.ncSyncState.NotifyReady() - }) -} From 07b200d73237ea96c179d42c4c5d6103fa9b98f6 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 21:07:43 +0000 Subject: [PATCH 13/17] hide more methods drive more coverage --- cns/restserver/internalapi_test.go | 12 +++++++++--- cns/restserver/synchostnc.go | 6 +++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index df9640fc89..c28c9f4249 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -304,7 +304,7 @@ func TestCreateAndUpdateNCWithSecondaryIPNCVersion(t *testing.T) { } } -func TestSyncHostNCVersion(t *testing.T) { +func TestStartSyncHostNCVersionLoop(t *testing.T) { // cns.KubernetesCRD has one more logic compared to other orchestrator type, so test both of them orchestratorTypes := []string{cns.Kubernetes, cns.KubernetesCRD} for _, orchestratorType := range orchestratorTypes { @@ -349,7 +349,12 @@ func TestSyncHostNCVersion(t *testing.T) { defer cleanupIMDS() // When syncing the host NC version, it will use the orchestratorType passed in. - svc.SyncHostNCVersion(context.Background(), orchestratorType) + cnsconf := configuration.CNSConfig{ + SyncHostNCVersionIntervalMs: 100, + ChannelMode: orchestratorType, + } + svc.StartSyncHostNCVersionLoop(t.Context(), cnsconf) + svc.ncSyncState.Wait(t.Context()) // wait for at leat one run containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] if containerStatus.HostVersion != "0" { t.Errorf("Unexpected containerStatus.HostVersion %s, expected host version should be 0 in string", containerStatus.HostVersion) @@ -400,7 +405,8 @@ func TestPendingIPsGotUpdatedWhenSyncHostNCVersion(t *testing.T) { cleanup := setMockNMAgent(svc, mnma) defer cleanup() - svc.SyncHostNCVersion(context.Background(), cns.CRD) + svc.StartSyncHostNCVersionLoop(t.Context(), fastcnsconf) + svc.ncSyncState.Wait(t.Context()) // wait for at leat one run containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] receivedSecondaryIPConfigs = containerStatus.CreateNetworkContainerRequest.SecondaryIPConfigs diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index 08afdca6ba..4af62b4576 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -71,14 +71,14 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, timeout := time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond for { timedCtx, cancel := context.WithTimeout(ctx, timeout) - if service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) { + if service.syncHostNCVersionWrapper(timedCtx, cnsconfig.ChannelMode) { one.Do(service.ncSyncState.NotifyReady) } cancel() select { case <-ticker.C: timedCtx, cancel := context.WithTimeout(ctx, timeout) - if service.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) { + if service.syncHostNCVersionWrapper(timedCtx, cnsconfig.ChannelMode) { one.Do(service.ncSyncState.NotifyReady) } cancel() @@ -95,7 +95,7 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, // SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. // If NMAgent NC version got updated, CNS will refresh the pending programming IP status. // returns true if soemthing was progammeed -func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) bool { +func (service *HTTPRestService) syncHostNCVersionWrapper(ctx context.Context, channelMode string) bool { service.Lock() defer service.Unlock() start := time.Now() From 8233037ab51ff86d25c2c6dcd48df4c0d6325c94 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 21:08:59 +0000 Subject: [PATCH 14/17] use public funcitons --- cns/restserver/internalapi_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index c28c9f4249..3be4b66fc9 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -354,7 +354,7 @@ func TestStartSyncHostNCVersionLoop(t *testing.T) { ChannelMode: orchestratorType, } svc.StartSyncHostNCVersionLoop(t.Context(), cnsconf) - svc.ncSyncState.Wait(t.Context()) // wait for at leat one run + svc.Wait(t.Context()) // wait for at leat one run containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] if containerStatus.HostVersion != "0" { t.Errorf("Unexpected containerStatus.HostVersion %s, expected host version should be 0 in string", containerStatus.HostVersion) @@ -406,7 +406,7 @@ func TestPendingIPsGotUpdatedWhenSyncHostNCVersion(t *testing.T) { defer cleanup() svc.StartSyncHostNCVersionLoop(t.Context(), fastcnsconf) - svc.ncSyncState.Wait(t.Context()) // wait for at leat one run + svc.Wait(t.Context()) // wait for at leat one run containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] receivedSecondaryIPConfigs = containerStatus.CreateNetworkContainerRequest.SecondaryIPConfigs From e00f9bb954890b41344ebeb1c457aee8361a9dde Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 21:17:10 +0000 Subject: [PATCH 15/17] comment fixing --- cns/restserver/synchostnc.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index 4af62b4576..42e8dbf56e 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -58,7 +58,8 @@ func (n *networkContainerSyncState) Wait(ctx context.Context) { } } -// StartSyncHostNCVersionLoop loops until NCs are programmed and conflist is written The node subnet equivalent isStartNodeSubnet +// StartSyncHostNCVersionLoop loops until checking htat NCS are programmed annd also notifis when at least one has been programmed +// so we can write conflist and mark cns ready. func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) error { if err := service.ncSyncState.Start(); err != nil { return err @@ -70,18 +71,14 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, ticker := time.NewTicker(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) timeout := time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond for { - timedCtx, cancel := context.WithTimeout(ctx, timeout) - if service.syncHostNCVersionWrapper(timedCtx, cnsconfig.ChannelMode) { + if service.syncHostNCVersionWrapper(ctx, cnsconfig.ChannelMode, timeout) { one.Do(service.ncSyncState.NotifyReady) } - cancel() select { case <-ticker.C: - timedCtx, cancel := context.WithTimeout(ctx, timeout) - if service.syncHostNCVersionWrapper(timedCtx, cnsconfig.ChannelMode) { + if service.syncHostNCVersionWrapper(ctx, cnsconfig.ChannelMode, timeout) { one.Do(service.ncSyncState.NotifyReady) } - cancel() case <-ctx.Done(): logger.Printf("Stopping SyncHostNCVersion loop.") return @@ -91,15 +88,15 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, return nil } -// TODO: lowercase/unexport this function drive everything through StartSyncHostNCVersionLoop? -// SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. -// If NMAgent NC version got updated, CNS will refresh the pending programming IP status. -// returns true if soemthing was progammeed -func (service *HTTPRestService) syncHostNCVersionWrapper(ctx context.Context, channelMode string) bool { +// syncHostNCVersionWrapper bascially calls syncHostNCVersion but wraps it with locks a timeout and logges erros (but doesn't fail). +// Mostly exists so StartSyncHostNCVersionLoop doesn't have to repeat itself to be a do/while loop +func (service *HTTPRestService) syncHostNCVersionWrapper(ctx context.Context, channelMode string, timeout time.Duration) bool { + timedCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() service.Lock() defer service.Unlock() start := time.Now() - programmedNCCount, err := service.syncHostNCVersion(ctx, channelMode) + programmedNCCount, err := service.syncHostNCVersion(timedCtx, channelMode) if err != nil { logger.Errorf("sync host error %v", err) } From 2f8eabfba7c41e37b81a15a2e26ae35a613f1803 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 21:26:41 +0000 Subject: [PATCH 16/17] check return values and use t.Context() --- cns/restserver/internalapi_test.go | 39 ++++++++++++++++++------------ 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index 3be4b66fc9..b31f226f95 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -353,7 +353,8 @@ func TestStartSyncHostNCVersionLoop(t *testing.T) { SyncHostNCVersionIntervalMs: 100, ChannelMode: orchestratorType, } - svc.StartSyncHostNCVersionLoop(t.Context(), cnsconf) + err := svc.StartSyncHostNCVersionLoop(t.Context(), cnsconf) + assert.NoError(t, err) svc.Wait(t.Context()) // wait for at leat one run containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] if containerStatus.HostVersion != "0" { @@ -405,7 +406,8 @@ func TestPendingIPsGotUpdatedWhenSyncHostNCVersion(t *testing.T) { cleanup := setMockNMAgent(svc, mnma) defer cleanup() - svc.StartSyncHostNCVersionLoop(t.Context(), fastcnsconf) + err := svc.StartSyncHostNCVersionLoop(t.Context(), fastcnsconf) + assert.NoError(t, err) svc.Wait(t.Context()) // wait for at leat one run containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] @@ -446,7 +448,7 @@ func TestSyncHostNCVersionErrorMissingNC(t *testing.T) { cleanup := setMockNMAgent(svc, mnma) defer cleanup() - _, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD) + _, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD) if err == nil { t.Errorf("Expected error when NC is missing from both NMAgent and IMDS, but got nil") } @@ -491,7 +493,7 @@ func TestSyncHostNCVersionLocalVersionHigher(t *testing.T) { cleanup := setMockNMAgent(svc, mnma) defer cleanup() - _, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD) + _, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD) if err != nil { t.Errorf("Expected sync to succeed, but got error: %v", err) } @@ -534,7 +536,7 @@ func TestSyncHostNCVersionLocalHigherThanDNC(t *testing.T) { // This should detect that localNCVersion (3) > dncNCVersion (1) and log error // but since there are no outdated NCs, it should return successfully - _, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD) + _, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD) if err != nil { t.Errorf("Expected no error when localNCVersion > dncNCVersion (no outdated NCs), but got: %v", err) } @@ -607,7 +609,7 @@ func TestSyncHostNCVersionIMDSAPIVersionNotSupported(t *testing.T) { defer func() { svc.imdsClient = originalIMDS }() // Test should fail because of outdated IMDS NC that can't be updated - _, err := svc.syncHostNCVersion(context.Background(), orchestratorType) + _, err := svc.syncHostNCVersion(t.Context(), orchestratorType) if err == nil { t.Errorf("Expected error when there are outdated IMDS NCs but API version is not supported, but got nil") } @@ -1391,9 +1393,10 @@ func TestCNIConflistGenerationNewNC(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() - service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } @@ -1433,9 +1436,10 @@ func TestCNIConflistGenerationExistingNC(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() - service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } @@ -1476,9 +1480,10 @@ func TestCNIConflistGenerationNewNCTwice(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() - service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) // CNI conflist gen happens in goroutine so sleep for a second to let it run @@ -1514,9 +1519,10 @@ func TestCNIConflistNotGenerated(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() - service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) service.Wait(ctx) assert.Equal(t, 0, mockgen.getGeneratedCount()) } @@ -1558,9 +1564,10 @@ func TestCNIConflistGenerationOnNMAError(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() - service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } From c9ba36bb352196efa264160ae986a5aed671a814 Mon Sep 17 00:00:00 2001 From: Paul Miller Date: Wed, 26 Nov 2025 21:41:38 +0000 Subject: [PATCH 17/17] simpler name --- cns/restserver/nodesubnet.go | 4 ++-- cns/restserver/restserver.go | 4 ++-- cns/restserver/synchostnc.go | 20 ++++++++++---------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/cns/restserver/nodesubnet.go b/cns/restserver/nodesubnet.go index 6d0c73c2d9..74e910e8dd 100644 --- a/cns/restserver/nodesubnet.go +++ b/cns/restserver/nodesubnet.go @@ -29,7 +29,7 @@ func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr logger.Debugf("IP change processed successfully") - service.ncSyncState.NotifyReady() + service.ncWait.Done() return nil } @@ -58,7 +58,7 @@ func (service *HTTPRestService) InitializeNodeSubnet(ctx context.Context, podInf // StartNodeSubnet starts the IP fetcher for NodeSubnet. This will cause secondary IPs to be fetched periodically. // After the first successful fetch, conflist will be generated to indicate CNS is ready. func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) error { - if err := service.ncSyncState.Start(); err != nil { + if err := service.ncWait.Start(); err != nil { return err } service.nodesubnetIPFetcher.Start(ctx) diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 8c1250354c..77236d19c1 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -101,7 +101,7 @@ type HTTPRestService struct { PnpIDByMacAddress map[string]string imdsClient imdsClient nodesubnetIPFetcher *nodesubnet.IPFetcher - ncSyncState networkContainerSyncState + ncWait ncWait } type CNIConflistGenerator interface { @@ -388,7 +388,7 @@ func (service *HTTPRestService) AttachIPConfigsHandlerMiddleware(middleware cns. // Wait waits for nc sync state then writes out the conflist. func (service *HTTPRestService) Wait(ctx context.Context) { - service.ncSyncState.Wait(ctx) + service.ncWait.Wait(ctx) if ctx.Err() != nil { logger.Printf("Context done before writing out conflist: %v", ctx.Err()) return diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go index 42e8dbf56e..5592a71ef7 100644 --- a/cns/restserver/synchostnc.go +++ b/cns/restserver/synchostnc.go @@ -19,16 +19,16 @@ import ( // TODO: make this file a sub pacakge? -// NetworkContainerSyncState manages waiting on conflist to ge +// ncWait manages making sure ncstate or nodesubnet state is ready before conflist writing // Basically a wait group that can only be added once and waits with a context // meant to be used uninitialized and then started once the sync loop begins. -type networkContainerSyncState struct { +type ncWait struct { wg sync.WaitGroup started atomic.Bool } -// Start is like add except it only allows being called once. -func (n *networkContainerSyncState) Start() error { +// Start is like Add except it only allows being called once. +func (n *ncWait) Start() error { if !n.started.CompareAndSwap(false, true) { return errors.New("sync loop already started") } @@ -36,8 +36,8 @@ func (n *networkContainerSyncState) Start() error { return nil } -// NotifyReady is like Done but will ignore if Start was never called. -func (n *networkContainerSyncState) NotifyReady() { +// Done is like waitgroup Done but will ignore if Start was never called. +func (n *ncWait) Done() { if !n.started.Load() { return //nobody ever set this up just move on. } @@ -45,7 +45,7 @@ func (n *networkContainerSyncState) NotifyReady() { } // Wait waits for the CNI conflist to be ready or for the context to be done. -func (n *networkContainerSyncState) Wait(ctx context.Context) { +func (n *ncWait) Wait(ctx context.Context) { done := make(chan struct{}) go func() { n.wg.Wait() //still fine to wait even if never started will just return immediately @@ -61,7 +61,7 @@ func (n *networkContainerSyncState) Wait(ctx context.Context) { // StartSyncHostNCVersionLoop loops until checking htat NCS are programmed annd also notifis when at least one has been programmed // so we can write conflist and mark cns ready. func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) error { - if err := service.ncSyncState.Start(); err != nil { + if err := service.ncWait.Start(); err != nil { return err } go func() { @@ -72,12 +72,12 @@ func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, timeout := time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond for { if service.syncHostNCVersionWrapper(ctx, cnsconfig.ChannelMode, timeout) { - one.Do(service.ncSyncState.NotifyReady) + one.Do(service.ncWait.Done) } select { case <-ticker.C: if service.syncHostNCVersionWrapper(ctx, cnsconfig.ChannelMode, timeout) { - one.Do(service.ncSyncState.NotifyReady) + one.Do(service.ncWait.Done) } case <-ctx.Done(): logger.Printf("Stopping SyncHostNCVersion loop.")