Skip to content

Commit d9c1c0f

Browse files
add sse for user app
1 parent 44db9ea commit d9c1c0f

File tree

5 files changed

+108
-1
lines changed

5 files changed

+108
-1
lines changed

http/update.http

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
###
2+
GET {{host}}/update/user
3+
Content-Type: text/event-stream
4+
#Authorization: Bearer {{oauthToken}}

src/main/kotlin/com/softeno/template/app/config/security/SecurityConfig.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ class SecurityConfig {
100100
"/webjars/**",
101101
"/swagger-resources/**",
102102
"/swagger-ui/**",
103-
"/v3/api-docs/**"
103+
"/v3/api-docs/**",
104+
// todo: make this endpoint secure
105+
"/update/**"
104106
)
105107
.permitAll()
106108
.pathMatchers("/reactive/**", "/coroutine/**", "/ws/**", "/graphql/**", "/external/**").hasAuthority("ROLE_ADMIN")

src/main/kotlin/com/softeno/template/app/event/AppEvent.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.softeno.template.app.event
22

33
import com.softeno.template.app.kafka.dto.KafkaMessage
4+
import com.softeno.template.app.user.service.UserUpdateEmitter
45
import com.softeno.template.sample.http.internal.serverevents.Event
56
import com.softeno.template.sample.http.internal.serverevents.UserNotificationService
67
import com.softeno.template.sample.kafka.ReactiveKafkaSampleProducer
@@ -20,6 +21,7 @@ data class AppEvent(val source: String, val traceId: String? = null, val spanId:
2021
@Component
2122
class SampleApplicationEventPublisher(
2223
private val reactiveMessageService: ReactiveMessageService,
24+
private val userUpdateEmitter: UserUpdateEmitter,
2325
private val reactiveKafkaProducer: ReactiveKafkaSampleProducer,
2426
private val userNotificationService: UserNotificationService,
2527
) : ApplicationListener<AppEvent> {
@@ -35,6 +37,7 @@ class SampleApplicationEventPublisher(
3537

3638
log.info("[event handler]: Received event: $event")
3739
reactiveMessageService.broadcast(event.toMessage())
40+
userUpdateEmitter.broadcast(event.toMessage())
3841

3942
reactiveKafkaProducer.send(event.toKafkaMessage())
4043

src/main/kotlin/com/softeno/template/app/user/api/Controller.kt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,23 @@ import com.softeno.template.app.permission.api.PermissionDto
66
import com.softeno.template.app.user.UserModifyCommand
77
import com.softeno.template.app.user.mapper.toDto
88
import com.softeno.template.app.user.service.UserService
9+
import com.softeno.template.app.user.service.UserUpdateEmitter
910
import io.micrometer.tracing.Tracer
1011
import kotlinx.coroutines.flow.Flow
1112
import kotlinx.coroutines.flow.map
1213
import org.apache.commons.logging.LogFactory
1314
import org.slf4j.MDC
15+
import org.springframework.http.MediaType
16+
import org.springframework.http.codec.ServerSentEvent
17+
import org.springframework.http.server.reactive.ServerHttpResponse
1418
import org.springframework.security.core.annotation.AuthenticationPrincipal
1519
import org.springframework.stereotype.Component
1620
import org.springframework.validation.annotation.Validated
1721
import org.springframework.web.bind.annotation.*
1822
import org.springframework.web.server.ServerWebExchange
1923
import org.springframework.web.server.WebFilter
2024
import org.springframework.web.server.WebFilterChain
25+
import reactor.core.publisher.Flux
2126
import reactor.core.publisher.Mono
2227
import java.security.Principal
2328
import java.time.LocalDateTime
@@ -88,7 +93,23 @@ class CoroutineUserController(
8893

8994
@GetMapping("/usersCount")
9095
suspend fun getUserSize(): Long = userService.size()
96+
}
9197

98+
@RestController
99+
@RequestMapping("/update/")
100+
@Validated
101+
class UpdateController(
102+
val updateEmitter: UserUpdateEmitter
103+
) {
104+
@GetMapping("/user", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
105+
fun getUserUpdate(response: ServerHttpResponse): Flux<ServerSentEvent<String>> {
106+
response.headers.apply {
107+
set("Cache-Control", "no-cache, no-store, must-revalidate")
108+
set("Connection", "keep-alive")
109+
set("X-Accel-Buffering", "no")
110+
}
111+
return updateEmitter.getSink()
112+
}
92113
}
93114

94115
data class UserDto(

src/main/kotlin/com/softeno/template/app/user/service/Service.kt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.softeno.template.app.user.service
22

3+
import com.fasterxml.jackson.databind.ObjectMapper
34
import com.softeno.template.app.common.ErrorFactory
45
import com.softeno.template.app.common.PrincipalHandler
56
import com.softeno.template.app.common.getPageRequest
@@ -16,6 +17,8 @@ import com.softeno.template.app.user.db.UserCoroutineRepository
1617
import com.softeno.template.app.user.db.UserDocument
1718
import com.softeno.template.app.user.mapper.toDocument
1819
import com.softeno.template.app.user.mapper.toDomain
20+
import com.softeno.template.sample.websocket.Message
21+
import com.softeno.template.sample.websocket.toJson
1922
import io.micrometer.tracing.Tracer
2023
import kotlinx.coroutines.flow.Flow
2124
import kotlinx.coroutines.flow.asFlow
@@ -26,9 +29,15 @@ import kotlinx.coroutines.withContext
2629
import org.apache.commons.logging.LogFactory
2730
import org.slf4j.MDC
2831
import org.springframework.context.ApplicationEventPublisher
32+
import org.springframework.http.codec.ServerSentEvent
33+
import org.springframework.stereotype.Component
2934
import org.springframework.stereotype.Service
35+
import reactor.core.publisher.Flux
3036
import reactor.core.publisher.Mono
37+
import reactor.core.publisher.Sinks
38+
import reactor.util.concurrent.Queues
3139
import java.security.Principal
40+
import java.time.Duration
3241

3342

3443
@Service
@@ -149,3 +158,71 @@ class UserDocumentService(
149158
return@withContext userCoroutineRepository.deleteById(id)
150159
}
151160
}
161+
162+
@Component
163+
class UserUpdateEmitter(
164+
private val objectMapper: ObjectMapper,
165+
) {
166+
private val sink: Sinks.Many<ServerSentEvent<String>> = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false)
167+
private val log = LogFactory.getLog(javaClass)
168+
169+
fun getSink(): Flux<ServerSentEvent<String>> {
170+
val heartbeatFlux = Flux.interval(Duration.ofSeconds(10))
171+
.map {
172+
ServerSentEvent.builder<String>()
173+
.event("heartbeat")
174+
.data(Message(from = "SYSTEM", to = "ALL", content = "ping").toJson(objectMapper))
175+
.build()
176+
}.doOnError { error -> log.error("Heartbeat error", error) }
177+
178+
179+
val events = sink.asFlux()
180+
.doOnSubscribe { log.info("New SSE client subscribed") }
181+
.doOnCancel { log.info("SSE client disconnected") }
182+
.doOnTerminate { log.info("SSE client terminated") }
183+
.doOnError { error -> log.error("Event stream error", error) }
184+
185+
186+
return Flux.merge(heartbeatFlux, events)
187+
.doOnCancel { log.info("Canceling SSE stream") }
188+
.doOnTerminate { log.info("Terminating SSE stream") }
189+
.onErrorResume { error ->
190+
log.error("SSE stream error, sending error event", error)
191+
Mono.just(
192+
ServerSentEvent.builder<String>()
193+
.event("error")
194+
.data(Message(from = "SYSTEM", to = "ALL", content = "Connection error: ${error.message}").toJson(objectMapper))
195+
.build()
196+
)
197+
}
198+
}
199+
200+
fun broadcast(message: Message): Boolean =
201+
try {
202+
val payload = message.toJson(objectMapper)
203+
val sse = ServerSentEvent.builder(payload).event("update").build()
204+
val result = sink.tryEmitNext(sse)
205+
206+
when (result) {
207+
Sinks.EmitResult.OK -> {
208+
log.debug("Message broadcasted successfully: ${message.content}")
209+
true
210+
}
211+
Sinks.EmitResult.FAIL_CANCELLED -> {
212+
log.warn("Failed to broadcast message - emitter cancelled")
213+
false
214+
}
215+
Sinks.EmitResult.FAIL_OVERFLOW -> {
216+
log.warn("Failed to broadcast message - buffer overflow")
217+
false
218+
}
219+
else -> {
220+
log.error("Failed to broadcast message: $result")
221+
false
222+
}
223+
}
224+
} catch (e: Exception) {
225+
log.error("Error broadcasting message", e)
226+
false
227+
}
228+
}

0 commit comments

Comments
 (0)