Skip to content

Commit d8a8a56

Browse files
authored
Merge branch 'main' into renovate/golang.org-x-sync-0.x
2 parents 57b29fa + 2b6a4a7 commit d8a8a56

32 files changed

+971
-130
lines changed

cmd/thv-operator/api/v1alpha1/virtualmcpserver_types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ type VirtualMCPServerSpec struct {
5454
// +kubebuilder:pruning:PreserveUnknownFields
5555
// +kubebuilder:validation:Type=object
5656
PodTemplateSpec *runtime.RawExtension `json:"podTemplateSpec,omitempty"`
57+
58+
// Telemetry configures OpenTelemetry-based observability for the Virtual MCP server
59+
// including distributed tracing, OTLP metrics export, and Prometheus metrics endpoint
60+
// +optional
61+
Telemetry *TelemetryConfig `json:"telemetry,omitempty"`
5762
}
5863

5964
// GroupRef references an MCPGroup resource
@@ -524,6 +529,10 @@ const (
524529

525530
// ConditionReasonCompositeToolRefNotFound indicates a referenced VirtualMCPCompositeToolDefinition was not found
526531
ConditionReasonCompositeToolRefNotFound = "CompositeToolRefNotFound"
532+
533+
// ConditionReasonCompositeToolRefInvalid indicates a referenced VirtualMCPCompositeToolDefinition is invalid
534+
ConditionReasonCompositeToolRefInvalid = "CompositeToolRefInvalid"
535+
527536
// ConditionReasonVirtualMCPServerPodTemplateSpecValid indicates PodTemplateSpec validation succeeded
528537
ConditionReasonVirtualMCPServerPodTemplateSpecValid = "PodTemplateSpecValid"
529538

cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/thv-operator/controllers/virtualmcpserver_controller.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,29 @@ func (r *VirtualMCPServerReconciler) validateCompositeToolRefs(
334334
ctxLogger.Error(err, "Failed to get VirtualMCPCompositeToolDefinition", "name", ref.Name)
335335
return err
336336
}
337+
338+
// Check that the composite tool definition is validated and valid
339+
if compositeToolDef.Status.ValidationStatus == mcpv1alpha1.ValidationStatusInvalid {
340+
message := fmt.Sprintf("Referenced VirtualMCPCompositeToolDefinition %s is invalid", ref.Name)
341+
if len(compositeToolDef.Status.ValidationErrors) > 0 {
342+
message = fmt.Sprintf("%s: %s", message, strings.Join(compositeToolDef.Status.ValidationErrors, "; "))
343+
}
344+
statusManager.SetPhase(mcpv1alpha1.VirtualMCPServerPhaseFailed)
345+
statusManager.SetMessage(message)
346+
statusManager.SetCompositeToolRefsValidatedCondition(
347+
mcpv1alpha1.ConditionReasonCompositeToolRefInvalid,
348+
message,
349+
metav1.ConditionFalse,
350+
)
351+
return fmt.Errorf("referenced VirtualMCPCompositeToolDefinition %s is invalid", ref.Name)
352+
}
353+
354+
// If ValidationStatus is Unknown, we still allow it (validation might be in progress)
355+
// but log a warning
356+
if compositeToolDef.Status.ValidationStatus == mcpv1alpha1.ValidationStatusUnknown {
357+
ctxLogger.V(1).Info("Referenced composite tool definition validation status is Unknown, proceeding",
358+
"name", ref.Name, "namespace", vmcp.Namespace)
359+
}
337360
}
338361

339362
// All composite tool refs are valid

cmd/thv-operator/pkg/vmcpconfig/converter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
1717
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/oidc"
18+
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/spectoconfig"
1819
authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types"
1920
vmcpconfig "github.com/stacklok/toolhive/pkg/vmcp/config"
2021
)
@@ -110,6 +111,8 @@ func (c *Converter) Convert(
110111
config.Operational = c.convertOperational(ctx, vmcp)
111112
}
112113

114+
config.Telemetry = spectoconfig.ConvertTelemetryConfig(ctx, vmcp.Spec.Telemetry, vmcp.Name)
115+
113116
// Apply operational defaults (fills missing values)
114117
config.EnsureOperationalDefaults()
115118

cmd/thv/app/run.go

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ import (
77
"net"
88
"net/url"
99
"os"
10-
"os/signal"
1110
"strings"
12-
"syscall"
1311
"time"
1412

1513
"github.com/spf13/cobra"
@@ -126,7 +124,7 @@ func init() {
126124
AddOIDCFlags(runCmd)
127125
}
128126

129-
func cleanupAndWait(workloadManager workloads.Manager, name string, cancel context.CancelFunc, errCh <-chan error) {
127+
func cleanupAndWait(workloadManager workloads.Manager, name string) {
130128
cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second)
131129
defer cleanupCancel()
132130

@@ -138,13 +136,6 @@ func cleanupAndWait(workloadManager workloads.Manager, name string, cancel conte
138136
logger.Warnf("DeleteWorkloads group error for %q: %v", name, err)
139137
}
140138
}
141-
142-
cancel()
143-
select {
144-
case <-errCh:
145-
case <-time.After(5 * time.Second):
146-
logger.Warnf("Timeout waiting for workload to stop")
147-
}
148139
}
149140

150141
// nolint:gocyclo // This function is complex by design
@@ -304,28 +295,26 @@ func getworkloadDefaultName(_ context.Context, serverOrImage string) string {
304295
}
305296

306297
func runForeground(ctx context.Context, workloadManager workloads.Manager, runnerConfig *runner.RunConfig) error {
307-
ctx, cancel := context.WithCancel(ctx)
308-
defer cancel()
309-
310-
sigCh := make(chan os.Signal, 1)
311-
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
312-
defer signal.Stop(sigCh)
313298

314299
errCh := make(chan error, 1)
315300
go func() {
316301
errCh <- workloadManager.RunWorkload(ctx, runnerConfig)
317302
}()
318303

319-
select {
320-
case sig := <-sigCh:
321-
if !process.IsDetached() {
322-
logger.Infof("Received signal: %v, stopping server %q", sig, runnerConfig.BaseName)
323-
cleanupAndWait(workloadManager, runnerConfig.BaseName, cancel, errCh)
324-
}
325-
return nil
326-
case err := <-errCh:
327-
return err
304+
// workloadManager.RunWorkload will block until the context is cancelled
305+
// or an unrecoverable error is returned. In either case, it will stop the server.
306+
// We wait until workloadManager.RunWorkload exits before deleting the workload,
307+
// so stopping and deleting don't race.
308+
//
309+
// There's room for improvement in the factoring here.
310+
// Shutdown and cancellation logic is unnecessarily spread across two goroutines.
311+
err := <-errCh
312+
if !process.IsDetached() {
313+
logger.Infof("RunWorkload Exited. Error: %v, stopping server %q", err, runnerConfig.BaseName)
314+
cleanupAndWait(workloadManager, runnerConfig.BaseName)
328315
}
316+
return err
317+
329318
}
330319

331320
func validateGroup(ctx context.Context, workloadsManager workloads.Manager, serverOrImage string) error {

cmd/thv/main.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package main
33

44
import (
5+
"context"
56
"os"
67
"os/signal"
78
"syscall"
@@ -12,7 +13,6 @@ import (
1213
"github.com/stacklok/toolhive/cmd/thv/app"
1314
"github.com/stacklok/toolhive/pkg/client"
1415
"github.com/stacklok/toolhive/pkg/container"
15-
"github.com/stacklok/toolhive/pkg/container/runtime"
1616
"github.com/stacklok/toolhive/pkg/lockfile"
1717
"github.com/stacklok/toolhive/pkg/logger"
1818
"github.com/stacklok/toolhive/pkg/migration"
@@ -23,7 +23,7 @@ func main() {
2323
logger.Initialize()
2424

2525
// Setup signal handling for graceful cleanup
26-
setupSignalHandler()
26+
ctx := setupSignalHandler()
2727

2828
// Clean up stale lock files on startup
2929
cleanupStaleLockFiles()
@@ -47,8 +47,10 @@ func main() {
4747
migration.CheckAndPerformDefaultGroupMigration()
4848
}
4949

50+
cmd := app.NewRootCmd(!app.IsCompletionCommand(os.Args))
51+
5052
// Skip update check for completion command or if we are running in kubernetes
51-
if err := app.NewRootCmd(!app.IsCompletionCommand(os.Args) && !runtime.IsKubernetesRuntime()).Execute(); err != nil {
53+
if err := cmd.ExecuteContext(ctx); err != nil {
5254
// Clean up any remaining lock files on error exit
5355
lockfile.CleanupAllLocks()
5456
os.Exit(1)
@@ -59,16 +61,19 @@ func main() {
5961
}
6062

6163
// setupSignalHandler configures signal handling to ensure lock files are cleaned up
62-
func setupSignalHandler() {
64+
func setupSignalHandler() context.Context {
6365
sigCh := make(chan os.Signal, 1)
6466
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
6567

68+
ctx, cancel := context.WithCancel(context.Background())
6669
go func() {
6770
<-sigCh
6871
logger.Debugf("Received signal, cleaning up lock files...")
6972
lockfile.CleanupAllLocks()
70-
os.Exit(0)
73+
cancel()
7174
}()
75+
76+
return ctx
7277
}
7378

7479
// cleanupStaleLockFiles removes stale lock files from known directories on startup

cmd/vmcp/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The Virtual MCP Server (vmcp) is a standalone binary that aggregates multiple MC
1414
-**Session Management**: MCP protocol session tracking with TTL-based cleanup
1515
-**Health Endpoints**: `/health` and `/ping` for service monitoring
1616
-**Configuration Validation**: `vmcp validate` command for config verification
17+
-**Observability**: OpenTelemetry metrics and traces for backend operations and workflow executions
1718

1819
### In Progress
1920
- 🚧 **Incoming Authentication** (Issue #165): OIDC, local, anonymous authentication
@@ -121,6 +122,7 @@ vmcp uses a YAML configuration file to define:
121122
3. **Outgoing Authentication**: Virtual MCP → Backend API token exchange
122123
4. **Tool Aggregation**: Conflict resolution and filtering strategies
123124
5. **Operational Settings**: Timeouts, health checks, circuit breakers
125+
6. **Telemetry**: OpenTelemetry metrics/tracing and Prometheus endpoint
124126

125127
See [examples/vmcp-config.yaml](../../examples/vmcp-config.yaml) for a complete example.
126128

cmd/vmcp/app/commands.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/stacklok/toolhive/pkg/env"
1313
"github.com/stacklok/toolhive/pkg/groups"
1414
"github.com/stacklok/toolhive/pkg/logger"
15+
"github.com/stacklok/toolhive/pkg/telemetry"
1516
"github.com/stacklok/toolhive/pkg/vmcp"
1617
"github.com/stacklok/toolhive/pkg/vmcp/aggregator"
1718
"github.com/stacklok/toolhive/pkg/vmcp/auth/factory"
@@ -288,7 +289,6 @@ func runServe(cmd *cobra.Command, _ []string) error {
288289
// Create router
289290
rtr := vmcprouter.NewDefaultRouter()
290291

291-
// Setup authentication middleware
292292
logger.Infof("Setting up incoming authentication (type: %s)", cfg.IncomingAuth.Type)
293293

294294
authMiddleware, authInfoHandler, err := factory.NewIncomingAuthMiddleware(ctx, cfg.IncomingAuth)
@@ -303,13 +303,30 @@ func runServe(cmd *cobra.Command, _ []string) error {
303303
host, _ := cmd.Flags().GetString("host")
304304
port, _ := cmd.Flags().GetInt("port")
305305

306+
// If telemetry is configured, create the provider.
307+
var telemetryProvider *telemetry.Provider
308+
if cfg.Telemetry != nil {
309+
var err error
310+
telemetryProvider, err = telemetry.NewProvider(ctx, *cfg.Telemetry)
311+
if err != nil {
312+
return fmt.Errorf("failed to create telemetry provider: %w", err)
313+
}
314+
defer func() {
315+
err := telemetryProvider.Shutdown(ctx)
316+
if err != nil {
317+
logger.Errorf("failed to shutdown telemetry provider: %v", err)
318+
}
319+
}()
320+
}
321+
306322
serverCfg := &vmcpserver.Config{
307-
Name: cfg.Name,
308-
Version: getVersion(),
309-
Host: host,
310-
Port: port,
311-
AuthMiddleware: authMiddleware,
312-
AuthInfoHandler: authInfoHandler,
323+
Name: cfg.Name,
324+
Version: getVersion(),
325+
Host: host,
326+
Port: port,
327+
AuthMiddleware: authMiddleware,
328+
AuthInfoHandler: authInfoHandler,
329+
TelemetryProvider: telemetryProvider,
313330
}
314331

315332
// Convert composite tool configurations to workflow definitions
@@ -322,7 +339,7 @@ func runServe(cmd *cobra.Command, _ []string) error {
322339
}
323340

324341
// Create server with discovery manager, backends, and workflow definitions
325-
srv, err := vmcpserver.New(serverCfg, rtr, backendClient, discoveryMgr, backends, workflowDefs)
342+
srv, err := vmcpserver.New(ctx, serverCfg, rtr, backendClient, discoveryMgr, backends, workflowDefs)
326343
if err != nil {
327344
return fmt.Errorf("failed to create Virtual MCP Server: %w", err)
328345
}

deploy/charts/operator-crds/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ apiVersion: v2
22
name: toolhive-operator-crds
33
description: A Helm chart for installing the ToolHive Operator CRDs into Kubernetes.
44
type: application
5-
version: 0.0.75
5+
version: 0.0.76
66
appVersion: "0.0.1"

deploy/charts/operator-crds/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# ToolHive Operator CRDs Helm Chart
22

3-
![Version: 0.0.75](https://img.shields.io/badge/Version-0.0.75-informational?style=flat-square)
3+
![Version: 0.0.76](https://img.shields.io/badge/Version-0.0.76-informational?style=flat-square)
44
![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square)
55

66
A Helm chart for installing the ToolHive Operator CRDs into Kubernetes.

0 commit comments

Comments
 (0)