Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/analysisengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ClusterInfo struct {
Region string
CloudProvider string
Version string
Expiration string
}

// Config holds configuration for the analysis engine
Expand Down
81 changes: 80 additions & 1 deletion internal/reporter/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"

commonslack "github.com/openshift/osde2e/pkg/common/slack"
Expand Down Expand Up @@ -40,7 +42,25 @@ func (s *SlackReporter) Report(ctx context.Context, result *AnalysisResult, conf
// Create simple message
message := s.formatMessage(result, config)

// Send to Slack using common package
// Check if we have bot token for enhanced functionality
botToken, hasBotToken := config.Settings["bot_token"].(string)
channel, hasChannel := config.Settings["channel"].(string)
reportDir, hasReportDir := config.Settings["report_dir"].(string)

// If we have bot token, use chat.postMessage with file attachments
if hasBotToken && botToken != "" && hasChannel && channel != "" && hasReportDir && reportDir != "" {
logFiles := s.collectLogFiles(reportDir)
if err := s.client.PostMessageWithFiles(ctx, botToken, channel, message, logFiles); err != nil {
// Fall back to webhook on error
fmt.Fprintf(os.Stderr, "Warning: Failed to post message with files, falling back to webhook: %v\n", err)
if err := s.client.SendWebhook(ctx, webhookURL, message); err != nil {
return fmt.Errorf("failed to send to Slack: %w", err)
}
}
return nil
}

// Fall back to simple webhook (no files)
if err := s.client.SendWebhook(ctx, webhookURL, message); err != nil {
return fmt.Errorf("failed to send to Slack: %w", err)
}
Expand All @@ -62,6 +82,25 @@ func (s *SlackReporter) formatMessage(result *AnalysisResult, config *ReporterCo
summary := fmt.Sprintf("%s Pipeline Failed at E2E Test\n", statusEmoji)
text := ""

// Add cluster information to summary
if clusterInfo, ok := config.Settings["cluster_info"].(*ClusterInfo); ok && clusterInfo != nil {
summary += "\n====== Cluster Information ======\n"
summary += fmt.Sprintf("Cluster ID: %s\n", clusterInfo.ID)
if clusterInfo.Expiration != "" {
summary += fmt.Sprintf("Expiration: %s\n", clusterInfo.Expiration)
}
if clusterInfo.Name != "" {
summary += fmt.Sprintf("Name: %s\n", clusterInfo.Name)
}
if clusterInfo.Version != "" {
summary += fmt.Sprintf("Version: %s\n", clusterInfo.Version)
}
if clusterInfo.Provider != "" {
summary += fmt.Sprintf("Provider: %s\n", clusterInfo.Provider)
}
summary += "\n"
}

if image, ok := config.Settings["image"].(string); ok && image != "" {
imageInfo := strings.Split(image, ":")
image := imageInfo[0]
Expand Down Expand Up @@ -143,6 +182,46 @@ func (s *SlackReporter) formatAnalysisContent(content string) string {
return formatted.String()
}

// collectLogFiles collects all log and XML files from the report directory
func (s *SlackReporter) collectLogFiles(reportDir string) []string {
var files []string

err := filepath.Walk(reportDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil // Skip errors, continue walking
}

if info.IsDir() {
return nil
}

// Collect .log, .txt, and .xml files
ext := strings.ToLower(filepath.Ext(path))
if ext == ".log" || ext == ".txt" || ext == ".xml" {
files = append(files, path)
}

return nil
})

if err != nil {
fmt.Fprintf(os.Stderr, "Warning: Error collecting log files: %v\n", err)
}

return files
}

// ClusterInfo holds cluster information for reporting (mirrored from analysisengine to avoid import cycle)
type ClusterInfo struct {
ID string
Name string
Provider string
Region string
CloudProvider string
Version string
Expiration string
}

// SlackReporterConfig creates a reporter config for Slack
func SlackReporterConfig(webhookURL string, enabled bool) ReporterConfig {
return ReporterConfig{
Expand Down
9 changes: 9 additions & 0 deletions pkg/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,17 @@ var LogAnalysis = struct {
// SlackChannel is the default Slack channel for OSDE2E notifications
// Env: LOG_ANALYSIS_SLACK_CHANNEL
SlackChannel string

// SlackBotToken is the Slack bot token for enhanced features (file attachments)
// Env: LOG_ANALYSIS_SLACK_BOT_TOKEN
SlackBotToken string
}{
EnableAnalysis: "logAnalysis.enableAnalysis",
APIKey: "logAnalysis.apiKey",
Model: "logAnalysis.model",
SlackWebhook: "logAnalysis.slackWebhook",
SlackChannel: "logAnalysis.slackChannel",
SlackBotToken: "logAnalysis.slackBotToken",
}

func InitOSDe2eViper() {
Expand Down Expand Up @@ -947,6 +952,10 @@ func InitOSDe2eViper() {

viper.SetDefault(LogAnalysis.SlackChannel, defaultNotificationsChannel)
_ = viper.BindEnv(LogAnalysis.SlackChannel, "LOG_ANALYSIS_SLACK_CHANNEL")

viper.SetDefault(LogAnalysis.SlackBotToken, "")
_ = viper.BindEnv(LogAnalysis.SlackBotToken, "LOG_ANALYSIS_SLACK_BOT_TOKEN")
RegisterSecret(LogAnalysis.SlackBotToken, "slack-bot-token")
}

func init() {
Expand Down
162 changes: 162 additions & 0 deletions pkg/common/slack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"time"
)

Expand Down Expand Up @@ -83,3 +87,161 @@ func SendWebhook(ctx context.Context, webhookURL string, payload interface{}) er
client := NewClient()
return client.SendWebhook(ctx, webhookURL, payload)
}

// PostMessageWithFiles posts a message to Slack with file attachments using chat.postMessage + files.upload
// This provides a richer experience than webhooks by attaching files to the message
func (c *Client) PostMessageWithFiles(ctx context.Context, botToken string, channel string, message interface{}, filePaths []string) error {
// First, post the message to get a timestamp
ts, err := c.postMessage(ctx, botToken, channel, message)
if err != nil {
return fmt.Errorf("failed to post message: %w", err)
}

// Then upload files as threaded replies to that message
if len(filePaths) > 0 {
for _, filePath := range filePaths {
if err := c.uploadFileToThread(ctx, botToken, channel, ts, filePath); err != nil {
// Log error but continue with other files
fmt.Fprintf(os.Stderr, "Warning: Failed to upload %s: %v\n", filePath, err)
}
}
}

return nil
}

// postMessage posts a message using chat.postMessage and returns the message timestamp
func (c *Client) postMessage(ctx context.Context, botToken string, channel string, message interface{}) (string, error) {
// Extract text content from message
var text string
if msg, ok := message.(map[string]interface{}); ok {
if summary, ok := msg["summary"].(string); ok {
text = summary
}
if analysis, ok := msg["analysis"].(string); ok {
if text != "" {
text += "\n\n"
}
text += analysis
}
}

// Create payload for chat.postMessage
payload := map[string]interface{}{
"channel": channel,
"text": text,
}

jsonData, err := json.Marshal(payload)
if err != nil {
return "", fmt.Errorf("failed to marshal payload: %w", err)
}

req, err := http.NewRequestWithContext(ctx, "POST", "https://slack.com/api/chat.postMessage", bytes.NewBuffer(jsonData))
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Content-Type", "application/json; charset=utf-8")
req.Header.Set("Authorization", "Bearer "+botToken)

client := &http.Client{Timeout: c.timeout}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("HTTP request failed: %w", err)
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response: %w", err)
}

var result map[string]interface{}
if err := json.Unmarshal(respBody, &result); err != nil {
return "", fmt.Errorf("failed to parse response: %w", err)
}

if ok, exists := result["ok"].(bool); !exists || !ok {
if errMsg, exists := result["error"].(string); exists {
return "", fmt.Errorf("slack API error: %s", errMsg)
}
return "", fmt.Errorf("slack API returned ok=false")
}

// Extract timestamp
ts, _ := result["ts"].(string)
return ts, nil
}

// uploadFileToThread uploads a file to a Slack thread
func (c *Client) uploadFileToThread(ctx context.Context, botToken string, channel string, threadTs string, filePath string) error {
file, err := os.Open(filepath.Clean(filePath))
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()

body := &bytes.Buffer{}
writer := multipart.NewWriter(body)

// Add file field
part, err := writer.CreateFormFile("file", filepath.Base(filePath))
if err != nil {
return fmt.Errorf("failed to create form file: %w", err)
}

if _, err := io.Copy(part, file); err != nil {
return fmt.Errorf("failed to copy file content: %w", err)
}

// Add channel field
if err := writer.WriteField("channels", channel); err != nil {
return fmt.Errorf("failed to write channel field: %w", err)
}

// Add thread_ts to attach to message thread
if threadTs != "" {
if err := writer.WriteField("thread_ts", threadTs); err != nil {
return fmt.Errorf("failed to write thread_ts field: %w", err)
}
}

if err := writer.Close(); err != nil {
return fmt.Errorf("failed to close multipart writer: %w", err)
}

req, err := http.NewRequestWithContext(ctx, "POST", "https://slack.com/api/files.upload", body)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Content-Type", writer.FormDataContentType())
req.Header.Set("Authorization", "Bearer "+botToken)

client := &http.Client{Timeout: c.timeout}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("HTTP request failed: %w", err)
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}

var result map[string]interface{}
if err := json.Unmarshal(respBody, &result); err != nil {
return fmt.Errorf("failed to parse response: %w", err)
}

if ok, exists := result["ok"].(bool); !exists || !ok {
if errMsg, exists := result["error"].(string); exists {
return fmt.Errorf("slack API error: %s", errMsg)
}
return fmt.Errorf("slack API returned ok=false")
}

return nil
}
15 changes: 15 additions & 0 deletions pkg/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,22 @@ func runLogAnalysis(ctx context.Context, err error) {
return
}

// Get cluster expiration timestamp
clusterExpiration := ""
if provider != nil {
if cluster, err := provider.GetCluster(viper.GetString(config.Cluster.ID)); err == nil {
clusterExpiration = cluster.ExpirationTimestamp().Format("2006-01-02 15:04:05 MST")
}
}

clusterInfo := &analysisengine.ClusterInfo{
ID: viper.GetString(config.Cluster.ID),
Name: viper.GetString(config.Cluster.Name),
Provider: viper.GetString(config.Provider),
Region: viper.GetString(config.CloudProvider.Region),
CloudProvider: viper.GetString(config.CloudProvider.CloudProviderID),
Version: viper.GetString(config.Cluster.Version),
Expiration: clusterExpiration,
}

// Setup notification config - composable approach for multiple reporters
Expand All @@ -63,9 +72,15 @@ func runLogAnalysis(ctx context.Context, err error) {
enableSlackNotify := viper.GetBool(config.Tests.EnableSlackNotify)
slackWebhook := viper.GetString(config.LogAnalysis.SlackWebhook)
defaultChannel := viper.GetString(config.LogAnalysis.SlackChannel)
slackBotToken := viper.GetString(config.LogAnalysis.SlackBotToken)
if enableSlackNotify && slackWebhook != "" && defaultChannel != "" {
slackConfig := reporter.SlackReporterConfig(slackWebhook, true)
slackConfig.Settings["channel"] = defaultChannel
slackConfig.Settings["cluster_info"] = clusterInfo
slackConfig.Settings["report_dir"] = reportDir
if slackBotToken != "" {
slackConfig.Settings["bot_token"] = slackBotToken
}
reporters = append(reporters, slackConfig)
}

Expand Down