Skip to content

Commit c33414f

Browse files
authored
Waiting plugin (#274)
* add waiting plugin state * implement waiting * update settings fetch logic to use the new pause and resume functions * update unit tests * bug fix * add unit tests * CI fix
1 parent 92d5009 commit c33414f

File tree

9 files changed

+382
-26
lines changed

9 files changed

+382
-26
lines changed

android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class StorageTests {
158158
fun `system reset action removes system`() = runTest {
159159
val action = object : Action<System> {
160160
override fun reduce(state: System): System {
161-
return System(state.configuration, null, state.running, state.initializedPlugins, state.enabled)
161+
return System(state.configuration, null, state.running, state.initializedPlugins, state.waitingPlugins, state.enabled)
162162
}
163163
}
164164
store.dispatch(action, System::class)

core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -84,31 +84,27 @@ suspend fun Analytics.checkSettings() {
8484
val writeKey = configuration.writeKey
8585
val cdnHost = configuration.cdnHost
8686

87-
store.currentState(System::class) ?: return
88-
store.dispatch(System.ToggleRunningAction(running = false), System::class)
87+
pauseEventProcessing()
8988

90-
withContext(networkIODispatcher) {
89+
val settingsObj = withContext(networkIODispatcher) {
9190
log("Fetching settings on ${Thread.currentThread().name}")
92-
val settingsObj: Settings? = fetchSettings(writeKey, cdnHost)
93-
94-
withContext(analyticsDispatcher) {
95-
96-
settingsObj?.let {
97-
log("Dispatching update settings on ${Thread.currentThread().name}")
98-
store.dispatch(System.UpdateSettingsAction(settingsObj), System::class)
99-
}
91+
return@withContext fetchSettings(writeKey, cdnHost)
92+
}
10093

101-
store.currentState(System::class)?.let { system ->
102-
system.settings?.let { settings ->
103-
log("Propagating settings on ${Thread.currentThread().name}")
104-
update(settings)
105-
}
106-
}
94+
settingsObj?.let {
95+
log("Dispatching update settings on ${Thread.currentThread().name}")
96+
store.dispatch(System.UpdateSettingsAction(settingsObj), System::class)
97+
}
10798

108-
// we're good to go back to a running state.
109-
store.dispatch(System.ToggleRunningAction(running = true), System::class)
99+
store.currentState(System::class)?.let { system ->
100+
system.settings?.let { settings ->
101+
log("Propagating settings on ${Thread.currentThread().name}")
102+
update(settings)
110103
}
111104
}
105+
106+
// we're good to go back to a running state.
107+
resumeEventProcessing()
112108
}
113109

114110
internal fun Analytics.fetchSettings(

core/src/main/java/com/segment/analytics/kotlin/core/State.kt

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ import java.util.*
1818
data class System(
1919
var configuration: Configuration = Configuration(""),
2020
var settings: Settings?,
21-
var running: Boolean,
22-
var initializedPlugins: Set<Int>,
23-
var enabled: Boolean
21+
var running: Boolean = false,
22+
var initializedPlugins: Set<Int> = emptySet(),
23+
var waitingPlugins: Set<Int> = emptySet(),
24+
var enabled: Boolean = true
2425
) : State {
2526

2627
companion object {
@@ -62,6 +63,7 @@ data class System(
6263
settings = settings,
6364
running = false,
6465
initializedPlugins = setOf(),
66+
waitingPlugins = setOf(),
6567
enabled = true
6668
)
6769
}
@@ -74,18 +76,37 @@ data class System(
7476
settings,
7577
state.running,
7678
state.initializedPlugins,
79+
state.waitingPlugins,
7780
state.enabled
7881
)
7982
}
8083
}
8184

8285
class ToggleRunningAction(var running: Boolean) : Action<System> {
8386
override fun reduce(state: System): System {
87+
if (running && state.waitingPlugins.isNotEmpty()) {
88+
running = false
89+
}
90+
8491
return System(
8592
state.configuration,
8693
state.settings,
8794
running,
8895
state.initializedPlugins,
96+
state.waitingPlugins,
97+
state.enabled
98+
)
99+
}
100+
}
101+
102+
class ForceRunningAction : Action<System> {
103+
override fun reduce(state: System): System {
104+
return System(
105+
state.configuration,
106+
state.settings,
107+
true,
108+
state.initializedPlugins,
109+
state.waitingPlugins,
89110
state.enabled
90111
)
91112
}
@@ -105,6 +126,7 @@ data class System(
105126
newSettings,
106127
state.running,
107128
state.initializedPlugins,
129+
state.waitingPlugins,
108130
state.enabled
109131
)
110132
}
@@ -120,6 +142,7 @@ data class System(
120142
state.settings,
121143
state.running,
122144
initializedPlugins,
145+
state.waitingPlugins,
123146
state.enabled
124147
)
125148
}
@@ -132,10 +155,39 @@ data class System(
132155
state.settings,
133156
state.running,
134157
state.initializedPlugins,
158+
state.waitingPlugins,
135159
enabled
136160
)
137161
}
138162
}
163+
164+
class AddWaitingPlugin(val plugin: Int): Action<System> {
165+
override fun reduce(state: System): System {
166+
val waitingPlugins = state.waitingPlugins + plugin
167+
return System(
168+
state.configuration,
169+
state.settings,
170+
state.running,
171+
state.initializedPlugins,
172+
waitingPlugins,
173+
state.enabled
174+
)
175+
}
176+
}
177+
178+
class RemoveWaitingPlugin(val plugin: Int): Action<System> {
179+
override fun reduce(state: System): System {
180+
val waitingPlugins = state.waitingPlugins - plugin
181+
return System(
182+
state.configuration,
183+
state.settings,
184+
state.running,
185+
state.initializedPlugins,
186+
waitingPlugins,
187+
state.enabled
188+
)
189+
}
190+
}
139191
}
140192

141193
/**
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.segment.analytics.kotlin.core
2+
3+
import com.segment.analytics.kotlin.core.platform.Plugin
4+
import kotlinx.coroutines.delay
5+
import kotlinx.coroutines.launch
6+
7+
/**
8+
* An interface that provides functionality of pausing and resuming event processing on Analytics.
9+
*
10+
* By default plugins that implement this interface pauses processing when it is added to
11+
* analytics (via `setup()`) and resumes after 30s.
12+
*
13+
* To customize pausing and resuming, override `setup()` and call `pause()/resumes()` as needed
14+
*/
15+
interface WaitingPlugin: Plugin {
16+
override fun setup(analytics: Analytics) {
17+
super.setup(analytics)
18+
pause()
19+
}
20+
21+
fun pause() {
22+
analytics.pauseEventProcessing(this)
23+
}
24+
25+
fun resume() {
26+
analytics.resumeEventProcessing(this)
27+
}
28+
}
29+
30+
fun Analytics.pauseEventProcessing(plugin: WaitingPlugin) = analyticsScope.launch(analyticsDispatcher) {
31+
store.dispatch(System.AddWaitingPlugin(plugin.hashCode()), System::class)
32+
pauseEventProcessing()
33+
}
34+
35+
36+
fun Analytics.resumeEventProcessing(plugin: WaitingPlugin) = analyticsScope.launch(analyticsDispatcher) {
37+
store.dispatch(System.RemoveWaitingPlugin(plugin.hashCode()), System::class)
38+
resumeEventProcessing()
39+
}
40+
41+
internal suspend fun Analytics.running(): Boolean {
42+
val system = store.currentState(System::class)
43+
return system?.running ?: false
44+
}
45+
46+
internal suspend fun Analytics.pauseEventProcessing(timeout: Long = 30_000) {
47+
if (!running()) return
48+
49+
store.dispatch(System.ToggleRunningAction(false), System::class)
50+
startProcessingAfterTimeout(timeout)
51+
}
52+
53+
internal suspend fun Analytics.resumeEventProcessing() {
54+
if (running()) return
55+
store.dispatch(System.ToggleRunningAction(true), System::class)
56+
}
57+
58+
internal fun Analytics.startProcessingAfterTimeout(timeout: Long) = analyticsScope.launch(analyticsDispatcher) {
59+
delay(timeout)
60+
store.dispatch(System.ForceRunningAction(), System::class)
61+
}
62+

core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ abstract class DestinationPlugin : EventPlugin {
161161

162162
final override fun execute(event: BaseEvent): BaseEvent? = process(event)
163163

164-
internal fun isDestinationEnabled(event: BaseEvent?): Boolean {
164+
open fun isDestinationEnabled(event: BaseEvent?): Boolean {
165165
// if event payload has integration marked false then its disabled by customer
166166
val customerEnabled = event?.integrations?.getBoolean(key) ?: true // default to true when missing
167167

0 commit comments

Comments
 (0)