diff --git a/cmd/microshift/main.go b/cmd/microshift/main.go
index bf275fb316..a97eedd04a 100644
--- a/cmd/microshift/main.go
+++ b/cmd/microshift/main.go
@@ -42,5 +42,6 @@ func newCommand() *cobra.Command {
 	cmd.AddCommand(cmds.NewRestoreCommand())
 	cmd.AddCommand(cmds.NewHealthcheckCommand())
 	cmd.AddCommand(cmds.NewAddNodeCommand())
+	cmd.AddCommand(cmds.NewPromoteLearnerCommand())
 	return cmd
 }
diff --git a/pkg/cmd/addnode.go b/pkg/cmd/addnode.go
index 864876e892..fddecefaaf 100644
--- a/pkg/cmd/addnode.go
+++ b/pkg/cmd/addnode.go
@@ -99,7 +99,7 @@ func runAddNode(ctx context.Context, opts *AddNodeOptions) error {
 	}
 
 	nodeName := cfg.CanonicalNodeName()
-	if isNodeAlreadyInCluster(ctx, client, nodeName) {
+	if isNodeInKubernetesCluster(ctx, client, nodeName) {
 		klog.Infof("Node %s is already part of the cluster. Skipping join process.", nodeName)
 		return nil
 	}
@@ -126,12 +126,12 @@ func runAddNode(ctx context.Context, opts *AddNodeOptions) error {
 	}
 	klog.Info("Etcd certificates generated successfully")
 
-	clusterMembers, err := getClusterNodes(ctx, client)
+	etcdMembers, err := getEtcdClusterNodes(ctx, client)
 	if err != nil {
 		return fmt.Errorf("failed to get cluster information: %w", err)
 	}
 
-	if err := configureEtcdForCluster(ctx, cfg, clusterMembers, opts.Learner); err != nil {
+	if err := configureEtcdForCluster(ctx, cfg, etcdMembers, opts.Learner); err != nil {
 		return fmt.Errorf("failed to configure etcd for cluster: %w", err)
 	}
 
@@ -374,7 +374,7 @@ func generateEtcdCertificates(cfg *config.Config) error {
 	return nil
 }
 
-func getClusterNodes(ctx context.Context, client kubernetes.Interface) ([]string, error) {
+func getEtcdClusterNodes(ctx context.Context, client kubernetes.Interface) ([]string, error) {
 	nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("failed to list nodes: %w", err)
 	}
@@ -382,9 +382,6 @@ func getClusterNodes(ctx context.Context, client kubernetes.Interface) ([]string, error) {
 	var members []string
 	for _, node := range nodes.Items {
-		if !isNodeReady(&node) {
-			continue
-		}
 		nodeIP := ""
 		for _, addr := range node.Status.Addresses {
 			if addr.Type == corev1.NodeInternalIP {
@@ -393,6 +390,7 @@ func getClusterNodes(ctx context.Context, client kubernetes.Interface) ([]string, error) {
 			}
 		}
 		if nodeIP != "" {
+			// TODO: use net.JoinHostPort so IPv6 node IPs are bracketed correctly.
 			members = append(members, fmt.Sprintf("%s=https://%s:2380", node.Name, nodeIP))
 		}
 	}
@@ -532,7 +530,7 @@ func restartMicroShift() error {
 	return nil
 }
 
-func isNodeAlreadyInCluster(ctx context.Context, client kubernetes.Interface, nodeName string) bool {
+func isNodeInKubernetesCluster(ctx context.Context, client kubernetes.Interface, nodeName string) bool {
 	_, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
 	if err != nil {
 		return false
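
The TODO above exists because the plain Sprintf form produces invalid URLs for IPv6 node IPs, which must be bracketed. A minimal sketch of how the peer URL could be built instead; buildPeerURL is a hypothetical helper, not part of this patch:

package main

import (
	"fmt"
	"net"
)

// buildPeerURL shows how the TODO could be resolved: net.JoinHostPort
// brackets IPv6 literals, which fmt.Sprintf("%s:2380", ip) does not.
func buildPeerURL(nodeName, nodeIP string) string {
	return fmt.Sprintf("%s=https://%s", nodeName, net.JoinHostPort(nodeIP, "2380"))
}

func main() {
	fmt.Println(buildPeerURL("node-a", "10.0.0.1")) // node-a=https://10.0.0.1:2380
	fmt.Println(buildPeerURL("node-b", "fd00::1"))  // node-b=https://[fd00::1]:2380
}
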
diff --git a/pkg/cmd/promotelearner.go b/pkg/cmd/promotelearner.go
new file mode 100644
index 0000000000..4eb79ab4e2
--- /dev/null
+++ b/pkg/cmd/promotelearner.go
@@ -0,0 +1,95 @@
+package cmd
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/openshift/microshift/pkg/config"
+	"github.com/openshift/microshift/pkg/controllers"
+	"github.com/openshift/microshift/pkg/version"
+	"github.com/spf13/cobra"
+
+	"k8s.io/klog/v2"
+)
+
+func NewPromoteLearnerCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "promote-learner",
+		Short: "Promote a learner node to a voting member of the etcd cluster",
+		Long: `This command promotes the local etcd instance from a learner node to a full voting member within an existing MicroShift etcd cluster.
+
+It:
+  - Connects to the etcd cluster using the current node's configuration.
+  - Verifies that the local etcd instance is currently a learner.
+  - Issues a promote request to the cluster.
+  - Requires a MicroShift service restart afterwards so the membership change takes effect for the API server.
+
+Use this command only after the learner node has fully caught up with the cluster and you are ready for it to become a voting member.
+`,
+		RunE: func(cmd *cobra.Command, args []string) error {
+			return runPromoteLearner(cmd.Context())
+		},
+	}
+
+	if version.Get().BuildVariant != version.BuildVariantCommunity {
+		cmd.Hidden = true
+	}
+
+	return cmd
+}
+
+func runPromoteLearner(ctx context.Context) error {
+	klog.Info("Starting learner promotion process")
+
+	cfg, err := config.ActiveConfig()
+	if err != nil {
+		return fmt.Errorf("failed to load MicroShift configuration: %w", err)
+	}
+
+	klog.Info("Promoting etcd learner to member")
+	if err := promoteEtcdLearner(ctx, cfg); err != nil {
+		return fmt.Errorf("failed to promote etcd learner: %w", err)
+	}
+
+	klog.Info("etcd node successfully promoted to member. Restart the MicroShift service for the change to take effect.")
+
+	return nil
+}
+
+func promoteEtcdLearner(ctx context.Context, cfg *config.Config) error {
+	client, err := controllers.GetClusterEtcdClient(ctx, cfg.KubeConfigPath(config.KubeAdmin))
+	if err != nil {
+		return fmt.Errorf("failed to create etcd client: %w", err)
+	}
+	defer func() { _ = client.Close() }()
+
+	memberResponse, err := client.MemberList(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to list etcd members: %w", err)
+	}
+
+	found, learner := false, false
+	var id uint64
+
+	for _, member := range memberResponse.Members {
+		if member.Name == cfg.CanonicalNodeName() {
+			found = true
+			if member.IsLearner {
+				learner = true
+				id = member.ID
+			}
+		}
+	}
+
+	if !found {
+		return fmt.Errorf("node %s is not in the etcd cluster", cfg.CanonicalNodeName())
+	}
+	if !learner {
+		return fmt.Errorf("node %s is not a learner", cfg.CanonicalNodeName())
+	}
+
+	response, err := client.MemberPromote(ctx, id)
+	if err != nil {
+		return fmt.Errorf("failed to promote etcd learner: %w", err)
+	}
+	klog.Infof("Successfully promoted etcd learner %s with response: %v", cfg.CanonicalNodeName(), response)
+	return nil
+}
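
For context, promote-learner is the second half of a two-step join: a node first enters the cluster as a non-voting learner (which is what add-node --learner arranges on the etcd side) and is promoted once its log has caught up. A sketch of the add-as-learner step using the same clientv3 API; the endpoints, peer URL, and TLS-free config are illustrative assumptions, not code from this patch:

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// addLearner registers a non-voting member that replicates the log but does
// not count toward quorum until it is promoted.
func addLearner(ctx context.Context, endpoints []string, peerURL string) error {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints, // placeholder endpoints; TLS omitted for brevity
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}
	defer func() { _ = cli.Close() }()

	resp, err := cli.MemberAddAsLearner(ctx, []string{peerURL})
	if err != nil {
		return fmt.Errorf("failed to add learner: %w", err)
	}
	fmt.Printf("added learner with member ID %x\n", resp.Member.ID)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := addLearner(ctx, []string{"http://127.0.0.1:2379"}, "http://127.0.0.1:2380"); err != nil {
		fmt.Println("error:", err)
	}
}
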
diff --git a/pkg/controllers/etcd.go b/pkg/controllers/etcd.go
index e5631c6173..0ce524141e 100644
--- a/pkg/controllers/etcd.go
+++ b/pkg/controllers/etcd.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -26,6 +27,10 @@ import (
 	"github.com/openshift/microshift/pkg/config"
 	"github.com/openshift/microshift/pkg/util/cryptomaterial"
 
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
 	klog "k8s.io/klog/v2"
 
 	"go.etcd.io/etcd/client/pkg/v3/transport"
@@ -251,3 +256,82 @@ func getEtcdClient(ctx context.Context) (*clientv3.Client, error) {
 	}
 	return cli, nil
 }
+
+// GetClusterEtcdClient creates a new etcd client for the cluster.
+// It uses the kubeconfig to list the nodes in the cluster, probes each node's
+// etcd endpoint to determine which ones are learners, and then creates a new
+// client that targets voting members only.
+func GetClusterEtcdClient(ctx context.Context, kubeConfigPath string) (*clientv3.Client, error) {
+	certsDir := cryptomaterial.CertsDirectory(config.DataDir)
+	etcdAPIServerClientCertDir := cryptomaterial.EtcdAPIServerClientCertDir(certsDir)
+
+	tlsInfo := transport.TLSInfo{
+		CertFile:      cryptomaterial.ClientCertPath(etcdAPIServerClientCertDir),
+		KeyFile:       cryptomaterial.ClientKeyPath(etcdAPIServerClientCertDir),
+		TrustedCAFile: cryptomaterial.CACertPath(cryptomaterial.EtcdSignerDir(certsDir)),
+	}
+	tlsConfig, err := tlsInfo.ClientConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	// Bootstrap client against the local endpoint, used only for discovery.
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   []string{"https://localhost:2379"},
+		DialTimeout: 5 * time.Second,
+		TLS:         tlsConfig,
+		Context:     ctx,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer func() { _ = client.Close() }()
+
+	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to load kubeconfig from %s: %w", kubeConfigPath, err)
+	}
+	adminClient, err := kubernetes.NewForConfig(restConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create admin kubernetes client: %w", err)
+	}
+
+	nodes, err := adminClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to list nodes: %w", err)
+	}
+
+	var memberEndpoints []string
+	for _, node := range nodes.Items {
+		var nodeIP string
+		for _, addr := range node.Status.Addresses {
+			if addr.Type == corev1.NodeInternalIP {
+				nodeIP = addr.Address
+				break
+			}
+		}
+		if nodeIP == "" {
+			continue
+		}
+		endpoint := net.JoinHostPort(nodeIP, "2379")
+		status, err := client.Status(ctx, endpoint)
+		if err != nil {
+			// Skip endpoints that cannot be reached.
+			continue
+		}
+		if status != nil && !status.IsLearner {
+			memberEndpoints = append(memberEndpoints, fmt.Sprintf("https://%s", endpoint))
+		}
+	}
+	if len(memberEndpoints) == 0 {
+		memberEndpoints = []string{"https://localhost:2379"}
+	}
+
+	clusterClient, err := clientv3.New(clientv3.Config{
+		Endpoints:   memberEndpoints,
+		DialTimeout: 5 * time.Second,
+		TLS:         tlsConfig,
+		Context:     ctx,
+	})
+	if err != nil {
+		return nil, err
+	}
+	return clusterClient, nil
+}
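
GetClusterEtcdClient filters learners out because they do not vote and serve only a limited subset of requests, so member-management calls such as MemberPromote must reach voting members; the Status response reports IsLearner per endpoint. A condensed, standalone sketch of that probe loop, assuming an already-dialed client and placeholder endpoints:

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// votingEndpoints keeps only endpoints served by voting members; unreachable
// endpoints and learners are skipped.
func votingEndpoints(ctx context.Context, cli *clientv3.Client, candidates []string) []string {
	var voters []string
	for _, ep := range candidates {
		st, err := cli.Status(ctx, ep)
		if err != nil || st.IsLearner {
			continue // unreachable, or still a learner
		}
		voters = append(voters, ep)
	}
	return voters
}

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"}, // placeholder bootstrap endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer func() { _ = cli.Close() }()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	fmt.Println(votingEndpoints(ctx, cli, []string{"127.0.0.1:2379", "127.0.0.1:12379"}))
}
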
diff --git a/pkg/node/kubelet.go b/pkg/node/kubelet.go
index 6a0c8ce6d8..2d5a033863 100644
--- a/pkg/node/kubelet.go
+++ b/pkg/node/kubelet.go
@@ -49,26 +49,82 @@ const (
 )
 
 type KubeletServer struct {
-	kubeletflags *kubeletoptions.KubeletFlags
-	kubeconfig   *kubeletconfig.KubeletConfiguration
+	cfg *config.Config
 }
 
 func NewKubeletServer(cfg *config.Config) *KubeletServer {
-	s := &KubeletServer{}
-	s.configure(cfg)
-	return s
+	return &KubeletServer{
+		cfg: cfg,
+	}
 }
 
 func (s *KubeletServer) Name() string           { return componentKubelet }
 func (s *KubeletServer) Dependencies() []string { return []string{"kube-apiserver"} }
 
-func (s *KubeletServer) configure(cfg *config.Config) {
-	if err := s.writeConfig(cfg); err != nil {
-		klog.Fatalf("Failed to write kubelet config %v", err)
+func (s *KubeletServer) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error {
+	defer close(stopped)
+
+	kubeletFlags, kubeletConfiguration, err := configure(ctx, s.cfg)
+	if err != nil {
+		return fmt.Errorf("failed to configure kubelet: %w", err)
+	}
+
+	// construct a KubeletServer from kubeletFlags and kubeletConfig
+	kubeletServer := &kubeletoptions.KubeletServer{
+		KubeletFlags:         *kubeletFlags,
+		KubeletConfiguration: *kubeletConfiguration,
+	}
+
+	kubeletDeps, err := kubelet.UnsecuredDependencies(ctx, kubeletServer, utilfeature.DefaultFeatureGate)
+	if err != nil {
+		return fmt.Errorf("error fetching dependencies: %w", err)
+	}
+
+	errc := make(chan error)
+
+	// Run healthcheck probe and kubelet in parallel.
+	// No matter which ends first - if it ends with an error,
+	// it'll cause ServiceManager to trigger graceful shutdown.
+
+	// run readiness check
+	go func() {
+		// This endpoint does not use TLS, but we reuse the same function without verification.
+		healthcheckStatus := util.RetryInsecureGet(ctx, "http://localhost:10248/healthz")
+		if healthcheckStatus != 200 {
+			e := fmt.Errorf("%s failed to start", s.Name())
+			klog.Error(e)
+			errc <- e
+			return
+		}
+		klog.Infof("%s is ready", s.Name())
+		close(ready)
+	}()
+
+	panicChannel := make(chan any, 1)
+	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				panicChannel <- r
+			}
+		}()
+		errc <- kubelet.Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
+	}()
+
+	select {
+	case err := <-errc:
+		return err
+	case perr := <-panicChannel:
+		panic(perr)
+	}
+}
+
+func configure(ctx context.Context, cfg *config.Config) (*kubeletoptions.KubeletFlags, *kubeletconfig.KubeletConfiguration, error) {
+	if err := writeConfig(cfg); err != nil {
+		return nil, nil, fmt.Errorf("failed to write kubelet config: %w", err)
 	}
 
 	osID, err := loadOSID()
 	if err != nil {
-		klog.Fatalf("Failed to read OS ID %v", err)
+		return nil, nil, fmt.Errorf("failed to read OS ID: %w", err)
 	}
 
 	nodeIP := cfg.Node.NodeIP
@@ -90,18 +146,17 @@ func (s *KubeletServer) configure(cfg *config.Config) {
 	kubeletFlags.NodeLabels["node.openshift.io/os_id"] = osID
 	kubeletFlags.NodeLabels["node.kubernetes.io/instance-type"] = "rhde"
 
-	kubeletConfig, err := loadConfigFile(filepath.Join(config.DataDir, "/resources/kubelet/config/config.yaml"))
+	kubeletConfig, err := loadConfigFile(ctx, filepath.Join(config.DataDir, "/resources/kubelet/config/config.yaml"))
 	if err != nil {
-		klog.Fatalf("Failed to load Kubelet Configuration %v", err)
+		return nil, nil, fmt.Errorf("failed to load Kubelet configuration: %w", err)
 	}
 
-	s.kubeconfig = kubeletConfig
-	s.kubeletflags = kubeletFlags
+	return kubeletFlags, kubeletConfig, nil
 }
 
-func (s *KubeletServer) writeConfig(cfg *config.Config) error {
-	data, err := s.generateConfig(cfg)
+func writeConfig(cfg *config.Config) error {
+	data, err := generateConfig(cfg)
 	if err != nil {
 		return err
 	}
@@ -114,7 +169,7 @@ func (s *KubeletServer) writeConfig(cfg *config.Config) error {
 	return os.WriteFile(path, data, 0400)
 }
 
-func (s *KubeletServer) generateConfig(cfg *config.Config) ([]byte, error) {
+func generateConfig(cfg *config.Config) ([]byte, error) {
 	certsDir := cryptomaterial.CertsDirectory(config.DataDir)
 	servingCertDir := cryptomaterial.KubeletServingCertDir(certsDir)
 
@@ -164,59 +219,7 @@ func (s *KubeletServer) generateConfig(cfg *config.Config) ([]byte, error) {
 	return data.Bytes(), nil
 }
 
-func (s *KubeletServer) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error {
-	defer close(stopped)
-
-	// construct a KubeletServer from kubeletFlags and kubeletConfig
-	kubeletServer := &kubeletoptions.KubeletServer{
-		KubeletFlags:         *s.kubeletflags,
-		KubeletConfiguration: *s.kubeconfig,
-	}
-
-	kubeletDeps, err := kubelet.UnsecuredDependencies(ctx, kubeletServer, utilfeature.DefaultFeatureGate)
-	if err != nil {
-		return fmt.Errorf("error fetching dependencies: %w", err)
-	}
-
-	errc := make(chan error)
-
-	// Run healthcheck probe and kubelet in parallel.
-	// No matter which ends first - if it ends with an error,
-	// it'll cause ServiceManager to trigger graceful shutdown.
-
-	// run readiness check
-	go func() {
-		// This endpoint does not use TLS, but reusing the same function without verification.
-		healthcheckStatus := util.RetryInsecureGet(ctx, "http://localhost:10248/healthz")
-		if healthcheckStatus != 200 {
-			e := fmt.Errorf("%s failed to start", s.Name())
-			klog.Error(e)
-			errc <- e
-			return
-		}
-		klog.Infof("%s is ready", s.Name())
-		close(ready)
-	}()
-
-	panicChannel := make(chan any, 1)
-	go func() {
-		defer func() {
-			if r := recover(); r != nil {
-				panicChannel <- r
-			}
-		}()
-		errc <- kubelet.Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
-	}()
-
-	select {
-	case err := <-errc:
-		return err
-	case perr := <-panicChannel:
-		panic(perr)
-	}
-}
-
-func loadConfigFile(name string) (*kubeletconfig.KubeletConfiguration, error) {
+func loadConfigFile(ctx context.Context, name string) (*kubeletconfig.KubeletConfiguration, error) {
 	const errFmt = "failed to load Kubelet config file %s, error %v"
 	// compute absolute path based on current working dir
 	kubeletConfigFile, err := filepath.Abs(name)
@@ -227,7 +230,7 @@ func loadConfigFile(name string) (*kubeletconfig.KubeletConfiguration, error) {
 	if err != nil {
 		return nil, fmt.Errorf(errFmt, name, err)
 	}
-	kc, err := loader.Load(context.TODO())
+	kc, err := loader.Load(ctx)
 	if err != nil {
 		return nil, fmt.Errorf(errFmt, name, err)
 	}
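
The kubelet refactor above follows a simple pattern: NewKubeletServer no longer performs fallible work (the old configure path called klog.Fatalf), it only stores the config; Run does the configuration and returns errors so the service manager can shut down gracefully. A distilled sketch of the pattern with generic names, not MicroShift types:

package main

import (
	"context"
	"fmt"
)

// Service stands in for KubeletServer: the constructor only records the
// configuration, and every fallible step happens inside Run.
type Service struct {
	cfg string // stands in for *config.Config
}

// New can no longer fail; there is nothing here to klog.Fatalf about.
func New(cfg string) *Service { return &Service{cfg: cfg} }

// Run performs the deferred configuration and returns errors to the caller,
// letting a service manager decide how to shut down.
func (s *Service) Run(ctx context.Context) error {
	flags, err := configureService(ctx, s.cfg)
	if err != nil {
		return fmt.Errorf("failed to configure: %w", err)
	}
	fmt.Println("running with", flags)
	<-ctx.Done()
	return ctx.Err()
}

func configureService(_ context.Context, cfg string) (string, error) {
	if cfg == "" {
		return "", fmt.Errorf("empty config")
	}
	return "--flags-for=" + cfg, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // stop immediately; this is only a sketch
	fmt.Println(New("example").Run(ctx))
}
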
diff --git a/pkg/node/kubelet_test.go b/pkg/node/kubelet_test.go
index a98a071c0f..0cea9665de 100644
--- a/pkg/node/kubelet_test.go
+++ b/pkg/node/kubelet_test.go
@@ -43,8 +43,7 @@ reservedMemory:
     memory: 1100Mi
     numaNode: 0`
 
-	kubelet := &KubeletServer{}
-	data, err := kubelet.generateConfig(cfg)
+	data, err := generateConfig(cfg)
 	assert.NoError(t, err)
 	assert.Contains(t, string(data), expectedConfigPart)
 }
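
One small but representative change above is loadConfigFile taking a ctx instead of using context.TODO(): cancellation and deadlines from Run now reach the config loader. A toy demonstration of the difference; loadSomething is a stand-in for any ctx-aware loader, not MicroShift code:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// loadSomething stands in for loadConfigFile: because it receives the
// caller's ctx rather than context.TODO(), cancellation can interrupt it.
func loadSomething(ctx context.Context) error {
	select {
	case <-time.After(time.Second): // simulated slow load
		return nil
	case <-ctx.Done():
		return ctx.Err() // with context.TODO() this branch could never fire
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	err := loadSomething(ctx)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
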