Skip to content

[gateway] draft SSE implementation #243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ android {

dependencies {
def ktor_version = "2.2.4"
def okhttp_version = "4.10.0"
def room_version = "2.4.3"
def coroutines_version = "1.6.1"
def work_version = "2.7.1"
Expand All @@ -80,6 +81,9 @@ dependencies {
implementation("io.ktor:ktor-client-content-negotiation:$ktor_version")
implementation("io.ktor:ktor-client-okhttp:$ktor_version")

implementation("com.squareup.okhttp3:okhttp:$okhttp_version")
implementation("com.squareup.okhttp3:okhttp-sse:$okhttp_version")

// Kotlin + coroutines
implementation "androidx.work:work-runtime-ktx:$work_version"

Expand Down
11 changes: 8 additions & 3 deletions app/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:networkSecurityConfig="@xml/network_security_config"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:networkSecurityConfig="@xml/network_security_config"
android:theme="@style/Theme.SmsGateway">
<service
android:name=".modules.gateway.services.SSEForegroundService"
android:enabled="true"
android:exported="false"
android:foregroundServiceType="dataSync" />
<service
android:name=".modules.ping.services.PingForegroundService"
android:enabled="true"
Expand Down Expand Up @@ -84,8 +89,8 @@

<service
android:name=".modules.localserver.WebService"
android:foregroundServiceType="connectedDevice"
android:enabled="true" />
android:enabled="true"
android:foregroundServiceType="connectedDevice" />
<service
android:name="androidx.work.impl.foreground.SystemForegroundService"
android:foregroundServiceType="dataSync"
Expand Down
123 changes: 123 additions & 0 deletions app/src/main/java/me/capcom/smsgateway/helpers/SSEManager.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package me.capcom.smsgateway.helpers

import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

class SSEManager(
private val url: String,
private val authToken: String
) {
private val client = OkHttpClient.Builder()
.readTimeout(1, TimeUnit.HOURS)
.build()
private val scope = CoroutineScope(Dispatchers.IO + Job())

private var eventSource: EventSource? = null
private var reconnectAttempts = 0
private val isDisconnecting = AtomicBoolean(false)

// Event callbacks
var onEvent: ((type: String?, data: String) -> Unit)? = null
var onConnected: (() -> Unit)? = null
var onError: ((Throwable?) -> Unit)? = null
var onClosed: (() -> Unit)? = null

fun connect() {
isDisconnecting.set(false)
scope.launch {
try {
val request = Request.Builder()
.url(url)
.apply {
header("Authorization", "Bearer $authToken")
}
.build()

eventSource = EventSources.createFactory(client)
.newEventSource(request, object : EventSourceListener() {
override fun onOpen(eventSource: EventSource, response: Response) {
Log.d(TAG, "SSE connected")
reconnectAttempts = 0
onConnected?.invoke()
}

override fun onEvent(
eventSource: EventSource,
id: String?,
type: String?,
data: String
) {
Log.d(TAG, "Event received: $type - $data")
onEvent?.invoke(type, data)
}

override fun onClosed(eventSource: EventSource) {
Log.d(TAG, "SSE connection closed")
onClosed?.invoke()
scheduleReconnect()
}

override fun onFailure(
eventSource: EventSource,
t: Throwable?,
response: Response?
) {
Log.e(TAG, "SSE error", t)
onError?.invoke(t)
scheduleReconnect()
}
})
} catch (e: Exception) {
Log.e(TAG, "Connection failed", e)
scheduleReconnect()
}
}
}

fun disconnect() {
isDisconnecting.set(true)
scope.launch {
eventSource?.cancel()
eventSource = null
reconnectAttempts = 0
}
scope.coroutineContext.cancelChildren()
}

private fun scheduleReconnect() {
if (isDisconnecting.get()) {
return
}

reconnectAttempts++
val delay = when {
reconnectAttempts > 10 -> 60_000L // 1 minute
reconnectAttempts > 5 -> 30_000L // 30 seconds
else -> 5_000L // 5 seconds
}

scope.launch {
eventSource?.cancel()
eventSource = null
Log.d(TAG, "Reconnecting in ${delay}ms (attempt $reconnectAttempts)")
kotlinx.coroutines.delay(delay)
connect()
}
}

companion object {
const val TAG = "SSEManager"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ class SettingsHelper(private val context: Context) {
settings.edit { putBoolean(PREF_KEY_AUTOSTART, value) }
}

var fcmToken: String?
get() = settings.getString(PREF_KEY_FCM_TOKEN, null)
set(value) = settings.edit { putString(PREF_KEY_FCM_TOKEN, value) }

private fun migrate() {
// remove after 2025-11-28
val PREF_KEY_SERVER_TOKEN = "server_token"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package me.capcom.smsgateway.modules.events

data class ExternalEvent(
val type: ExternalEventType,
val data: String?,
)
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package me.capcom.smsgateway.modules.push
package me.capcom.smsgateway.modules.events

import com.google.gson.annotations.SerializedName

enum class Event {
enum class ExternalEventType {
@SerializedName("MessageEnqueued")
MessageEnqueued,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@ import kotlinx.coroutines.launch
import me.capcom.smsgateway.domain.EntitySource
import me.capcom.smsgateway.modules.events.EventBus
import me.capcom.smsgateway.modules.events.EventsReceiver
import me.capcom.smsgateway.modules.gateway.events.DeviceRegisteredEvent
import me.capcom.smsgateway.modules.gateway.events.MessageEnqueuedEvent
import me.capcom.smsgateway.modules.gateway.events.SettingsUpdatedEvent
import me.capcom.smsgateway.modules.gateway.events.WebhooksUpdatedEvent
import me.capcom.smsgateway.modules.gateway.services.SSEForegroundService
import me.capcom.smsgateway.modules.gateway.workers.PullMessagesWorker
import me.capcom.smsgateway.modules.gateway.workers.SendStateWorker
import me.capcom.smsgateway.modules.gateway.workers.SettingsUpdateWorker
import me.capcom.smsgateway.modules.gateway.workers.WebhooksUpdateWorker
import me.capcom.smsgateway.modules.messages.events.MessageStateChangedEvent
import me.capcom.smsgateway.modules.ping.events.PingEvent
import me.capcom.smsgateway.modules.push.events.PushMessageEnqueuedEvent
import org.koin.core.component.get

class EventsReceiver : EventsReceiver() {
Expand All @@ -20,8 +26,8 @@ class EventsReceiver : EventsReceiver() {
override suspend fun collect(eventBus: EventBus) {
coroutineScope {
launch {
Log.d("EventsReceiver", "launched PushMessageEnqueuedEvent")
eventBus.collect<PushMessageEnqueuedEvent> { event ->
Log.d("EventsReceiver", "launched MessageEnqueuedEvent")
eventBus.collect<MessageEnqueuedEvent> { event ->
Log.d("EventsReceiver", "Event: $event")

if (!settings.enabled) return@collect
Expand Down Expand Up @@ -53,6 +59,40 @@ class EventsReceiver : EventsReceiver() {
PullMessagesWorker.start(get())
}
}

launch {
Log.d("EventsReceiver", "launched WebhooksUpdatedEvent")
eventBus.collect<WebhooksUpdatedEvent> {
Log.d("EventsReceiver", "Event: $it")

if (!settings.enabled) return@collect

WebhooksUpdateWorker.start(get())
}
}

launch {
Log.d("EventsReceiver", "launched SettingsUpdatedEvent")
eventBus.collect<SettingsUpdatedEvent> {
Log.d("EventsReceiver", "Event: $it")

if (!settings.enabled) return@collect

SettingsUpdateWorker.start(get())
}
}

launch {
Log.d("EventsReceiver", "launched DeviceRegisteredEvent")
eventBus.collect<DeviceRegisteredEvent> {
Log.d("EventsReceiver", "Event: $it")

if (!settings.enabled) return@collect
if (settings.fcmToken != null) return@collect

SSEForegroundService.start(get())
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import me.capcom.smsgateway.domain.EntitySource
import me.capcom.smsgateway.domain.MessageContent
import me.capcom.smsgateway.modules.events.EventBus
import me.capcom.smsgateway.modules.gateway.events.DeviceRegisteredEvent
import me.capcom.smsgateway.modules.gateway.services.SSEForegroundService
import me.capcom.smsgateway.modules.gateway.workers.PullMessagesWorker
import me.capcom.smsgateway.modules.gateway.workers.SendStateWorker
import me.capcom.smsgateway.modules.gateway.workers.SettingsUpdateWorker
Expand Down Expand Up @@ -52,6 +53,7 @@ class GatewayService(
fun stop(context: Context) {
eventsReceiver.stop()

SSEForegroundService.stop(context)
SettingsUpdateWorker.stop(context)
WebhooksUpdateWorker.stop(context)
PullMessagesWorker.stop(context)
Expand Down Expand Up @@ -106,9 +108,7 @@ class GatewayService(
if (accessToken != null) {
// if there's an access token, try to update push token
try {
pushToken?.let {
updateDevice(it)
}
updateDevice(pushToken)
return
} catch (e: ClientRequestException) {
// if token is invalid, try to register new one
Expand All @@ -132,6 +132,8 @@ class GatewayService(
registerMode.login to registerMode.password
)
}

this.settings.fcmToken = pushToken
this.settings.registrationInfo = response

events.emit(
Expand All @@ -153,19 +155,23 @@ class GatewayService(
}
}

internal suspend fun updateDevice(pushToken: String) {
internal suspend fun updateDevice(pushToken: String?) {
if (!settings.enabled) return

val settings = settings.registrationInfo ?: return
val accessToken = settings.token

api.devicePatch(
accessToken,
GatewayApi.DevicePatchRequest(
settings.id,
pushToken
pushToken?.let {
api.devicePatch(
accessToken,
GatewayApi.DevicePatchRequest(
settings.id,
it
)
)
)
}

this.settings.fcmToken = pushToken

events.emit(
DeviceRegisteredEvent.Success(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class GatewaySettings(
get() = storage.get(REGISTRATION_INFO)
set(value) = storage.set(REGISTRATION_INFO, value)

var fcmToken: String?
get() = storage.get(FCM_TOKEN)
set(value) = storage.set(FCM_TOKEN, value)

val username: String?
get() = registrationInfo?.login
val password: String?
Expand All @@ -32,6 +36,7 @@ class GatewaySettings(
companion object {
private const val REGISTRATION_INFO = "REGISTRATION_INFO"
private const val ENABLED = "ENABLED"
private const val FCM_TOKEN = "fcm_token"

private const val CLOUD_URL = "cloud_url"
private const val PRIVATE_TOKEN = "private_token"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package me.capcom.smsgateway.modules.gateway.events

import me.capcom.smsgateway.modules.events.AppEvent

class MessageEnqueuedEvent : AppEvent(NAME) {
companion object {
const val NAME = "MessageEnqueuedEvent"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package me.capcom.smsgateway.modules.gateway.events

import me.capcom.smsgateway.modules.events.AppEvent

class SettingsUpdatedEvent : AppEvent(NAME) {

companion object {
const val NAME = "SettingsUpdatedEvent"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package me.capcom.smsgateway.modules.gateway.events

import me.capcom.smsgateway.modules.events.AppEvent

class WebhooksUpdatedEvent : AppEvent(NAME) {
companion object {
const val NAME = "WebhooksUpdatedEvent"
}
}
Loading