Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 0 additions & 142 deletions cns/restserver/internalapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"net/http/httptest"
"net/netip"
"reflect"
"strconv"
"strings"
"time"

"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/logger"
Expand Down Expand Up @@ -168,146 +166,6 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string,
return
}

// SyncHostNCVersion runs one host NC version sync pass while holding the service lock and
// records the outcome in the syncHostNCVersionCount and syncHostNCVersionLatency metrics,
// labelled with whether the pass completed without error.
//
// The actual reconciliation is delegated to syncHostNCVersion. Errors from that call are
// logged here and are not returned to the caller.
func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) {
	service.Lock()
	defer service.Unlock()

	begin := time.Now()
	programmed, syncErr := service.syncHostNCVersion(ctx, channelMode)
	// Both metrics share the same success label, so format it once.
	succeeded := strconv.FormatBool(syncErr == nil)

	// The CNI conflist is written whenever at least one NC is programmed to any version,
	// regardless of whether the sync itself reported an error.
	if programmed > 0 {
		// Per the original author's note: this happens only once per lifetime of the CNS
		// process, is threadsafe, and panics on failure, so it is launched in its own goroutine.
		go service.MustGenerateCNIConflistOnce()
	}
	if syncErr != nil {
		logger.Errorf("sync host error %v", syncErr)
	}

	syncHostNCVersionCount.WithLabelValues(succeeded).Inc()
	syncHostNCVersionLatency.WithLabelValues(succeeded).Observe(time.Since(begin).Seconds())
}

// errNonExistentContainerStatus is the sentinel error wrapped by syncHostNCVersion when an NC
// that was marked as outdated can no longer be found in service.state.ContainerStatus.
var errNonExistentContainerStatus = errors.New("nonExistantContainerstatus")

// syncHostNCVersion updates the CNS state with the latest programmed versions of the NCs attached to the VM.
// If any NC in the local CNS state has a host version lower than the version DNC claims to have published,
// this function asks NMAgent (and IMDS, best effort) for the programmed versions of all NCs and updates the
// CNS state accordingly.
//
// channelMode controls whether pending IPs are marked available after an NC's host version is bumped; that
// only happens when it equals cns.CRD.
//
// It returns the total number of NCs on this VM that have been programmed to some version, NOT the number of
// NCs that are up to date. The count is returned alongside any error. A non-nil error is returned when:
//   - the NMAgent NC version list cannot be fetched,
//   - an outdated NC disappears from the service state while it is being processed, or
//   - at least one outdated NC could not be brought up to date in this pass.
//
// The caller is expected to hold the service lock (SyncHostNCVersion does so).
func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMode string) (int, error) {
	outdatedNCs := map[string]struct{}{}
	programmedNCs := map[string]struct{}{}
	for idx := range service.state.ContainerStatus {
		ncStatus := service.state.ContainerStatus[idx]
		// Will open a separate PR to convert all the NC version related variable to int. Change from string to int is a pain.
		localNCVersion, err := strconv.Atoi(ncStatus.HostVersion)
		if err != nil {
			logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", ncStatus.HostVersion, err)
			continue
		}
		dncNCVersion, err := strconv.Atoi(ncStatus.CreateNetworkContainerRequest.Version)
		if err != nil {
			logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", ncStatus.CreateNetworkContainerRequest.Version, err)
			continue
		}
		// The host NC version is the NC version from NMAgent. If it is smaller than the NC version from DNC,
		// record the NC as needing an update.
		if localNCVersion < dncNCVersion {
			outdatedNCs[ncStatus.ID] = struct{}{}
		} else if localNCVersion > dncNCVersion {
			logger.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", localNCVersion, dncNCVersion)
		}

		if localNCVersion > -1 {
			programmedNCs[ncStatus.ID] = struct{}{}
		}
	}
	// Nothing is behind DNC, so there is no need to query NMAgent or IMDS.
	if len(outdatedNCs) == 0 {
		return len(programmedNCs), nil
	}

	ncVersionListResp, err := service.nma.GetNCVersionList(ctx)
	if err != nil {
		return len(programmedNCs), errors.Wrap(err, "failed to get nc version list from nmagent")
	}

	// Get IMDS NC versions for delegated NIC scenarios.
	imdsNCVersions, err := service.GetIMDSNCs(ctx)
	if err != nil {
		// Deliberate best effort: if the IMDS lookup fails, assume the NMA build doesn't have the latest
		// changes and continue with an empty IMDS view.
		imdsNCVersions = make(map[string]string)
	}

	// Index the NMAgent response by lower-cased NC ID.
	nmaNCs := make(map[string]string, len(ncVersionListResp.Containers))
	for _, nc := range ncVersionListResp.Containers {
		nmaNCs[strings.ToLower(nc.NetworkContainerID)] = nc.Version
	}

	// Consolidate the NCs reported by NMAgent and IMDS into a single lookup keyed by lower-cased NC ID.
	nmaProgrammedNCs := make(map[string]string, len(nmaNCs)+len(imdsNCVersions))
	for ncID, version := range nmaNCs {
		nmaProgrammedNCs[ncID] = version
	}
	for ncID, version := range imdsNCVersions {
		// Normalize before the duplicate check. The NMAgent keys are lower-cased, so checking the raw ID
		// would miss a duplicate whenever IMDS reports upper-case characters, and the IMDS version would
		// then silently overwrite the NMAgent one.
		normalizedID := strings.ToLower(ncID)
		if _, exists := nmaNCs[normalizedID]; exists {
			//nolint:staticcheck // SA1019: suppress deprecated logger.Warnf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated
			logger.Warnf("NC %s exists in both NMA and IMDS responses, which is not expected", ncID)
			continue
		}
		nmaProgrammedNCs[normalizedID] = version
	}
	hasNC.Set(float64(len(nmaProgrammedNCs)))

	// NOTE(review): the lookup below uses the NC ID exactly as stored in the service state, while the
	// consolidated map is keyed by lower-cased ID. This presumably relies on state IDs already being
	// lower-case - confirm against the code that populates ContainerStatus.
	for ncID := range outdatedNCs {
		nmaProgrammedNCVersionStr, ok := nmaProgrammedNCs[ncID]
		if !ok {
			// Neither NMA nor IMDS has this NC that we need programmed yet; leave it in the outdated set.
			continue
		}
		nmaProgrammedNCVersion, err := strconv.Atoi(nmaProgrammedNCVersionStr)
		if err != nil {
			logger.Errorf("failed to parse container version of %s: %s", ncID, err)
			continue
		}
		// Check whether it exists in the service state and get the related NC info.
		ncInfo, exist := service.state.ContainerStatus[ncID]
		if !exist {
			// If we marked this NC as needing an update but it no longer exists in internal state when we
			// reach this point, our internal state has changed unexpectedly and we should bail out and try again.
			return len(programmedNCs), errors.Wrapf(errNonExistentContainerStatus, "can't find NC with ID %s in service state, stop updating this host NC version", ncID)
		}
		// If the NC still exists in state and is programmed to some version (doesn't have to be the latest),
		// add it to our set of NCs that have been programmed.
		if nmaProgrammedNCVersion > -1 {
			programmedNCs[ncID] = struct{}{}
		}

		localNCVersion, err := strconv.Atoi(ncInfo.HostVersion)
		if err != nil {
			logger.Errorf("failed to parse host nc version string %s: %s", ncInfo.HostVersion, err)
			continue
		}
		// Never move the recorded host version backwards.
		if localNCVersion > nmaProgrammedNCVersion {
			//nolint:staticcheck // SA1019: suppress deprecated logger.Errorf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated
			logger.Errorf("NC version from consolidated sources is decreasing: have %d, got %d", localNCVersion, nmaProgrammedNCVersion)
			continue
		}
		if channelMode == cns.CRD {
			service.MarkIpsAsAvailableUntransacted(ncInfo.ID, nmaProgrammedNCVersion)
		}
		//nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated
		logger.Printf("Updating NC %s host version from %s to %s", ncID, ncInfo.HostVersion, nmaProgrammedNCVersionStr)
		ncInfo.HostVersion = nmaProgrammedNCVersionStr
		logger.Printf("Updated NC %s host version to %s", ncID, ncInfo.HostVersion)
		// ncInfo is a copy of the map value, so it has to be written back.
		service.state.ContainerStatus[ncID] = ncInfo
		// If we successfully updated the NC, pop it from the needs-update set.
		delete(outdatedNCs, ncID)
	}
	// If we didn't empty out the needs-update set, NMA has not programmed all the NCs we are expecting, and
	// we need to return an error indicating that.
	if len(outdatedNCs) > 0 {
		return len(programmedNCs), errors.Errorf("unable to update some NCs: %v, missing or bad response from NMA or IMDS", outdatedNCs)
	}

	return len(programmedNCs), nil
}

func (service *HTTPRestService) ReconcileIPAssignment(podInfoByIP map[string]cns.PodInfo, ncReqs []*cns.CreateNetworkContainerRequest) types.ResponseCode {
// index all the secondary IP configs for all the nc reqs, for easier lookup later on.
allSecIPsIdx := make(map[string]*cns.CreateNetworkContainerRequest)
Expand Down
73 changes: 47 additions & 26 deletions cns/restserver/internalapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func TestCreateAndUpdateNCWithSecondaryIPNCVersion(t *testing.T) {
}
}

func TestSyncHostNCVersion(t *testing.T) {
func TestStartSyncHostNCVersionLoop(t *testing.T) {
// cns.KubernetesCRD has one more logic compared to other orchestrator type, so test both of them
orchestratorTypes := []string{cns.Kubernetes, cns.KubernetesCRD}
for _, orchestratorType := range orchestratorTypes {
Expand Down Expand Up @@ -349,7 +349,13 @@ func TestSyncHostNCVersion(t *testing.T) {
defer cleanupIMDS()

// When syncing the host NC version, it will use the orchestratorType passed in.
svc.SyncHostNCVersion(context.Background(), orchestratorType)
cnsconf := configuration.CNSConfig{
SyncHostNCVersionIntervalMs: 100,
ChannelMode: orchestratorType,
}
err := svc.StartSyncHostNCVersionLoop(t.Context(), cnsconf)
assert.NoError(t, err)
svc.Wait(t.Context()) // wait for at least one run
containerStatus = svc.state.ContainerStatus[req.NetworkContainerid]
if containerStatus.HostVersion != "0" {
t.Errorf("Unexpected containerStatus.HostVersion %s, expected host version should be 0 in string", containerStatus.HostVersion)
Expand Down Expand Up @@ -400,7 +406,9 @@ func TestPendingIPsGotUpdatedWhenSyncHostNCVersion(t *testing.T) {
cleanup := setMockNMAgent(svc, mnma)
defer cleanup()

svc.SyncHostNCVersion(context.Background(), cns.CRD)
err := svc.StartSyncHostNCVersionLoop(t.Context(), fastcnsconf)
assert.NoError(t, err)
svc.Wait(t.Context()) // wait for at least one run
containerStatus = svc.state.ContainerStatus[req.NetworkContainerid]

receivedSecondaryIPConfigs = containerStatus.CreateNetworkContainerRequest.SecondaryIPConfigs
Expand Down Expand Up @@ -440,13 +448,13 @@ func TestSyncHostNCVersionErrorMissingNC(t *testing.T) {
cleanup := setMockNMAgent(svc, mnma)
defer cleanup()

_, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD)
_, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD)
if err == nil {
t.Errorf("Expected error when NC is missing from both NMAgent and IMDS, but got nil")
}

// Check that the error message contains the expected text
expectedErrorText := "unable to update some NCs"
expectedErrorText := "Have outdated NCs"
if !strings.Contains(err.Error(), expectedErrorText) {
t.Errorf("Expected error to contain '%s', but got: %v", expectedErrorText, err)
}
Expand Down Expand Up @@ -485,7 +493,7 @@ func TestSyncHostNCVersionLocalVersionHigher(t *testing.T) {
cleanup := setMockNMAgent(svc, mnma)
defer cleanup()

_, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD)
_, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD)
if err != nil {
t.Errorf("Expected sync to succeed, but got error: %v", err)
}
Expand Down Expand Up @@ -528,7 +536,7 @@ func TestSyncHostNCVersionLocalHigherThanDNC(t *testing.T) {

// This should detect that localNCVersion (3) > dncNCVersion (1) and log error
// but since there are no outdated NCs, it should return successfully
_, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD)
_, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD)
if err != nil {
t.Errorf("Expected no error when localNCVersion > dncNCVersion (no outdated NCs), but got: %v", err)
}
Expand Down Expand Up @@ -601,13 +609,13 @@ func TestSyncHostNCVersionIMDSAPIVersionNotSupported(t *testing.T) {
defer func() { svc.imdsClient = originalIMDS }()

// Test should fail because of outdated IMDS NC that can't be updated
_, err := svc.syncHostNCVersion(context.Background(), orchestratorType)
_, err := svc.syncHostNCVersion(t.Context(), orchestratorType)
if err == nil {
t.Errorf("Expected error when there are outdated IMDS NCs but API version is not supported, but got nil")
}

// Verify the error is about being unable to update NCs
expectedErrorText := "unable to update some NCs"
expectedErrorText := "Have outdated NCs"
if !strings.Contains(err.Error(), expectedErrorText) {
t.Errorf("Expected error to contain '%s', but got: %v", expectedErrorText, err)
}
Expand Down Expand Up @@ -1345,6 +1353,11 @@ func (m *mockCNIConflistGenerator) getGeneratedCount() int {
return m.generatedCount
}

var fastcnsconf = configuration.CNSConfig{
SyncHostNCVersionIntervalMs: 100,
ChannelMode: cns.CRD,
}

// TestCNIConflistGenerationNewNC tests that discovering a new programmed NC in CNS state will trigger CNI conflist generation
func TestCNIConflistGenerationNewNC(t *testing.T) {
ncID := "some-new-nc" //nolint:goconst // value not shared across tests, can change without issue
Expand Down Expand Up @@ -1380,9 +1393,11 @@ func TestCNIConflistGenerationNewNC(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(t.Context(), time.Second*5)
defer cancel()
err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
assert.NoError(t, err)
service.Wait(ctx)
assert.Equal(t, 1, mockgen.getGeneratedCount())
}

Expand Down Expand Up @@ -1421,9 +1436,11 @@ func TestCNIConflistGenerationExistingNC(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(t.Context(), time.Second*5)
defer cancel()
err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
assert.NoError(t, err)
service.Wait(ctx)
assert.Equal(t, 1, mockgen.getGeneratedCount())
}

Expand Down Expand Up @@ -1463,12 +1480,12 @@ func TestCNIConflistGenerationNewNCTwice(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(t.Context(), time.Second*5)
defer cancel()
err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
assert.NoError(t, err)
service.Wait(ctx)
assert.Equal(t, 1, mockgen.getGeneratedCount())

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
assert.Equal(t, 1, mockgen.getGeneratedCount()) // should still be one
Expand Down Expand Up @@ -1502,9 +1519,11 @@ func TestCNIConflistNotGenerated(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(t.Context(), time.Second*5)
defer cancel()
err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
assert.NoError(t, err)
service.Wait(ctx)
assert.Equal(t, 0, mockgen.getGeneratedCount())
}

Expand Down Expand Up @@ -1545,9 +1564,11 @@ func TestCNIConflistGenerationOnNMAError(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(t.Context(), time.Second*5)
defer cancel()
err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
assert.NoError(t, err)
service.Wait(ctx)
assert.Equal(t, 1, mockgen.getGeneratedCount())
}

Expand Down
10 changes: 6 additions & 4 deletions cns/restserver/nodesubnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr

logger.Debugf("IP change processed successfully")

// saved NC successfully. UpdateIPsForNodeSubnet is called only when IPs are fetched from NMAgent.
// We now have IPs to serve IPAM requests. Generate conflist to indicate CNS is ready
service.MustGenerateCNIConflistOnce()
service.ncWait.Done()
return nil
}

Expand Down Expand Up @@ -59,6 +57,10 @@ func (service *HTTPRestService) InitializeNodeSubnet(ctx context.Context, podInf

// StartNodeSubnet starts the IP fetcher for NodeSubnet. This will cause secondary IPs to be fetched periodically.
// After the first successful fetch, conflist will be generated to indicate CNS is ready.
func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) {
func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) error {
if err := service.ncWait.Start(); err != nil {
return err
}
service.nodesubnetIPFetcher.Start(ctx)
return nil
}
12 changes: 4 additions & 8 deletions cns/restserver/nodesubnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,15 @@ func TestNodeSubnet(t *testing.T) {

checkIPassignment(t, service, expectedIPs)

service.StartNodeSubnet(ctx)
if err := service.StartNodeSubnet(ctx); err != nil {
t.Fatalf("StartNodeSubnet returned an error: %v", err)
}

if service.GetNodesubnetIPFetcher() == nil {
t.Fatal("NodeSubnetIPFetcher is not initialized")
}

select {
case <-ctx.Done():
t.Errorf("test context done - %s", ctx.Err())
return
case <-mockCNIConflistGenerator.GenerateCalled:
break
}
service.Wait(ctx)

expectedIPs["10.0.0.45"] = types.Available
checkIPassignment(t, service, expectedIPs)
Expand Down
Loading
Loading