Skip to content

Commit 4645ebf

Browse files
authored
Server feature list changed and resource updated notifications (#441)
<!-- Provide a brief summary of your changes --> ## Motivation and Context <!-- Why is this change needed? What problem does it solve? --> Implemented thread-safe notification service following eventual consistency principle. Fixes #249 Contracts: * If the client connection and feature update happened in the same thread, the timestamps of the events will be consequent ``` server.connect(client) server.add(feature) // client will receive the notification ``` ``` server.add(feature) server.connect(client) // client will NOT receive the notification ``` * If the client connection and feature update happened in the different threads, both options can be possible The flow of the notification is explained in the diagram: <img width="663" height="413" alt="image" src="https://github.com/user-attachments/assets/e2c4c108-53dd-408d-97ef-97110ce5c979" /> Documentation references: https://modelcontextprotocol.io/specification/2025-06-18/server/prompts#list-changed-notification https://modelcontextprotocol.io/specification/2025-06-18/server/tools#list-changed-notification https://modelcontextprotocol.io/specification/2025-06-18/server/resources#list-changed-notification https://modelcontextprotocol.io/specification/2025-06-18/server/resources#subscriptions Questions: * I haven't found the subscription confirmed response message content docs, how it should look like? ## How Has This Been Tested? <!-- Have you tested this in a real application? Which scenarios were tested? --> Multiple tests with various notification scenarios were added ## Breaking Changes <!-- Will users need to update their code or configurations? --> ## Types of changes <!-- What types of changes does your code introduce? Put an `x` in all the boxes that apply: --> - [ ] Bug fix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update ## Checklist <!-- Go over all the following points, and put an `x` in all the boxes that apply. --> - [x] I have read the [MCP Documentation](https://modelcontextprotocol.io) - [x] My code follows the repository's style guidelines - [x] New and existing tests pass locally - [x] I have added appropriate error handling - [x] I have added or updated documentation as needed ## Additional context <!-- Add any other context, implementation notes, or design decisions -->
1 parent cc9b50f commit 4645ebf

File tree

13 files changed

+1307
-60
lines changed

13 files changed

+1307
-60
lines changed

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ public class io/modelcontextprotocol/kotlin/sdk/server/Server {
8282
public final fun onConnect (Lkotlin/jvm/functions/Function0;)V
8383
public final fun onInitialized (Lkotlin/jvm/functions/Function0;)V
8484
public final fun ping (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
85+
public final fun removeNotificationHandler (Lio/modelcontextprotocol/kotlin/sdk/types/Method;)V
86+
public final fun removeNotificationHandler (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/Method;)V
8587
public final fun removePrompt (Ljava/lang/String;)Z
8688
public final fun removePrompts (Ljava/util/List;)I
8789
public final fun removeResource (Ljava/lang/String;)Z
@@ -93,6 +95,8 @@ public class io/modelcontextprotocol/kotlin/sdk/server/Server {
9395
public final fun sendResourceListChanged (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
9496
public final fun sendResourceUpdated (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/ResourceUpdatedNotification;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
9597
public final fun sendToolListChanged (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
98+
public final fun setNotificationHandler (Lio/modelcontextprotocol/kotlin/sdk/types/Method;Lkotlin/jvm/functions/Function1;)V
99+
public final fun setNotificationHandler (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/Method;Lkotlin/jvm/functions/Function1;)V
96100
}
97101

98102
public final class io/modelcontextprotocol/kotlin/sdk/server/ServerOptions : io/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions {

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt

Lines changed: 373 additions & 0 deletions
Large diffs are not rendered by default.

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureRegistry.kt

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,17 @@ import kotlinx.atomicfu.atomic
55
import kotlinx.atomicfu.getAndUpdate
66
import kotlinx.atomicfu.update
77
import kotlinx.collections.immutable.minus
8+
import kotlinx.collections.immutable.persistentListOf
89
import kotlinx.collections.immutable.persistentMapOf
910
import kotlinx.collections.immutable.toPersistentSet
1011

12+
/**
13+
* A listener interface for receiving notifications about feature changes in registry.
14+
*/
15+
internal interface FeatureListener {
16+
fun onFeatureUpdated(featureKey: String)
17+
}
18+
1119
/**
1220
* A generic registry for managing features of a specified type. This class provides thread-safe
1321
* operations for adding, removing, and retrieving features from the registry.
@@ -33,15 +41,28 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
3341
internal val values: Map<FeatureKey, T>
3442
get() = registry.value
3543

44+
private val listeners = atomic(persistentListOf<FeatureListener>())
45+
46+
internal fun addListener(listener: FeatureListener) {
47+
listeners.update { it.add(listener) }
48+
}
49+
50+
internal fun removeListener(listener: FeatureListener) {
51+
listeners.update { it.remove(listener) }
52+
}
53+
3654
/**
3755
* Adds the specified feature to the registry.
3856
*
3957
* @param feature The feature to be added to the registry.
4058
*/
4159
internal fun add(feature: T) {
4260
logger.info { "Adding $featureType: \"${feature.key}\"" }
43-
registry.update { current -> current.put(feature.key, feature) }
61+
val oldMap = registry.getAndUpdate { current -> current.put(feature.key, feature) }
62+
val oldFeature = oldMap[feature.key]
63+
4464
logger.info { "Added $featureType: \"${feature.key}\"" }
65+
notifyFeatureUpdated(oldFeature, feature)
4566
}
4667

4768
/**
@@ -52,8 +73,13 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
5273
*/
5374
internal fun addAll(features: List<T>) {
5475
logger.info { "Adding ${featureType}s: ${features.size}" }
55-
registry.update { current -> current.putAll(features.associateBy { it.key }) }
76+
val oldMap = registry.getAndUpdate { current -> current.putAll(features.associateBy { it.key }) }
77+
5678
logger.info { "Added ${featureType}s: ${features.size}" }
79+
for (feature in features) {
80+
val oldFeature = oldMap[feature.key]
81+
notifyFeatureUpdated(oldFeature, feature)
82+
}
5783
}
5884

5985
/**
@@ -66,15 +92,17 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
6692
logger.info { "Removing $featureType: \"$key\"" }
6793
val oldMap = registry.getAndUpdate { current -> current.remove(key) }
6894

69-
val removed = key in oldMap
70-
logger.info {
71-
if (removed) {
72-
"Removed $featureType: \"$key\""
73-
} else {
74-
"$featureType not found: \"$key\""
75-
}
95+
val removedFeature = oldMap[key]
96+
val removed = removedFeature != null
97+
98+
if (removed) {
99+
logger.info { "Removed $featureType: \"$key\"" }
100+
notifyFeatureUpdated(removedFeature, null)
101+
} else {
102+
logger.info { "$featureType not found: \"$key\"" }
76103
}
77-
return key in oldMap
104+
105+
return removed
78106
}
79107

80108
/**
@@ -87,13 +115,16 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
87115
logger.info { "Removing ${featureType}s: ${keys.size}" }
88116
val oldMap = registry.getAndUpdate { current -> current - keys.toPersistentSet() }
89117

90-
val removedCount = keys.count { it in oldMap }
91-
logger.info {
92-
if (removedCount > 0) {
93-
"Removed ${featureType}s: $removedCount"
94-
} else {
95-
"No $featureType were removed"
96-
}
118+
val removedFeatures = keys.mapNotNull { oldMap[it] }
119+
val removedCount = removedFeatures.size
120+
121+
if (removedCount > 0) {
122+
logger.info { "Removed ${featureType}s: $removedCount" }
123+
} else {
124+
logger.info { "No $featureType were removed" }
125+
}
126+
removedFeatures.forEach {
127+
notifyFeatureUpdated(it, null)
97128
}
98129

99130
return removedCount
@@ -108,13 +139,22 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
108139
internal fun get(key: FeatureKey): T? {
109140
logger.info { "Getting $featureType: \"$key\"" }
110141
val feature = registry.value[key]
111-
logger.info {
112-
if (feature != null) {
113-
"Got $featureType: \"$key\""
114-
} else {
115-
"$featureType not found: \"$key\""
116-
}
142+
if (feature != null) {
143+
logger.info { "Got $featureType: \"$key\"" }
144+
} else {
145+
logger.info { "$featureType not found: \"$key\"" }
117146
}
147+
118148
return feature
119149
}
150+
151+
private fun notifyFeatureUpdated(oldFeature: T?, newFeature: T?) {
152+
val featureKey = (oldFeature?.key ?: newFeature?.key) ?: run {
153+
logger.error { "Notification should have feature key, but none found" }
154+
return
155+
}
156+
157+
logger.info { "Notifying listeners on feature update" }
158+
listeners.value.forEach { it.onFeatureUpdated(featureKey) }
159+
}
120160
}

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/Server.kt

Lines changed: 114 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,24 @@ import io.modelcontextprotocol.kotlin.sdk.types.ListToolsRequest
2626
import io.modelcontextprotocol.kotlin.sdk.types.ListToolsResult
2727
import io.modelcontextprotocol.kotlin.sdk.types.LoggingMessageNotification
2828
import io.modelcontextprotocol.kotlin.sdk.types.Method
29+
import io.modelcontextprotocol.kotlin.sdk.types.Notification
2930
import io.modelcontextprotocol.kotlin.sdk.types.Prompt
3031
import io.modelcontextprotocol.kotlin.sdk.types.PromptArgument
3132
import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceRequest
3233
import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceResult
3334
import io.modelcontextprotocol.kotlin.sdk.types.Resource
3435
import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification
3536
import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities
37+
import io.modelcontextprotocol.kotlin.sdk.types.SubscribeRequest
3638
import io.modelcontextprotocol.kotlin.sdk.types.TextContent
3739
import io.modelcontextprotocol.kotlin.sdk.types.Tool
3840
import io.modelcontextprotocol.kotlin.sdk.types.ToolAnnotations
3941
import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema
42+
import io.modelcontextprotocol.kotlin.sdk.types.UnsubscribeRequest
4043
import kotlinx.coroutines.CancellationException
44+
import kotlinx.coroutines.Deferred
4145
import kotlinx.serialization.json.JsonObject
46+
import kotlin.time.ExperimentalTime
4247

4348
private val logger = KotlinLogging.logger {}
4449

@@ -58,6 +63,11 @@ public class ServerOptions(public val capabilities: ServerCapabilities, enforceS
5863
* You can register tools, prompts, and resources using [addTool], [addPrompt], and [addResource].
5964
* The server will then automatically handle listing and retrieval requests from the client.
6065
*
66+
* In case the server supports feature list notification or resource substitution,
67+
* the server will automatically send notifications for all connected clients.
68+
* Currently, after subscription to a resource, the server will NOT send the subscription confirmation
69+
* as this response schema is not defined in the protocol.
70+
*
6171
* @param serverInfo Information about this server implementation (name, version).
6272
* @param options Configuration options for the server.
6373
* @param instructionsProvider Optional provider for instructions from the server to the client about how to use
@@ -86,14 +96,6 @@ public open class Server(
8696
block: Server.() -> Unit = {},
8797
) : this(serverInfo, options, { instructions }, block)
8898

89-
private val sessionRegistry = ServerSessionRegistry()
90-
91-
/**
92-
* Provides a snapshot of all sessions currently registered in the server
93-
*/
94-
public val sessions: Map<ServerSessionKey, ServerSession>
95-
get() = sessionRegistry.sessions
96-
9799
@Suppress("ktlint:standard:backing-property-naming")
98100
private var _onInitialized: (() -> Unit) = {}
99101

@@ -103,14 +105,51 @@ public open class Server(
103105
@Suppress("ktlint:standard:backing-property-naming")
104106
private var _onClose: () -> Unit = {}
105107

106-
private val toolRegistry = FeatureRegistry<RegisteredTool>("Tool")
107-
private val promptRegistry = FeatureRegistry<RegisteredPrompt>("Prompt")
108-
private val resourceRegistry = FeatureRegistry<RegisteredResource>("Resource")
108+
@OptIn(ExperimentalTime::class)
109+
private val notificationService = FeatureNotificationService()
110+
111+
private val sessionRegistry = ServerSessionRegistry()
109112

113+
private val toolRegistry = FeatureRegistry<RegisteredTool>("Tool").apply {
114+
if (options.capabilities.tools?.listChanged ?: false) {
115+
addListener(notificationService.toolListChangedListener)
116+
}
117+
}
118+
private val promptRegistry = FeatureRegistry<RegisteredPrompt>("Prompt").apply {
119+
if (options.capabilities.prompts?.listChanged ?: false) {
120+
addListener(notificationService.promptListChangedListener)
121+
}
122+
}
123+
private val resourceRegistry = FeatureRegistry<RegisteredResource>("Resource").apply {
124+
if (options.capabilities.resources?.listChanged ?: false) {
125+
addListener(notificationService.resourceListChangedListener)
126+
}
127+
if (options.capabilities.resources?.subscribe ?: false) {
128+
addListener(notificationService.resourceUpdatedListener)
129+
}
130+
}
131+
132+
/**
133+
* Provides a snapshot of all sessions currently registered in the server
134+
*/
135+
public val sessions: Map<ServerSessionKey, ServerSession>
136+
get() = sessionRegistry.sessions
137+
138+
/**
139+
* Provides a snapshot of all tools currently registered in the server
140+
*/
110141
public val tools: Map<String, RegisteredTool>
111142
get() = toolRegistry.values
143+
144+
/**
145+
* Provides a snapshot of all prompts currently registered in the server
146+
*/
112147
public val prompts: Map<String, RegisteredPrompt>
113148
get() = promptRegistry.values
149+
150+
/**
151+
* Provides a snapshot of all resources currently registered in the server
152+
*/
114153
public val resources: Map<String, RegisteredResource>
115154
get() = resourceRegistry.values
116155

@@ -120,6 +159,7 @@ public open class Server(
120159

121160
public suspend fun close() {
122161
logger.debug { "Closing MCP server" }
162+
notificationService.close()
123163
sessions.forEach { (sessionId, session) ->
124164
logger.info { "Closing session $sessionId" }
125165
session.close()
@@ -182,17 +222,32 @@ public open class Server(
182222
session.setRequestHandler<ListResourceTemplatesRequest>(Method.Defined.ResourcesTemplatesList) { _, _ ->
183223
handleListResourceTemplates()
184224
}
225+
if (options.capabilities.resources?.subscribe ?: false) {
226+
session.setRequestHandler<SubscribeRequest>(Method.Defined.ResourcesSubscribe) { request, _ ->
227+
handleSubscribeResources(session, request)
228+
// Does not return any confirmation as the structure is not stated in the protocol
229+
null
230+
}
231+
session.setRequestHandler<UnsubscribeRequest>(Method.Defined.ResourcesUnsubscribe) { request, _ ->
232+
handleUnsubscribeResources(session, request)
233+
// Does not return any confirmation as the structure is not stated in the protocol
234+
null
235+
}
236+
}
185237
}
186238

187239
// Register cleanup handler to remove session from list when it closes
188240
session.onClose {
189241
logger.debug { "Removing closed session from active sessions list" }
242+
notificationService.unsubscribeSession(session)
190243
sessionRegistry.removeSession(session.sessionId)
191244
}
245+
192246
logger.debug { "Server session connecting to transport" }
193247
session.connect(transport)
194248
logger.debug { "Server session successfully connected to transport" }
195249
sessionRegistry.addSession(session)
250+
notificationService.subscribeSession(session)
196251

197252
_onConnect()
198253
return session
@@ -484,7 +539,25 @@ public open class Server(
484539
}
485540

486541
// --- Internal Handlers ---
487-
private suspend fun handleListTools(): ListToolsResult {
542+
private fun handleSubscribeResources(session: ServerSession, request: SubscribeRequest) {
543+
if (options.capabilities.resources?.subscribe ?: false) {
544+
logger.debug { "Subscribing to resources" }
545+
notificationService.subscribeToResourceUpdate(session, request.params.uri)
546+
} else {
547+
logger.debug { "Failed to subscribe to resources: Server does not support resources capability" }
548+
}
549+
}
550+
551+
private fun handleUnsubscribeResources(session: ServerSession, request: UnsubscribeRequest) {
552+
if (options.capabilities.resources?.subscribe ?: false) {
553+
logger.debug { "Unsubscribing from resources" }
554+
notificationService.unsubscribeFromResourceUpdate(session, request.params.uri)
555+
} else {
556+
logger.debug { "Failed to unsubscribe from resources: Server does not support resources capability" }
557+
}
558+
}
559+
560+
private fun handleListTools(): ListToolsResult {
488561
val toolList = tools.values.map { it.tool }
489562
return ListToolsResult(tools = toolList, nextCursor = null)
490563
}
@@ -518,7 +591,7 @@ public open class Server(
518591
}
519592
}
520593

521-
private suspend fun handleListPrompts(): ListPromptsResult {
594+
private fun handleListPrompts(): ListPromptsResult {
522595
logger.debug { "Handling list prompts request" }
523596
return ListPromptsResult(prompts = prompts.values.map { it.prompt })
524597
}
@@ -534,7 +607,7 @@ public open class Server(
534607
return prompt.messageProvider(request)
535608
}
536609

537-
private suspend fun handleListResources(): ListResourcesResult {
610+
private fun handleListResources(): ListResourcesResult {
538611
logger.debug { "Handling list resources request" }
539612
return ListResourcesResult(resources = resources.values.map { it.resource })
540613
}
@@ -550,7 +623,7 @@ public open class Server(
550623
return resource.readHandler(request)
551624
}
552625

553-
private suspend fun handleListResourceTemplates(): ListResourceTemplatesResult {
626+
private fun handleListResourceTemplates(): ListResourceTemplatesResult {
554627
// If you have resource templates, return them here. For now, return empty.
555628
return ListResourceTemplatesResult(listOf())
556629
}
@@ -675,4 +748,30 @@ public open class Server(
675748
}
676749
}
677750
// End the ServerSession redirection section
751+
752+
// Start the notification handling section
753+
public fun <T : Notification> setNotificationHandler(method: Method, handler: (notification: T) -> Deferred<Unit>) {
754+
sessions.forEach { (_, session) ->
755+
session.setNotificationHandler(method, handler)
756+
}
757+
}
758+
759+
public fun removeNotificationHandler(method: Method) {
760+
sessions.forEach { (_, session) ->
761+
session.removeNotificationHandler(method)
762+
}
763+
}
764+
765+
public fun <T : Notification> setNotificationHandler(
766+
sessionId: String,
767+
method: Method,
768+
handler: (notification: T) -> Deferred<Unit>,
769+
) {
770+
sessionRegistry.getSessionOrNull(sessionId)?.setNotificationHandler(method, handler)
771+
}
772+
773+
public fun removeNotificationHandler(sessionId: String, method: Method) {
774+
sessionRegistry.getSessionOrNull(sessionId)?.removeNotificationHandler(method)
775+
}
776+
// End the notification handling section
678777
}

0 commit comments

Comments
 (0)