98 changes: 49 additions & 49 deletions cloud/linode/cilium_loadbalancers.go
@@ -75,12 +75,12 @@ var (
// getExistingSharedIPsInCluster determines the list of addresses to share on nodes by checking the
// CiliumLoadBalancerIPPools created by the CCM in createCiliumLBIPPool
// NOTE: Cilium CRDs must be installed for this to work
-func (l *loadbalancers) getExistingSharedIPsInCluster(ctx context.Context) ([]string, error) {
+func (l *Loadbalancers) getExistingSharedIPsInCluster(ctx context.Context) ([]string, error) {
addrs := []string{}
if err := l.retrieveCiliumClientset(); err != nil {
return addrs, err
}
-pools, err := l.ciliumClient.CiliumLoadBalancerIPPools().List(ctx, metav1.ListOptions{
+pools, err := l.CiliumClient.CiliumLoadBalancerIPPools().List(ctx, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/managed-by=linode-ccm",
})
if err != nil {
@@ -94,11 +94,11 @@ func (l *loadbalancers) getExistingSharedIPsInCluster(ctx context.Context) ([]st
return addrs, nil
}

-func (l *loadbalancers) getExistingSharedIPs(ctx context.Context, ipHolder *linodego.Instance) ([]string, error) {
+func (l *Loadbalancers) getExistingSharedIPs(ctx context.Context, ipHolder *linodego.Instance) ([]string, error) {
if ipHolder == nil {
return nil, nil
}
-ipHolderAddrs, err := l.client.GetInstanceIPAddresses(ctx, ipHolder.ID)
+ipHolderAddrs, err := l.Client.GetInstanceIPAddresses(ctx, ipHolder.ID)
if err != nil {
return nil, err
}
@@ -110,22 +110,22 @@ func (l *loadbalancers) getExistingSharedIPs(ctx context.Context, ipHolder *lino
}

// shareIPs shares the given list of IP addresses on the given Node
-func (l *loadbalancers) shareIPs(ctx context.Context, addrs []string, node *v1.Node) error {
+func (l *Loadbalancers) shareIPs(ctx context.Context, addrs []string, node *v1.Node) error {
nodeLinodeID, err := parseProviderID(node.Spec.ProviderID)
if err != nil {
return err
}
if err = l.retrieveKubeClient(); err != nil {
return err
}
-if err = l.client.ShareIPAddresses(ctx, linodego.IPAddressesShareOptions{
+if err = l.Client.ShareIPAddresses(ctx, linodego.IPAddressesShareOptions{
IPs: addrs,
LinodeID: nodeLinodeID,
}); err != nil {
return err
}
// need to make sure node is up-to-date
-node, err = l.kubeClient.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
+node, err = l.KubeClient.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
if err != nil {
return err
}
@@ -134,7 +134,7 @@ func (l *loadbalancers) shareIPs(ctx context.Context, addrs []string, node *v1.N
}
node.Labels[annotations.AnnLinodeNodeIPSharingUpdated] = "true"
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-_, err := l.kubeClient.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
+_, err := l.KubeClient.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
return err
})
if retryErr != nil {
@@ -151,16 +151,16 @@ func (l *loadbalancers) shareIPs(ctx context.Context, addrs []string, node *v1.N
// perform IP sharing (via a specified node selector) have the expected IPs shared
// in the event that a Node joins the cluster after the LoadBalancer Service already
// exists
-func (l *loadbalancers) handleIPSharing(ctx context.Context, node *v1.Node, ipHolderSuffix string) error {
+func (l *Loadbalancers) handleIPSharing(ctx context.Context, node *v1.Node, ipHolderSuffix string) error {
// ignore cases where the provider ID has been set
if node.Spec.ProviderID == "" {
klog.Info("skipping IP while providerID is unset")
return nil
}
// If performing Service load-balancing via IP sharing + BGP, check for a special annotation
// added by the CCM gets set when load-balancer IPs have been successfully shared on the node
-if Options.BGPNodeSelector != "" {
-kv := strings.Split(Options.BGPNodeSelector, "=")
+if l.Options.BGPNodeSelector != "" {
+kv := strings.Split(l.Options.BGPNodeSelector, "=")
// Check if node should be participating in IP sharing via the given selector
if val, ok := node.Labels[kv[0]]; !ok || len(kv) != 2 || val != kv[1] {
// not a selected Node
@@ -210,13 +210,13 @@ func (l *loadbalancers) handleIPSharing(ctx context.Context, node *v1.Node, ipHo

// createSharedIP requests an additional IP that can be shared on Nodes to support
// loadbalancing via Cilium LB IPAM + BGP Control Plane.
-func (l *loadbalancers) createSharedIP(ctx context.Context, nodes []*v1.Node, ipHolderSuffix string) (string, error) {
+func (l *Loadbalancers) createSharedIP(ctx context.Context, nodes []*v1.Node, ipHolderSuffix string) (string, error) {
ipHolder, err := l.ensureIPHolder(ctx, ipHolderSuffix)
if err != nil {
return "", err
}

-newSharedIP, err := l.client.AddInstanceIPAddress(ctx, ipHolder.ID, true)
+newSharedIP, err := l.Client.AddInstanceIPAddress(ctx, ipHolder.ID, true)
if err != nil {
return "", err
}
@@ -243,7 +243,7 @@ func (l *loadbalancers) createSharedIP(ctx context.Context, nodes []*v1.Node, ip
}

// share the IPs with nodes participating in Cilium BGP peering
-if Options.BGPNodeSelector == "" {
+if l.Options.BGPNodeSelector == "" {
for _, node := range nodes {
if _, ok := node.Labels[commonControlPlaneLabel]; !ok {
if err = l.shareIPs(ctx, addrs, node); err != nil {
@@ -252,7 +252,7 @@ func (l *loadbalancers) createSharedIP(ctx context.Context, nodes []*v1.Node, ip
}
}
} else {
-kv := strings.Split(Options.BGPNodeSelector, "=")
+kv := strings.Split(l.Options.BGPNodeSelector, "=")
for _, node := range nodes {
if val, ok := node.Labels[kv[0]]; ok && len(kv) == 2 && val == kv[1] {
if err = l.shareIPs(ctx, addrs, node); err != nil {
@@ -267,13 +267,13 @@ func (l *loadbalancers) createSharedIP(ctx context.Context, nodes []*v1.Node, ip

// deleteSharedIP cleans up the shared IP for a LoadBalancer Service if it was assigned
// by Cilium LB IPAM, removing it from the ip-holder
-func (l *loadbalancers) deleteSharedIP(ctx context.Context, service *v1.Service) error {
+func (l *Loadbalancers) deleteSharedIP(ctx context.Context, service *v1.Service) error {
err := l.retrieveKubeClient()
if err != nil {
return err
}
-nodeList, err := l.kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
-LabelSelector: Options.BGPNodeSelector,
+nodeList, err := l.KubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
+LabelSelector: l.Options.BGPNodeSelector,
})
if err != nil {
return err
@@ -282,8 +282,8 @@ func (l *loadbalancers) deleteSharedIP(ctx context.Context, service *v1.Service)

serviceNn := getServiceNn(service)
var ipHolderSuffix string
-if Options.IpHolderSuffix != "" {
-ipHolderSuffix = Options.IpHolderSuffix
+if l.Options.IpHolderSuffix != "" {
+ipHolderSuffix = l.Options.IpHolderSuffix
klog.V(3).Infof("using parameter-based IP Holder suffix %s for Service %s", ipHolderSuffix, serviceNn)
}

@@ -304,14 +304,14 @@ func (l *loadbalancers) deleteSharedIP(ctx context.Context, service *v1.Service)
if err != nil {
return err
}
-err = l.client.DeleteInstanceIPAddress(ctx, nodeLinodeID, ingress.IP)
+err = l.Client.DeleteInstanceIPAddress(ctx, nodeLinodeID, ingress.IP)
if IgnoreLinodeAPIError(err, http.StatusNotFound) != nil {
return err
}
}

// finally delete the shared IP on the ip-holder
-err = l.client.DeleteInstanceIPAddress(ctx, ipHolder.ID, ingress.IP)
+err = l.Client.DeleteInstanceIPAddress(ctx, ipHolder.ID, ingress.IP)
if IgnoreLinodeAPIError(err, http.StatusNotFound) != nil {
return err
}
@@ -323,17 +323,17 @@ func (l *loadbalancers) deleteSharedIP(ctx context.Context, service *v1.Service)

// To hold the IP in lieu of a proper IP reservation system, a special Nanode is
// created but not booted and used to hold all shared IPs.
-func (l *loadbalancers) ensureIPHolder(ctx context.Context, suffix string) (*linodego.Instance, error) {
+func (l *Loadbalancers) ensureIPHolder(ctx context.Context, suffix string) (*linodego.Instance, error) {
ipHolder, err := l.getIPHolder(ctx, suffix)
if err != nil {
return nil, err
}
if ipHolder != nil {
return ipHolder, nil
}
-label := generateClusterScopedIPHolderLinodeName(l.zone, suffix)
-ipHolder, err = l.client.CreateInstance(ctx, linodego.InstanceCreateOptions{
-Region: l.zone,
+label := generateClusterScopedIPHolderLinodeName(l.Zone, suffix)
+ipHolder, err = l.Client.CreateInstance(ctx, linodego.InstanceCreateOptions{
+Region: l.Zone,
Type: "g6-nanode-1",
Label: label,
RootPass: uuid.NewString(),
@@ -353,16 +353,16 @@ func (l *loadbalancers) ensureIPHolder(ctx context.Context, suffix string) (*lin
return ipHolder, nil
}

-func (l *loadbalancers) getIPHolder(ctx context.Context, suffix string) (*linodego.Instance, error) {
+func (l *Loadbalancers) getIPHolder(ctx context.Context, suffix string) (*linodego.Instance, error) {
// even though we have updated the naming convention, leaving this in ensures we have backwards compatibility
-filter := map[string]string{"label": fmt.Sprintf("%s-%s", ipHolderLabelPrefix, l.zone)}
+filter := map[string]string{"label": fmt.Sprintf("%s-%s", ipHolderLabelPrefix, l.Zone)}
rawFilter, err := json.Marshal(filter)
if err != nil {
panic("this should not have failed")
}
var ipHolder *linodego.Instance
// TODO (rk): should we switch to using GET instead of LIST? we would be able to wrap logic around errors
-linodes, err := l.client.ListInstances(ctx, linodego.NewListOptions(1, string(rawFilter)))
+linodes, err := l.Client.ListInstances(ctx, linodego.NewListOptions(1, string(rawFilter)))
if err != nil {
return nil, err
}
@@ -373,12 +373,12 @@ func (l *loadbalancers) getIPHolder(ctx context.Context, suffix string) (*linode
// a) an ip holder instance does not exist yet
// or
// b) another cluster already holds the linode grant to an ip holder using the old naming convention
-filter = map[string]string{"label": generateClusterScopedIPHolderLinodeName(l.zone, suffix)}
+filter = map[string]string{"label": generateClusterScopedIPHolderLinodeName(l.Zone, suffix)}
rawFilter, err = json.Marshal(filter)
if err != nil {
panic("this should not have failed")
}
-linodes, err = l.client.ListInstances(ctx, linodego.NewListOptions(1, string(rawFilter)))
+linodes, err = l.Client.ListInstances(ctx, linodego.NewListOptions(1, string(rawFilter)))
if err != nil {
return nil, err
}
@@ -407,15 +407,15 @@ func generateClusterScopedIPHolderLinodeName(zone, suffix string) (label string)
return label
}

-func (l *loadbalancers) retrieveCiliumClientset() error {
-if l.ciliumClient != nil {
+func (l *Loadbalancers) retrieveCiliumClientset() error {
+if l.CiliumClient != nil {
return nil
}
var (
kubeConfig *rest.Config
err error
)
-kubeconfigFlag := Options.KubeconfigFlag
+kubeconfigFlag := l.Options.KubeconfigFlag
if kubeconfigFlag == nil || kubeconfigFlag.Value.String() == "" {
kubeConfig, err = rest.InClusterConfig()
} else {
@@ -424,15 +424,15 @@ func (l *loadbalancers) retrieveCiliumClientset() error {
if err != nil {
return err
}
-l.ciliumClient, err = ciliumclient.NewForConfig(kubeConfig)
+l.CiliumClient, err = ciliumclient.NewForConfig(kubeConfig)

return err
}

// for LoadBalancer Services not backed by a NodeBalancer, a CiliumLoadBalancerIPPool resource
// will be created specifically for the Service with the requested shared IP
// NOTE: Cilium CRDs must be installed for this to work
-func (l *loadbalancers) createCiliumLBIPPool(ctx context.Context, service *v1.Service, sharedIP string) (*v2alpha1.CiliumLoadBalancerIPPool, error) {
+func (l *Loadbalancers) createCiliumLBIPPool(ctx context.Context, service *v1.Service, sharedIP string) (*v2alpha1.CiliumLoadBalancerIPPool, error) {
if err := l.retrieveCiliumClientset(); err != nil {
return nil, err
}
@@ -455,52 +455,52 @@ func (l *loadbalancers) createCiliumLBIPPool(ctx context.Context, service *v1.Se
},
}

-return l.ciliumClient.CiliumLoadBalancerIPPools().Create(ctx, ciliumLBIPPool, metav1.CreateOptions{})
+return l.CiliumClient.CiliumLoadBalancerIPPools().Create(ctx, ciliumLBIPPool, metav1.CreateOptions{})
}

// NOTE: Cilium CRDs must be installed for this to work
-func (l *loadbalancers) deleteCiliumLBIPPool(ctx context.Context, service *v1.Service) error {
+func (l *Loadbalancers) deleteCiliumLBIPPool(ctx context.Context, service *v1.Service) error {
if err := l.retrieveCiliumClientset(); err != nil {
return err
}

-return l.ciliumClient.CiliumLoadBalancerIPPools().Delete(
+return l.CiliumClient.CiliumLoadBalancerIPPools().Delete(
ctx,
fmt.Sprintf("%s-%s-pool", service.Namespace, service.Name),
metav1.DeleteOptions{},
)
}

// NOTE: Cilium CRDs must be installed for this to work
-func (l *loadbalancers) getCiliumLBIPPool(ctx context.Context, service *v1.Service) (*v2alpha1.CiliumLoadBalancerIPPool, error) {
+func (l *Loadbalancers) getCiliumLBIPPool(ctx context.Context, service *v1.Service) (*v2alpha1.CiliumLoadBalancerIPPool, error) {
if err := l.retrieveCiliumClientset(); err != nil {
return nil, err
}

-return l.ciliumClient.CiliumLoadBalancerIPPools().Get(
+return l.CiliumClient.CiliumLoadBalancerIPPools().Get(
ctx,
fmt.Sprintf("%s-%s-pool", service.Namespace, service.Name),
metav1.GetOptions{},
)
}

// NOTE: Cilium CRDs must be installed for this to work
-func (l *loadbalancers) ensureCiliumBGPPeeringPolicy(ctx context.Context) error {
+func (l *Loadbalancers) ensureCiliumBGPPeeringPolicy(ctx context.Context) error {
if raw, ok := os.LookupEnv("BGP_CUSTOM_ID_MAP"); ok && raw != "" {
klog.Info("BGP_CUSTOM_ID_MAP env variable specified, using it instead of the default region map")
if err := json.Unmarshal([]byte(raw), &regionIDMap); err != nil {
return err
}
}
-regionID, ok := regionIDMap[l.zone]
+regionID, ok := regionIDMap[l.Zone]
if !ok {
return fmt.Errorf("unsupported region for BGP: %s", l.zone)
return fmt.Errorf("unsupported region for BGP: %s", l.Zone)
}
if err := l.retrieveCiliumClientset(); err != nil {
return err
}
// check if policy already exists
-policy, err := l.ciliumClient.CiliumBGPPeeringPolicies().Get(ctx, ciliumBGPPeeringPolicyName, metav1.GetOptions{})
+policy, err := l.CiliumClient.CiliumBGPPeeringPolicies().Get(ctx, ciliumBGPPeeringPolicyName, metav1.GetOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
klog.Infof("Failed to get CiliumBGPPeeringPolicy: %s", err.Error())
return err
@@ -513,7 +513,7 @@ func (l *loadbalancers) ensureCiliumBGPPeeringPolicy(ctx context.Context) error
// otherwise create it
var nodeSelector slimv1.LabelSelector
// If no BGPNodeSelector is specified, select all worker nodes.
-if Options.BGPNodeSelector == "" {
+if l.Options.BGPNodeSelector == "" {
nodeSelector = slimv1.LabelSelector{
MatchExpressions: []slimv1.LabelSelectorRequirement{
{
Expand All @@ -523,9 +523,9 @@ func (l *loadbalancers) ensureCiliumBGPPeeringPolicy(ctx context.Context) error
},
}
} else {
-kv := strings.Split(Options.BGPNodeSelector, "=")
+kv := strings.Split(l.Options.BGPNodeSelector, "=")
if len(kv) != BGPNodeSelectorFlagInputLen {
return fmt.Errorf("invalid node selector %s", Options.BGPNodeSelector)
return fmt.Errorf("invalid node selector %s", l.Options.BGPNodeSelector)
}

nodeSelector = slimv1.LabelSelector{MatchLabels: map[string]string{kv[0]: kv[1]}}
@@ -581,7 +581,7 @@ func (l *loadbalancers) ensureCiliumBGPPeeringPolicy(ctx context.Context) error
}

klog.Info("Creating CiliumBGPPeeringPolicy")
-_, err = l.ciliumClient.CiliumBGPPeeringPolicies().Create(ctx, ciliumBGPPeeringPolicy, metav1.CreateOptions{})
+_, err = l.CiliumClient.CiliumBGPPeeringPolicies().Create(ctx, ciliumBGPPeeringPolicy, metav1.CreateOptions{})

return err
}
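
For readers skimming the diff: the change exports the load balancer type (loadbalancers becomes Loadbalancers) and moves its collaborators (Client, KubeClient, CiliumClient, Zone, Options) onto the struct instead of package-level state. The sketch below is not taken from this repository; it uses hypothetical types to show the same pattern, where per-instance configuration becomes injectable and testable.

// Hypothetical sketch (not the CCM's real definitions) of the refactoring
// pattern in the diff above: configuration and collaborators live on an
// exported struct rather than in package-level variables, so each instance
// can be constructed and tested on its own.
package main

import "fmt"

// Options stands in for per-instance configuration; the field name mirrors the
// BGPNodeSelector option referenced in the diff, but the type is illustrative.
type Options struct {
	BGPNodeSelector string
}

// Loadbalancers is exported, with its dependencies as exported fields.
type Loadbalancers struct {
	Zone    string
	Options Options
}

// describe reports which nodes would participate in IP sharing, following the
// same convention as the diff: an empty selector means all worker nodes.
func (l *Loadbalancers) describe() string {
	if l.Options.BGPNodeSelector == "" {
		return fmt.Sprintf("zone %s: share IPs with all worker nodes", l.Zone)
	}
	return fmt.Sprintf("zone %s: share IPs with nodes matching %q", l.Zone, l.Options.BGPNodeSelector)
}

func main() {
	lb := &Loadbalancers{Zone: "us-ord", Options: Options{BGPNodeSelector: "cilium-bgp-peering=true"}}
	fmt.Println(lb.describe())
}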