@@ -68,6 +68,19 @@ type Engine struct {
6868 subCancels map [string ]map [string ]mcpservice.CancelSubscription
6969}
7070
71+ // resourceEventSource is a narrow internal interface implemented by static
72+ // resource containers (e.g. ResourcesContainer) which expose only event
73+ // channels (UpdatedChan + ListChangedChan). The container does NOT manage
74+ // per-session subscription state or goroutines; the engine synthesizes the
75+ // ResourceSubscriptionCapability by attaching its own forwarder goroutines to
76+ // these channels. This avoids duplicate bookkeeping layer (container + engine)
77+ // and centralizes lifecycle (unsubscribe, session teardown, fanout) inside the
78+ // engine.
79+ type resourceEventSource interface {
80+ UpdatedChan (uri string ) <- chan struct {}
81+ ListChangedChan () <- chan struct {}
82+ }
83+
7184func NewEngine (host sessions.SessionHost , srv mcpservice.ServerCapabilities , opts ... EngineOption ) * Engine {
7285 e := & Engine {
7386 host : host ,
@@ -205,10 +218,13 @@ func (e *Engine) InitializeSession(ctx context.Context, userID string, req *mcp.
205218 ListChanged bool `json:"listChanged"`
206219 Subscribe bool `json:"subscribe"`
207220 }{}
221+ // Prefer explicit subscription capability; otherwise infer from event source.
208222 if subCap , hasSub , subErr := resCap .GetSubscriptionCapability (ctx , sess ); subErr != nil {
209223 return nil , nil , fmt .Errorf ("get resources subscription capability: %w" , subErr )
210224 } else if hasSub && subCap != nil {
211225 entry .Subscribe = true
226+ } else if _ , ok := resCap .(resourceEventSource ); ok {
227+ entry .Subscribe = true
212228 }
213229 if lcCap , hasLC , lcErr := resCap .GetListChangedCapability (ctx , sess ); lcErr != nil {
214230 return nil , nil , fmt .Errorf ("get resources listChanged capability: %w" , lcErr )
@@ -347,14 +363,26 @@ func (e *Engine) handleResourcesSubscribe(ctx context.Context, sess *SessionHand
347363 log .InfoContext (ctx , "engine.handle_request.unsupported" , slog .Int64 ("dur_ms" , time .Since (start ).Milliseconds ()))
348364 return jsonrpc .NewErrorResponse (req .ID , jsonrpc .ErrorCodeMethodNotFound , "resources capability not supported" , nil ), nil
349365 }
350- subCap , hasSub , err := resCap .GetSubscriptionCapability (ctx , sess )
351- if err != nil {
352- log .ErrorContext (ctx , "engine.handle_request.fail" , slog .String ("err" , err .Error ()))
353- return jsonrpc .NewErrorResponse (req .ID , jsonrpc .ErrorCodeInternalError , "internal error" , nil ), nil
354- }
355- if ! hasSub || subCap == nil {
356- log .InfoContext (ctx , "engine.handle_request.unsupported" , slog .Int64 ("dur_ms" , time .Since (start ).Milliseconds ()))
357- return jsonrpc .NewErrorResponse (req .ID , jsonrpc .ErrorCodeMethodNotFound , "subscriptions not supported" , nil ), nil
366+ // Determine subscribe support via either provider capability or internal event source.
367+ var (
368+ useEventSource bool
369+ evSrc resourceEventSource
370+ subCap mcpservice.ResourceSubscriptionCapability
371+ )
372+ if rs , ok := resCap .(resourceEventSource ); ok {
373+ useEventSource = true
374+ evSrc = rs
375+ } else {
376+ sc , hasSub , err := resCap .GetSubscriptionCapability (ctx , sess )
377+ if err != nil {
378+ log .ErrorContext (ctx , "engine.handle_request.fail" , slog .String ("err" , err .Error ()))
379+ return jsonrpc .NewErrorResponse (req .ID , jsonrpc .ErrorCodeInternalError , "internal error" , nil ), nil
380+ }
381+ if ! hasSub || sc == nil {
382+ log .InfoContext (ctx , "engine.handle_request.unsupported" , slog .Int64 ("dur_ms" , time .Since (start ).Milliseconds ()))
383+ return jsonrpc .NewErrorResponse (req .ID , jsonrpc .ErrorCodeMethodNotFound , "subscriptions not supported" , nil ), nil
384+ }
385+ subCap = sc
358386 }
359387
360388 // Idempotency: if already subscribed, succeed.
@@ -388,11 +416,33 @@ func (e *Engine) handleResourcesSubscribe(ctx context.Context, sess *SessionHand
388416 }
389417 }
390418
391- cancel , err := subCap .Subscribe (ctx , sess , params .URI , emit )
392- if err != nil {
393- // Treat not found or validation as InvalidParams if detectable; otherwise internal error.
394- log .InfoContext (ctx , "engine.handle_request.subscribe.fail" , slog .String ("err" , err .Error ()))
395- return jsonrpc .NewErrorResponse (req .ID , jsonrpc .ErrorCodeInvalidParams , "invalid params" , nil ), nil
419+ var cancel mcpservice.CancelSubscription
420+ if useEventSource {
421+ // Synthesize subscription: attach to UpdatedChan and forward events until canceled.
422+ ch := evSrc .UpdatedChan (params .URI )
423+ base := context .WithoutCancel (ctx )
424+ fwdCtx , fwdCancel := context .WithCancel (base )
425+ cancel = func (_ context.Context ) error { fwdCancel (); return nil }
426+ go func () {
427+ for {
428+ select {
429+ case <- fwdCtx .Done ():
430+ return
431+ case _ , ok := <- ch :
432+ if ! ok {
433+ return
434+ }
435+ emit (fwdCtx , params .URI )
436+ }
437+ }
438+ }()
439+ } else {
440+ c , err := subCap .Subscribe (ctx , sess , params .URI , emit )
441+ if err != nil {
442+ log .InfoContext (ctx , "engine.handle_request.subscribe.fail" , slog .String ("err" , err .Error ()))
443+ return jsonrpc .NewErrorResponse (req .ID , jsonrpc .ErrorCodeInvalidParams , "invalid params" , nil ), nil
444+ }
445+ cancel = c
396446 }
397447
398448 e .subMu .Lock ()
@@ -947,7 +997,19 @@ func (e *Engine) registerListChangedEmitters(ctx context.Context, sess *SessionH
947997 if resCap , ok , err := e .srv .GetResourcesCapability (bg , sess ); err == nil && ok && resCap != nil {
948998 if lc , hasLC , lErr := resCap .GetListChangedCapability (bg , sess ); lErr == nil && hasLC && lc != nil {
949999 _ , _ = lc .Register (bg , sess , func (cbCtx context.Context , s sessions.Session , uri string ) {
950- publishNote (mcp .ResourcesListChangedNotificationMethod )
1000+ // Build JSON-RPC notification
1001+ note := & jsonrpc.Request {JSONRPCVersion : jsonrpc .ProtocolVersion , Method : string (mcp .ResourcesListChangedNotificationMethod )}
1002+ bytes , err := json .Marshal (note )
1003+ if err != nil {
1004+ return
1005+ }
1006+ // Local publish (origin instance)
1007+ _ , _ = e .host .PublishSession (context .WithoutCancel (cbCtx ), sid , bytes )
1008+ // Cross-node fanout (best-effort)
1009+ outer := fanoutMessage {SessionID : sid , UserID : s .UserID (), Msg : bytes }
1010+ if payload , err := json .Marshal (outer ); err == nil {
1011+ _ = e .host .PublishEvent (context .WithoutCancel (cbCtx ), sessionFanoutTopic , payload )
1012+ }
9511013 })
9521014 }
9531015 }
@@ -1143,6 +1205,15 @@ func (e *Engine) handleSessionEvent(ctx context.Context, msg []byte) error {
11431205 }
11441206 e .subMu .Unlock ()
11451207 return nil
1208+ case string (mcp .ResourcesListChangedNotificationMethod ):
1209+ // Fanout of listChanged notification from another instance: publish full note to client.
1210+ bytes , err := json .Marshal (req )
1211+ if err == nil {
1212+ if _ , perr := e .host .PublishSession (context .WithoutCancel (ctx ), fanout .SessionID , bytes ); perr != nil {
1213+ e .log .ErrorContext (ctx , "engine.handle_session_event.publish_fail" , slog .String ("err" , perr .Error ()))
1214+ }
1215+ }
1216+ return nil
11461217 default :
11471218 // Unknown request; ignore.
11481219 return nil
0 commit comments