diff --git a/Sources/Segment/Analytics.swift b/Sources/Segment/Analytics.swift index aec6c59c..edc1ad85 100644 --- a/Sources/Segment/Analytics.swift +++ b/Sources/Segment/Analytics.swift @@ -30,6 +30,9 @@ public class Analytics { static internal weak var firstInstance: Analytics? = nil @Atomic static internal var activeWriteKeys = [String]() + + // Used for WaitingPlugin's, see waiting.swift + internal var processingTimer: DispatchWorkItem? = nil /** This method isn't a traditional singleton implementation. It's provided here diff --git a/Sources/Segment/Plugins.swift b/Sources/Segment/Plugins.swift index 19705fb5..7c288ae0 100644 --- a/Sources/Segment/Plugins.swift +++ b/Sources/Segment/Plugins.swift @@ -124,6 +124,9 @@ extension DestinationPlugin { public func add(plugin: Plugin) -> Plugin { if let analytics = self.analytics { plugin.configure(analytics: analytics) + if let waiting = plugin as? WaitingPlugin { + analytics.pauseEventProcessing(plugin: waiting) + } } timeline.add(plugin: plugin) analytics?.updateIfNecessary(plugin: plugin) @@ -188,6 +191,9 @@ extension Analytics { @discardableResult public func add(plugin: Plugin) -> Plugin { plugin.configure(analytics: self) + if let waiting = plugin as? WaitingPlugin { + pauseEventProcessing(plugin: waiting) + } timeline.add(plugin: plugin) updateIfNecessary(plugin: plugin) return plugin diff --git a/Sources/Segment/Settings.swift b/Sources/Segment/Settings.swift index 0573d4bc..308e0f7e 100644 --- a/Sources/Segment/Settings.swift +++ b/Sources/Segment/Settings.swift @@ -109,12 +109,11 @@ extension Settings: Equatable { extension Analytics { internal func update(settings: Settings) { - guard let system: System = store.currentState() else { return } apply { plugin in - plugin.update(settings: settings, type: updateType(for: plugin, in: system)) + plugin.update(settings: settings, type: updateType(for: plugin)) if let destPlugin = plugin as? DestinationPlugin { destPlugin.apply { subPlugin in - subPlugin.update(settings: settings, type: updateType(for: subPlugin, in: system)) + subPlugin.update(settings: settings, type: updateType(for: subPlugin)) } } } @@ -125,19 +124,12 @@ extension Analytics { // if we're already running, update has already been called for existing plugins, // so we just wanna call it on this one if it hasn't been done already. if system.running, let settings = system.settings { - let alreadyInitialized = system.initializedPlugins.contains { p in - return plugin === p - } - if !alreadyInitialized { - store.dispatch(action: System.AddPluginToInitialized(plugin: plugin)) - plugin.update(settings: settings, type: .initial) - } else { - plugin.update(settings: settings, type: .refresh) - } + plugin.update(settings: settings, type: updateType(for: plugin)) } } - internal func updateType(for plugin: Plugin, in system: System) -> UpdateType { + internal func updateType(for plugin: Plugin) -> UpdateType { + guard let system: System = store.currentState() else { return .initial } let alreadyInitialized = system.initializedPlugins.contains { p in return plugin === p } @@ -154,14 +146,14 @@ extension Analytics { if isUnitTesting { // we don't really wanna wait for this network call during tests... // but we should make it work similarly. - store.dispatch(action: System.ToggleRunningAction(running: false)) + pauseEventProcessing() operatingMode.run(queue: DispatchQueue.main) { if let state: System = self.store.currentState(), let settings = state.settings { self.store.dispatch(action: System.UpdateSettingsAction(settings: settings)) self.update(settings: settings) } - self.store.dispatch(action: System.ToggleRunningAction(running: true)) + self.resumeEventProcessing() } return @@ -172,7 +164,7 @@ extension Analytics { let httpClient = HTTPClient(analytics: self) // stop things; queue in case our settings have changed. - store.dispatch(action: System.ToggleRunningAction(running: false)) + pauseEventProcessing() httpClient.settingsFor(writeKey: writeKey) { (success, settings) in if success, let s = settings { // put the new settings in the state store. @@ -186,7 +178,7 @@ extension Analytics { } // we're good to go back to a running state. - self.store.dispatch(action: System.ToggleRunningAction(running: true)) + self.resumeEventProcessing() } } } diff --git a/Sources/Segment/State.swift b/Sources/Segment/State.swift index 1772572f..c861dde5 100644 --- a/Sources/Segment/State.swift +++ b/Sources/Segment/State.swift @@ -16,6 +16,7 @@ struct System: State { let running: Bool let enabled: Bool let initializedPlugins: [Plugin] + let waitingPlugins: [Plugin] struct UpdateSettingsAction: Action { let settings: Settings @@ -25,7 +26,8 @@ struct System: State { settings: settings, running: state.running, enabled: state.enabled, - initializedPlugins: state.initializedPlugins) + initializedPlugins: state.initializedPlugins, + waitingPlugins: state.waitingPlugins) return result } } @@ -34,11 +36,29 @@ struct System: State { let running: Bool func reduce(state: System) -> System { + var desiredRunning = running + + if desiredRunning == true && state.waitingPlugins.count > 0 { + desiredRunning = false + } + return System(configuration: state.configuration, settings: state.settings, - running: running, + running: desiredRunning, enabled: state.enabled, - initializedPlugins: state.initializedPlugins) + initializedPlugins: state.initializedPlugins, + waitingPlugins: state.waitingPlugins) + } + } + + struct ForceRunningAction: Action { + func reduce(state: System) -> System { + return System(configuration: state.configuration, + settings: state.settings, + running: true, + enabled: state.enabled, + initializedPlugins: state.initializedPlugins, + waitingPlugins: state.waitingPlugins) } } @@ -50,7 +70,8 @@ struct System: State { settings: state.settings, running: state.running, enabled: enabled, - initializedPlugins: state.initializedPlugins) + initializedPlugins: state.initializedPlugins, + waitingPlugins: state.waitingPlugins) } } @@ -62,7 +83,8 @@ struct System: State { settings: state.settings, running: state.running, enabled: state.enabled, - initializedPlugins: state.initializedPlugins) + initializedPlugins: state.initializedPlugins, + waitingPlugins: state.waitingPlugins) } } @@ -79,7 +101,8 @@ struct System: State { settings: settings, running: state.running, enabled: state.enabled, - initializedPlugins: state.initializedPlugins) + initializedPlugins: state.initializedPlugins, + waitingPlugins: state.waitingPlugins) } } @@ -97,7 +120,64 @@ struct System: State { settings: state.settings, running: state.running, enabled: state.enabled, - initializedPlugins: initializedPlugins) + initializedPlugins: initializedPlugins, + waitingPlugins: state.waitingPlugins) + } + } + + struct AddWaitingPlugin: Action { + let plugin: Plugin + + func reduce(state: System) -> System { + var waitingPlugins = state.waitingPlugins + if !waitingPlugins.contains(where: { p in + return plugin === p + }) { + waitingPlugins.append(plugin) + } + return System(configuration: state.configuration, + settings: state.settings, + running: state.running, + enabled: state.enabled, + initializedPlugins: state.initializedPlugins, + waitingPlugins: waitingPlugins) + } + } + + /*struct RemoveWaitingPlugin: Action { + let plugin: Plugin + + func reduce(state: System) -> System { + var waitingPlugins = state.waitingPlugins + waitingPlugins.removeAll { p in + return plugin === p + } + return System(configuration: state.configuration, + settings: state.settings, + running: state.running, + enabled: state.enabled, + initializedPlugins: state.initializedPlugins, + waitingPlugins: waitingPlugins) + } + }*/ + struct RemoveWaitingPlugin: Action { + let plugin: Plugin + + func reduce(state: System) -> System { + var waitingPlugins = state.waitingPlugins + let countBefore = waitingPlugins.count + waitingPlugins.removeAll { p in + return plugin === p + } + let countAfter = waitingPlugins.count + print("RemoveWaitingPlugin: \(countBefore) -> \(countAfter)") + + return System(configuration: state.configuration, + settings: state.settings, + running: state.running, + enabled: state.enabled, + initializedPlugins: state.initializedPlugins, + waitingPlugins: waitingPlugins) } } } @@ -171,7 +251,14 @@ extension System { settings = Settings(writeKey: configuration.values.writeKey, apiHost: HTTPClient.getDefaultAPIHost()) } } - return System(configuration: configuration, settings: settings, running: false, enabled: true, initializedPlugins: [Plugin]()) + return System( + configuration: configuration, + settings: settings, + running: false, + enabled: true, + initializedPlugins: [Plugin](), + waitingPlugins: [WaitingPlugin]() + ) } } diff --git a/Sources/Segment/Waiting.swift b/Sources/Segment/Waiting.swift new file mode 100644 index 00000000..13fed19f --- /dev/null +++ b/Sources/Segment/Waiting.swift @@ -0,0 +1,73 @@ +// +// Waiting.swift +// Segment +// +// Created by Brandon Sneed on 7/12/25. +// +import Foundation + +public protocol WaitingPlugin: Plugin {} + +extension Analytics { + /// Pauses event processing, causing events to be queued. When processing resumes + /// any queued events will be replayed to the system with their original timestamps. + /// The system will forcibly resume after 30 seconds, but you should + /// call `resumeEventProcessing(plugin:)` when you've completed your task. + public func pauseEventProcessing(plugin: WaitingPlugin) { + store.dispatch(action: System.AddWaitingPlugin(plugin: plugin)) + pauseEventProcessing() + } + + /// Resume event processing. Any queued events will be replayed into the system + /// using their original timestamps. + public func resumeEventProcessing(plugin: WaitingPlugin) { + store.dispatch(action: System.RemoveWaitingPlugin(plugin: plugin)) + resumeEventProcessing() + } +} + +extension Analytics { + internal func running() -> Bool { + if let system: System = store.currentState() { + return system.running + } + // we have no state, so assume no. + return false + } + + internal func pauseEventProcessing() { + let running = running() + // if we're already paused, ignore and leave. + if !running { + return + } + // pause processing + store.dispatch(action: System.ToggleRunningAction(running: false)) + // if we WERE running, someone stopped us, set a timer for + // 30 seconds so they can't keep the system stopped forever. + startProcessingAfterTimeout() + } + + internal func resumeEventProcessing() { + let running = running() + // if we're already running, ignore and leave. + if running { + return + } + store.dispatch(action: System.ToggleRunningAction(running: true)) + } + + internal func startProcessingAfterTimeout() { + DispatchQueue.main.async { [weak self] in + guard let self else { return } + self.processingTimer?.cancel() + self.processingTimer = DispatchWorkItem { [weak self] in + self?.store.dispatch(action: System.ForceRunningAction()) + self?.processingTimer = nil // clean up after ourselves + } + if let processingTimer = self.processingTimer { + DispatchQueue.main.asyncAfter(deadline: .now() + 30, execute: processingTimer) + } + } + } +} diff --git a/Tests/Segment-Tests/Waiting_Tests.swift b/Tests/Segment-Tests/Waiting_Tests.swift new file mode 100644 index 00000000..27efe6eb --- /dev/null +++ b/Tests/Segment-Tests/Waiting_Tests.swift @@ -0,0 +1,343 @@ +// +// Waiting_Tests.swift +// Segment +// +// Created by Brandon Sneed on 7/12/25. +// + +import XCTest +import Sovran +@testable import Segment + +class ExampleWaitingPlugin: EventPlugin, WaitingPlugin { + let type: PluginType + var identifier: String + + weak var analytics: Analytics? + + init(identifier: String = "ExampleWaitingPlugin") { + self.type = .enrichment + self.identifier = identifier + } + + func update(settings: Settings, type: UpdateType) { + // we got our settings, do something and pretend to wait + if type == .initial { + self.analytics?.pauseEventProcessing(plugin: self) + DispatchQueue.main.asyncAfter(deadline: .now() + 1) { [weak self] in + // pretend to hit the network or something ... get some stuff... + guard let self else { return } + self.analytics?.resumeEventProcessing(plugin: self) + } + } + } + + func track(event: TrackEvent) -> TrackEvent? { + var workingEvent = event + + workingEvent.context?.setValue(identifier, forKeyPath: "processed_by") + + return workingEvent + } +} + +class SlowWaitingPlugin: EventPlugin, WaitingPlugin { + let type: PluginType + var shouldResume: Bool = false + + weak var analytics: Analytics? + + init() { + self.type = .enrichment + } + + func update(settings: Settings, type: UpdateType) { + print("SlowWaitingPlugin.update() called with type: \(type)") + if type == .initial { + analytics?.pauseEventProcessing(plugin: self) + /// don't resume + } + } + + func manualResume() { + analytics?.resumeEventProcessing(plugin: self) + } + + func track(event: TrackEvent) -> TrackEvent? { + var workingEvent = event + workingEvent.context?.setValue("slow_plugin", forKeyPath: "processed_by") + return workingEvent + } +} + +class MockDestinationPlugin: DestinationPlugin { + var timeline = Timeline() + + let type = PluginType.destination + let key = "MockDestination" + weak var analytics: Analytics? +} + +final class Waiting_Tests: XCTestCase, Subscriber { + override func setUpWithError() throws { + Telemetry.shared.enable = false + } + + func testBasicWaitingPlugin() { + let analytics = Analytics(configuration: Configuration(writeKey: "testWaiting")) + + // System should start as not running + XCTAssertFalse(analytics.running()) + + analytics.add(plugin: ExampleWaitingPlugin()) + + // Track an event while paused + analytics.track(name: "test_event") + + // System should still be paused + XCTAssertFalse(analytics.running()) + + // Wait until plugin resumes and system starts + waitUntilStarted(analytics: analytics, timeout: 20) + + // System should now be running + XCTAssertTrue(analytics.running()) + } + + func testMultipleWaitingPlugins() { + let analytics = Analytics(configuration: Configuration(writeKey: "testMultipleWaiting")) + + let plugin1 = ExampleWaitingPlugin(identifier: "plugin1") + let plugin2 = ExampleWaitingPlugin(identifier: "plugin2") + + analytics.add(plugin: plugin1) + analytics.add(plugin: plugin2) + + // System should be paused with multiple waiting plugins + XCTAssertFalse(analytics.running()) + + // Track events while paused + analytics.track(name: "event1") + analytics.track(name: "event2") + + // Wait for both plugins to finish + waitUntilStarted(analytics: analytics, timeout: 5) + + // System should now be running + XCTAssertTrue(analytics.running()) + } + + func testTimeoutForceStart() { + let analytics = Analytics(configuration: Configuration(writeKey: "testTimeout")) + + let slowPlugin = SlowWaitingPlugin() + analytics.add(plugin: slowPlugin) + + // System should be paused + XCTAssertFalse(analytics.running()) + + // Track an event while paused + analytics.track(name: "timeout_test") + + // Plugin never resumes, but timeout should force start + // Note: We'd need to mock the timer or reduce timeout for actual testing + // For now, manually trigger the timeout behavior + DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { + // Simulate timeout by forcing resume + analytics.store.dispatch(action: System.ForceRunningAction()) + } + + waitUntilStarted(analytics: analytics, timeout: 1) + XCTAssertTrue(analytics.running()) + } + + func testEventQueueingAndReplay() { + let analytics = Analytics(configuration: Configuration(writeKey: "testQueueing")) + let plugin = ExampleWaitingPlugin() + + analytics.add(plugin: plugin) + + // Track multiple events while paused + analytics.track(name: "queued_event_1") + analytics.track(name: "queued_event_2") + analytics.track(name: "queued_event_3") + + // System should still be paused + XCTAssertFalse(analytics.running()) + + // Wait for system to start + waitUntilStarted(analytics: analytics) + + // All events should have been replayed and processed + XCTAssertTrue(analytics.running()) + } + + func testPauseWhenAlreadyPaused() { + let analytics = Analytics(configuration: Configuration(writeKey: "testDoublePause")) + + let plugin1 = SlowWaitingPlugin() + let plugin2 = SlowWaitingPlugin() + + analytics.add(plugin: plugin1) + // System is now paused by plugin1 + XCTAssertFalse(analytics.running()) + + analytics.add(plugin: plugin2) + // Adding plugin2 should not break anything + XCTAssertFalse(analytics.running()) + + // Wait until both plugins are in waiting state + let waitForPluginsAdded = XCTestExpectation(description: "Plugins added to waiting list") + Timer.scheduledTimer(withTimeInterval: 0.1, repeats: true) { timer in + let state: System = analytics.store.currentState()! + if state.waitingPlugins.count == 2 { + waitForPluginsAdded.fulfill() + timer.invalidate() + } + } + wait(for: [waitForPluginsAdded], timeout: 1) + + // Resume plugin1 - system should still be paused because plugin2 is waiting + plugin1.manualResume() + + // Small delay to let state update + DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { + XCTAssertFalse(analytics.running()) + + // Now resume plugin2 - system should start + plugin2.manualResume() + } + + waitUntilStarted(analytics: analytics, timeout: 3) + XCTAssertTrue(analytics.running()) + } + + func testResumeWhenAlreadyRunning() { + let analytics = Analytics(configuration: Configuration(writeKey: "testDoubleResume")) + + let plugin = ExampleWaitingPlugin() + analytics.add(plugin: plugin) + + // Wait for normal startup + waitUntilStarted(analytics: analytics) + XCTAssertTrue(analytics.running()) + + // Try to resume again - should be no-op + analytics.resumeEventProcessing(plugin: plugin) + XCTAssertTrue(analytics.running()) + } + + func testWaitingPluginState() { + let analytics = Analytics(configuration: Configuration(writeKey: "testState")) + + let plugin1 = SlowWaitingPlugin() + let plugin2 = SlowWaitingPlugin() + + // Check initial state + waitForWaitingPluginCount(analytics: analytics, expectedCount: 0) + + analytics.add(plugin: plugin1) + print("Added plugin1") + analytics.add(plugin: plugin2) + print("Added plugin2") + waitForWaitingPluginCount(analytics: analytics, expectedCount: 2) + + // Resume one plugin and wait for state update + plugin1.manualResume() + waitForWaitingPluginCount(analytics: analytics, expectedCount: 1) + + // System should still be paused because plugin2 is waiting + XCTAssertFalse(analytics.running()) + + // Resume second plugin and wait for state update + plugin2.manualResume() + waitForWaitingPluginCount(analytics: analytics, expectedCount: 0) + + // Now wait for system to start + waitUntilStarted(analytics: analytics, timeout: 2) + + let finalState: System = analytics.store.currentState()! + XCTAssertTrue(finalState.running) + } + + func testDestinationWaitingPlugin() { + let analytics = Analytics(configuration: Configuration(writeKey: "testDestination")) + let destination = MockDestinationPlugin() + let waitingPlugin = ExampleWaitingPlugin() + + analytics.store.subscribe(self) { (state: System) in + print("State updated running: \(state.running)") + } + + analytics.add(plugin: destination) + destination.add(plugin: waitingPlugin) + + // System should be paused + XCTAssertFalse(analytics.running()) + + // Plugin should auto-resume after 1 second + waitUntilStarted(analytics: analytics, timeout: 5) + XCTAssertTrue(analytics.running()) + } + + func testDestinationSlowWaitingPlugin() { + let analytics = Analytics(configuration: Configuration(writeKey: "testDestination")) + let destination = MockDestinationPlugin() + let waitingPlugin = SlowWaitingPlugin() + + analytics.store.subscribe(self) { (state: System) in + print("State updated running: \(state.running)") + } + + analytics.add(plugin: destination) + destination.add(plugin: waitingPlugin) + + // System should be paused (proving destination.add worked) + XCTAssertFalse(analytics.running()) + + // Resume should work normally + // this will pull it out of the waitingPlugins list. + waitingPlugin.manualResume() + + // but update will get called, pausing it once more. + waitForWaitingPluginCount(analytics: analytics, expectedCount: 1) + + // at which point, we have to resume it again. + waitingPlugin.manualResume() + + waitUntilStarted(analytics: analytics, timeout: 5) + XCTAssertTrue(analytics.running()) + } +} + +// Helper extension +extension Waiting_Tests { + func waitUntilStarted(analytics: Analytics, timeout: TimeInterval = 5) { + let expectation = XCTestExpectation(description: "Analytics started") + + let timer = Timer.scheduledTimer(withTimeInterval: 0.1, repeats: true) { timer in + if analytics.running() { + expectation.fulfill() + timer.invalidate() + } + } + + wait(for: [expectation], timeout: timeout) + timer.invalidate() + } + + func waitForWaitingPluginCount(analytics: Analytics, expectedCount: Int, timeout: TimeInterval = 2) { + let expectation = XCTestExpectation(description: "Waiting for \(expectedCount) plugins") + + let timer = Timer.scheduledTimer(withTimeInterval: 0.05, repeats: true) { timer in + let state: System = analytics.store.currentState()! + if state.waitingPlugins.count == expectedCount { + expectation.fulfill() + timer.invalidate() + } + } + + wait(for: [expectation], timeout: timeout) + timer.invalidate() + } +}