Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions pkg/controller/encryption/ipsec/ipsec_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"kmesh.net/kmesh/pkg/utils"
)

// MaxRetries defines the maximum number of retry attempts for failed operations
const (
MaxRetries = 5
)
Expand All @@ -61,7 +62,8 @@ type lpmKey struct {
ip [4]uint32
}

type IPSecController struct {
// Controller manages IPSec configuration and synchronization across Kubernetes nodes
type Controller struct {
informer cache.SharedIndexInformer
lister kmeshnodeinfov1alpha1.KmeshNodeInfoLister
queue workqueue.TypedRateLimitingInterface[any]
Expand All @@ -72,7 +74,8 @@ type IPSecController struct {
tcDecryptProg *ebpf.Program
}

func NewIPsecController(k8sClientSet kubernetes.Interface, kniMap *ebpf.Map, decryptProg *ebpf.Program) (*IPSecController, error) {
// NewController creates a new IPSec controller instance with the provided Kubernetes client, KNI map, and decryption program
func NewController(k8sClientSet kubernetes.Interface, kniMap *ebpf.Map, decryptProg *ebpf.Program) (*Controller, error) {
clientSet, err := kube.GetKmeshNodeInfoClient()
if err != nil {
return nil, fmt.Errorf("failed to get kmesh node info client: %v", err)
Expand All @@ -81,7 +84,7 @@ func NewIPsecController(k8sClientSet kubernetes.Interface, kniMap *ebpf.Map, dec
nodeinfoLister := factroy.Kmesh().V1alpha1().KmeshNodeInfos().Lister()
nodeinfoInformer := factroy.Kmesh().V1alpha1().KmeshNodeInfos().Informer()

ipsecController := &IPSecController{
ipsecController := &Controller{
informer: nodeinfoInformer,
lister: nodeinfoLister,
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()),
Expand Down Expand Up @@ -138,7 +141,8 @@ func NewIPsecController(k8sClientSet kubernetes.Interface, kniMap *ebpf.Map, dec
return ipsecController, nil
}

func (c *IPSecController) Run(stop <-chan struct{}) {
// Run starts the IPSec controller, initializing informers, attaching TC programs, and processing node information updates until the stop channel is closed
func (c *Controller) Run(stop <-chan struct{}) {
defer c.queue.ShutDown()
go c.informer.Run(stop)
if !cache.WaitForCacheSync(stop, c.informer.HasSynced) {
Expand Down Expand Up @@ -177,7 +181,8 @@ func (c *IPSecController) Run(stop <-chan struct{}) {
<-stop
}

func (c *IPSecController) Stop() {
// Stop gracefully shuts down the IPSec controller, cleaning up resources, detaching TC programs, and removing node information if not restarting
func (c *Controller) Stop() {
c.ipsecHandler.StopWatch()
if restart.GetStartType() == restart.Normal {
_ = c.knclient.Delete(context.TODO(), c.kmeshNodeInfo.Name, metav1.DeleteOptions{})
Expand All @@ -186,7 +191,7 @@ func (c *IPSecController) Stop() {
}
}

func (c *IPSecController) handleTc(mode int) error {
func (c *Controller) handleTc(mode int) error {
ifaces, err := net.Interfaces()
if err != nil {
return fmt.Errorf("failed to get interfaces: %v", err)
Expand Down Expand Up @@ -219,7 +224,7 @@ func (c *IPSecController) handleTc(mode int) error {
return nil
}

func (c *IPSecController) attachTcDecrypt() error {
func (c *Controller) attachTcDecrypt() error {
nodeNsPath := kmesh_netns.GetNodeNSpath()
attachFunc := func(netns.NetNS) error {
return c.handleTc(constants.TC_ATTACH)
Expand All @@ -231,7 +236,7 @@ func (c *IPSecController) attachTcDecrypt() error {
return nil
}

func (c *IPSecController) detachTcDecrypt() error {
func (c *Controller) detachTcDecrypt() error {
nodeNsPath := kmesh_netns.GetNodeNSpath()
detachFunc := func(netns.NetNS) error {
return c.handleTc(constants.TC_DETACH)
Expand All @@ -243,7 +248,7 @@ func (c *IPSecController) detachTcDecrypt() error {
return nil
}

func (c *IPSecController) handleKNIAdd(obj interface{}) {
func (c *Controller) handleKNIAdd(obj interface{}) {
kni, ok := obj.(*v1alpha1.KmeshNodeInfo)
if !ok {
log.Errorf("expected *v1alpha1_core.KmeshNodeInfo but got %T in handle add func", obj)
Expand All @@ -256,7 +261,7 @@ func (c *IPSecController) handleKNIAdd(obj interface{}) {
c.queue.AddRateLimited(kni.Name)
}

func (c *IPSecController) handleKNIUpdate(oldObj, newObj interface{}) {
func (c *Controller) handleKNIUpdate(oldObj, newObj interface{}) {
newKni, okNew := newObj.(*v1alpha1.KmeshNodeInfo)
if !okNew {
log.Errorf("expected *v1alpha1_core.KmeshNodeInfo but got %T in handle update new obj func", newObj)
Expand All @@ -280,7 +285,7 @@ func (c *IPSecController) handleKNIUpdate(oldObj, newObj interface{}) {
c.queue.AddRateLimited(newKni.Name)
}

func (c *IPSecController) handleKNIDelete(obj interface{}) {
func (c *Controller) handleKNIDelete(obj interface{}) {
node, ok := obj.(*v1alpha1.KmeshNodeInfo)
if !ok {
log.Errorf("expected *v1alpha1_core.KmeshNodeInfo but got %T in handle delete func", obj)
Expand Down Expand Up @@ -308,7 +313,7 @@ func (c *IPSecController) handleKNIDelete(obj interface{}) {
}
}

func (c *IPSecController) handleOneNodeInfo(node *v1alpha1.KmeshNodeInfo) error {
func (c *Controller) handleOneNodeInfo(node *v1alpha1.KmeshNodeInfo) error {
// can't change ipsec information when process
c.ipsecHandler.mutex.Lock()
defer c.ipsecHandler.mutex.Unlock()
Expand All @@ -331,7 +336,7 @@ func (c *IPSecController) handleOneNodeInfo(node *v1alpha1.KmeshNodeInfo) error
return nil
}

func (c *IPSecController) generalKNIMapKey(remoteCIDR string) (*lpmKey, error) {
func (c *Controller) generalKNIMapKey(remoteCIDR string) (*lpmKey, error) {
prefix, err := netip.ParsePrefix(remoteCIDR)
if err != nil {
err = fmt.Errorf("update kni map podCIDR failed, podCIDR is %v, %v", remoteCIDR, err)
Expand All @@ -353,7 +358,7 @@ func (c *IPSecController) generalKNIMapKey(remoteCIDR string) (*lpmKey, error) {
return kniKey, nil
}

func (c *IPSecController) updateKNIMapCIDR(remoteCIDR string, mapfd *ebpf.Map) error {
func (c *Controller) updateKNIMapCIDR(remoteCIDR string, mapfd *ebpf.Map) error {
kniKey, err := c.generalKNIMapKey(remoteCIDR)
if err != nil {
return err
Expand All @@ -364,15 +369,15 @@ func (c *IPSecController) updateKNIMapCIDR(remoteCIDR string, mapfd *ebpf.Map) e
return mapfd.Update(kniKey, &kniValue, ebpf.UpdateAny)
}

func (c *IPSecController) deleteKNIMapCIDR(remoteCIDR string, mapfd *ebpf.Map) {
func (c *Controller) deleteKNIMapCIDR(remoteCIDR string, mapfd *ebpf.Map) {
kniKey, err := c.generalKNIMapKey(remoteCIDR)
if err != nil {
return
}
_ = mapfd.Delete(kniKey)
}

func (c *IPSecController) syncAllNodeInfo() error {
func (c *Controller) syncAllNodeInfo() error {
nodeList, err := c.lister.KmeshNodeInfos(kube.KmeshNamespace).List(labels.Everything())
if err != nil {
return fmt.Errorf("failed to get kmesh node info list: %v", err)
Expand All @@ -388,7 +393,7 @@ func (c *IPSecController) syncAllNodeInfo() error {
return nil
}

func (c *IPSecController) updateLocalKmeshNodeInfo() error {
func (c *Controller) updateLocalKmeshNodeInfo() error {
node, _ := c.lister.KmeshNodeInfos(kube.KmeshNamespace).Get(c.kmeshNodeInfo.Name)
if node == nil {
_, err := c.knclient.Create(context.TODO(), &c.kmeshNodeInfo, metav1.CreateOptions{})
Expand All @@ -410,7 +415,8 @@ func (c *IPSecController) updateLocalKmeshNodeInfo() error {
return nil
}

func (c *IPSecController) CleanAllIPsec() {
// CleanAllIPsec removes all IPSec configurations and rules from the current node
func (c *Controller) CleanAllIPsec() {
nodeNsPath := kmesh_netns.GetNodeNSpath()
cleanFunc := func(netns.NetNS) error {
c.ipsecHandler.Flush()
Expand All @@ -420,7 +426,7 @@ func (c *IPSecController) CleanAllIPsec() {
_ = netns.WithNetNSPath(nodeNsPath, cleanFunc)
}

func (c *IPSecController) processNextItem() bool {
func (c *Controller) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
Expand Down Expand Up @@ -456,7 +462,7 @@ func (c *IPSecController) processNextItem() bool {
}

// this function need ipsechanler mutex lock before use
func (c *IPSecController) handleIpsecUpdate() {
func (c *Controller) handleIpsecUpdate() {
c.kmeshNodeInfo.Spec.SPI = c.ipsecHandler.Spi
nodeNsPath := kmesh_netns.GetNodeNSpath()

Expand Down
Loading