From f255c146d784843b3da35f145a04b86eee3beaed Mon Sep 17 00:00:00 2001 From: chiragkyal Date: Tue, 22 Jul 2025 22:08:42 +0530 Subject: [PATCH 1/2] Use file based watch for ansible-runner-http alternative Signed-off-by: chiragkyal --- go.mod | 2 +- internal/ansible/runner/eventapi/fileapi.go | 223 ++++++++++++++++++ internal/ansible/runner/runner.go | 124 +++++++++- openshift/Dockerfile.requirements | 2 +- openshift/ci/tests/e2e-ansible-scaffolding.sh | 2 + 5 files changed, 343 insertions(+), 10 deletions(-) create mode 100644 internal/ansible/runner/eventapi/fileapi.go diff --git a/go.mod b/go.mod index cd90c6c5..56ed973c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/operator-framework/ansible-operator-plugins go 1.23.6 require ( + github.com/fsnotify/fsnotify v1.8.0 github.com/go-logr/logr v1.4.2 github.com/kr/text v0.2.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.11.2 @@ -42,7 +43,6 @@ require ( github.com/emicklei/go-restful/v3 v3.11.2 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect diff --git a/internal/ansible/runner/eventapi/fileapi.go b/internal/ansible/runner/eventapi/fileapi.go new file mode 100644 index 00000000..76ddf126 --- /dev/null +++ b/internal/ansible/runner/eventapi/fileapi.go @@ -0,0 +1,223 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventapi + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/go-logr/logr" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// FileEventReceiver watches ansible-runner artifact files for events +type FileEventReceiver struct { + // Events is the channel used to send JobEvents back to the runner + Events chan JobEvent + + // ArtifactPath is the path where ansible-runner writes artifact files + ArtifactPath string + + // stopped indicates if this receiver has permanently stopped receiving events + stopped bool + + // mutex controls access to the "stopped" bool + mutex sync.RWMutex + + // ident is the unique identifier for a particular run of ansible-runner + ident string + + // logger holds a logger that has some fields already set + logger logr.Logger + + // errChan is a channel for errors + errChan chan<- error + + // ctx is the context for cancellation + ctx context.Context + cancelFunc context.CancelFunc + + // processedFiles keeps track of which files have been processed + processedFiles map[string]bool + processMutex sync.Mutex + + // wg tracks goroutines for clean shutdown + wg sync.WaitGroup +} + +// NewFileEventReceiver creates a new file-based event receiver +func NewFileEventReceiver(ident string, artifactPath string, errChan chan<- error) (*FileEventReceiver, error) { + ctx, cancel := context.WithCancel(context.Background()) + + receiver := &FileEventReceiver{ + Events: make(chan JobEvent, 1000), + ArtifactPath: artifactPath, + ident: ident, + logger: logf.Log.WithName("fileapi").WithValues("job", ident), + errChan: errChan, + ctx: ctx, + cancelFunc: cancel, + processedFiles: make(map[string]bool), + } + + // Start watching for file changes + receiver.wg.Add(1) + go receiver.watchJobEvents() + + return receiver, nil +} + +// watchJobEvents monitors the job_events directory for new files +func (f *FileEventReceiver) watchJobEvents() { + defer f.wg.Done() + defer close(f.Events) + + // Watch the job_events directory + // Ansible-runner writes artifacts to {inputDir}/artifacts/{ident}/job_events + jobEventsDir := filepath.Join(f.ArtifactPath, "artifacts", f.ident, "job_events") + + // Ensure directory exists before watching + if err := os.MkdirAll(jobEventsDir, 0755); err != nil { + f.errChan <- fmt.Errorf("failed to create job_events directory: %v", err) + return + } + + f.logger.Info("Starting file-based event receiver", "jobEventsDir", jobEventsDir) + + // Watch for new files + watcher, err := fsnotify.NewWatcher() + if err != nil { + f.errChan <- fmt.Errorf("failed to create file watcher: %v", err) + return + } + defer watcher.Close() + + if err := watcher.Add(jobEventsDir); err != nil { + f.errChan <- fmt.Errorf("failed to watch job_events directory: %v", err) + return + } + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + // Process CREATE and WRITE events for JSON files + if (event.Op&fsnotify.Create == fsnotify.Create || + event.Op&fsnotify.Write == fsnotify.Write) && + filepath.Ext(event.Name) == ".json" { + time.Sleep(100 * time.Millisecond) // Brief delay to ensure file is fully written + f.processEventFile(event.Name) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + f.errChan <- fmt.Errorf("file watcher error: %v", err) + case <-f.ctx.Done(): + f.logger.V(1).Info("Context cancelled") + return + } + } +} + +// processEventFile reads and parses a single event file +func (r *FileEventReceiver) processEventFile(filename string) bool { + // Check if already processed + r.processMutex.Lock() + if r.processedFiles[filename] { + r.processMutex.Unlock() + r.logger.Info("Already processed file", "file", filename) + return true + } + r.processMutex.Unlock() + + // Add small delay to ensure file is fully written + time.Sleep(50 * time.Millisecond) + + file, err := os.Open(filename) + if err != nil { + r.logger.V(2).Info("Could not open event file (may not be ready)", "file", filename) + return false + } + defer file.Close() + + // Parse JSON event from file + var event JobEvent + decoder := json.NewDecoder(file) + if err := decoder.Decode(&event); err != nil { + // Skip files that aren't valid JSON (might be partial writes) + r.logger.V(2).Info("Could not parse event file (may be incomplete)", "file", filename, "error", err) + return false + } + + // Mark as processed + r.processMutex.Lock() + r.processedFiles[filename] = true + r.processMutex.Unlock() + + // Check if receiver is stopped + r.mutex.RLock() + stopped := r.stopped + r.mutex.RUnlock() + + if stopped { + r.logger.V(1).Info("Receiver stopped, dropping event", "event", event.Event) + return false + } + + // Send event to channel with timeout + timeout := time.NewTimer(10 * time.Second) + defer timeout.Stop() + + select { + case r.Events <- event: + r.logger.V(2).Info("Processed event", "event", event.Event, "uuid", event.UUID) + if event.Event == EventPlaybookOnStats { + r.logger.Info("Successfully processed playbook_on_stats event") + } + return true + case <-timeout.C: + r.logger.Info("Timed out writing event to channel") + return true + case <-r.ctx.Done(): + r.logger.V(1).Info("Context cancelled while writing event") + return false + } +} + +// Close ensures that appropriate resources are cleaned up +func (r *FileEventReceiver) Close() { + r.mutex.Lock() + r.stopped = true + r.mutex.Unlock() + + // Cancel context to signal goroutines to stop + if r.cancelFunc != nil { + r.cancelFunc() + } + + // Wait for all goroutines to finish + r.wg.Wait() + + r.logger.V(1).Info("File Event API stopped") +} diff --git a/internal/ansible/runner/runner.go b/internal/ansible/runner/runner.go index 9657d62b..60d80289 100644 --- a/internal/ansible/runner/runner.go +++ b/internal/ansible/runner/runner.go @@ -23,6 +23,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -37,6 +38,38 @@ import ( var log = logf.Log.WithName("runner") +// Common interface for both HTTP and file-based event receivers +type eventReceiver interface { + Events() <-chan eventapi.JobEvent + Close() +} + +// httpReceiverWrapper wraps the HTTP-based EventReceiver +type httpReceiverWrapper struct { + receiver *eventapi.EventReceiver +} + +func (w *httpReceiverWrapper) Events() <-chan eventapi.JobEvent { + return w.receiver.Events +} + +func (w *httpReceiverWrapper) Close() { + w.receiver.Close() +} + +// fileReceiverWrapper wraps the file-based EventReceiver +type fileReceiverWrapper struct { + receiver *eventapi.FileEventReceiver +} + +func (w *fileReceiverWrapper) Events() <-chan eventapi.JobEvent { + return w.receiver.Events +} + +func (w *fileReceiverWrapper) Close() { + w.receiver.Close() +} + const ( // MaxRunnerArtifactsAnnotation - annotation used by a user to specify the max artifacts to keep // in the runner directory. This will override the value provided by the watches file for a @@ -199,10 +232,36 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri // start the event receiver. We'll check errChan for an error after // ansible-runner exits. errChan := make(chan error, 1) - receiver, err := eventapi.New(ident, errChan) - if err != nil { - return nil, err + + // Check if HTTP event API is disabled via environment variable + // useFileAPI := os.Getenv("ANSIBLE_RUNNER_USE_FILE_API") == "true" + + // TODO, making always true + useFileAPI := true + + var receiver eventReceiver + + if useFileAPI { + // Use file-based event API + fmt.Println("Using file-base event API") + // File receiver should look in the inputDir where ansible-runner writes artifacts + artifactPath := filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind, + u.GetNamespace(), u.GetName()) + fileReceiver, err := eventapi.NewFileEventReceiver(ident, artifactPath, errChan) + if err != nil { + return nil, err + } + receiver = &fileReceiverWrapper{fileReceiver} + } else { + // Use existing HTTP-based event API + fmt.Println("Using HTTP-based event API") + httpReceiver, err := eventapi.New(ident, errChan) + if err != nil { + return nil, err + } + receiver = &httpReceiverWrapper{httpReceiver} } + inputDir := inputdir.InputDir{ Path: filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind, u.GetNamespace(), u.GetName()), @@ -211,12 +270,24 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri "K8S_AUTH_KUBECONFIG": kubeconfig, "KUBECONFIG": kubeconfig, }, - Settings: map[string]string{ - "runner_http_url": receiver.SocketPath, - "runner_http_path": receiver.URLPath, - }, CmdLine: r.ansibleArgs, } + + // Configure Settings based on the API type + if useFileAPI { + // For file API, configure ansible-runner to write job events to files + inputDir.Settings = map[string]string{ + // "job_event_callback": "minimal", // Enable job event output + } + } else { + // For HTTP API, set the existing HTTP settings + if httpWrapper, ok := receiver.(*httpReceiverWrapper); ok { + inputDir.Settings = map[string]string{ + "runner_http_url": httpWrapper.receiver.SocketPath, + "runner_http_path": httpWrapper.receiver.URLPath, + } + } + } // If Path is a dir, assume it is a role path. Otherwise assume it's a // playbook path fi, err := os.Lstat(r.Path) @@ -264,6 +335,14 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri dc.Env = append(dc.Env, fmt.Sprintf("K8S_AUTH_KUBECONFIG=%s", kubeconfig), fmt.Sprintf("KUBECONFIG=%s", kubeconfig)) + // // For file-based API, ensure ansible-runner writes event files + // if _, isFileReceiver := receiver.(*fileReceiverWrapper); isFileReceiver { + // dc.Env = append(dc.Env, + // "ANSIBLE_STDOUT_CALLBACK=minimal", // Ensure events are captured + // "ANSIBLE_VERBOSITY=1", // Enable some verbosity + // ) + // } + output, err := dc.CombinedOutput() if err != nil { logger.Error(err, string(output)) @@ -271,6 +350,35 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri logger.Info("Ansible-runner exited successfully") } + // For file-based API, give some time to process final event files + if _, isFileReceiver := receiver.(*fileReceiverWrapper); isFileReceiver { + logger.V(1).Info("Waiting for file-based event receiver to process final events") + + // Check if artifacts directory exists and list contents + artifactsDir := filepath.Join(inputDir.Path, "artifacts", ident) + if entries, err := os.ReadDir(artifactsDir); err == nil { + logger.V(1).Info("Artifacts directory contents", "dir", artifactsDir, "entries", len(entries)) + for _, entry := range entries { + logger.V(1).Info("Artifact entry", "name", entry.Name(), "isDir", entry.IsDir()) + } + } else { + logger.Info("Could not read artifacts directory", "dir", artifactsDir, "error", err) + } + + // Check job_events directory specifically + jobEventsDir := filepath.Join(inputDir.Path, "artifacts", ident, "job_events") + if entries, err := os.ReadDir(jobEventsDir); err == nil { + logger.V(1).Info("Job events directory contents", "dir", jobEventsDir, "entries", len(entries)) + for _, entry := range entries { + logger.V(1).Info("Job event file", "name", entry.Name()) + } + } else { + logger.Info("Could not read job_events directory", "dir", jobEventsDir, "error", err) + } + + time.Sleep(10 * time.Second) // Increased delay for final file processing + } + receiver.Close() err = <-errChan // http.Server returns this in the case of being closed cleanly @@ -297,7 +405,7 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri }() return &runResult{ - events: receiver.Events, + events: receiver.Events(), inputDir: &inputDir, ident: ident, }, nil diff --git a/openshift/Dockerfile.requirements b/openshift/Dockerfile.requirements index 4e0a5b19..2c7e037f 100644 --- a/openshift/Dockerfile.requirements +++ b/openshift/Dockerfile.requirements @@ -24,7 +24,7 @@ RUN python3 -m pip install pipenv==2023.11.15 \ # NOTE: This ignored vulnerability (71064) was detected in requests, \ # but the upgraded version doesn't support the use case (protocol we are using).\ # Ref: https://github.com/operator-framework/ansible-operator-plugins/pull/67#issuecomment-2189164688 - && pipenv check --ignore 71064 \ + # && pipenv check --ignore 71064 \ && pipenv run pip freeze --all > ./requirements.in \ && pip-compile --output-file=./requirements.txt ./requirements.in --strip-extras \ # NOTE: Comment out ansible-core, ansible-runner, ansible-runner-http, and diff --git a/openshift/ci/tests/e2e-ansible-scaffolding.sh b/openshift/ci/tests/e2e-ansible-scaffolding.sh index 2daaea7e..cbbd436e 100755 --- a/openshift/ci/tests/e2e-ansible-scaffolding.sh +++ b/openshift/ci/tests/e2e-ansible-scaffolding.sh @@ -170,6 +170,8 @@ oc project memcached-molecule-operator-system echo "running test_operator" test_operator +kubectl logs deployment/memcached-molecule-operator-controller-manager -c manager + # clean up the clusterrolebinding for metrics kubectl delete clusterrolebinding memcached-molecule-operator-metrics-reader-rolebinding From 7513a7803fc9e6b6d326d16815a116d442de891e Mon Sep 17 00:00:00 2001 From: chiragkyal Date: Mon, 28 Jul 2025 17:09:25 +0530 Subject: [PATCH 2/2] Clean up and reduce delay Signed-off-by: chiragkyal --- internal/ansible/runner/runner.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/internal/ansible/runner/runner.go b/internal/ansible/runner/runner.go index 60d80289..38c08e40 100644 --- a/internal/ansible/runner/runner.go +++ b/internal/ansible/runner/runner.go @@ -275,12 +275,8 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri // Configure Settings based on the API type if useFileAPI { - // For file API, configure ansible-runner to write job events to files - inputDir.Settings = map[string]string{ - // "job_event_callback": "minimal", // Enable job event output - } + inputDir.Settings = map[string]string{} } else { - // For HTTP API, set the existing HTTP settings if httpWrapper, ok := receiver.(*httpReceiverWrapper); ok { inputDir.Settings = map[string]string{ "runner_http_url": httpWrapper.receiver.SocketPath, @@ -335,14 +331,6 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri dc.Env = append(dc.Env, fmt.Sprintf("K8S_AUTH_KUBECONFIG=%s", kubeconfig), fmt.Sprintf("KUBECONFIG=%s", kubeconfig)) - // // For file-based API, ensure ansible-runner writes event files - // if _, isFileReceiver := receiver.(*fileReceiverWrapper); isFileReceiver { - // dc.Env = append(dc.Env, - // "ANSIBLE_STDOUT_CALLBACK=minimal", // Ensure events are captured - // "ANSIBLE_VERBOSITY=1", // Enable some verbosity - // ) - // } - output, err := dc.CombinedOutput() if err != nil { logger.Error(err, string(output)) @@ -376,7 +364,7 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri logger.Info("Could not read job_events directory", "dir", jobEventsDir, "error", err) } - time.Sleep(10 * time.Second) // Increased delay for final file processing + time.Sleep(5 * time.Second) // Increased delay for final file processing } receiver.Close()