Skip to content

Commit 03ef036

Browse files
authored
Removing noisy metric tags and adding stress test for telemetry (#249)
* Removing noisy metric tags and adding stress test for telemetry
* Refactoring unhelpful queue size checks
1 parent 546808d commit 03ef036

File tree

6 files changed

+47
-16
lines changed

6 files changed

+47
-16
lines changed

core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ open class Analytics protected constructor(
9898
Telemetry.INVOKE_ERROR_METRIC, t.stackTraceToString()) {
9999
it["error"] = t.toString()
100100
it["message"] = "Exception in Analytics Scope"
101-
it["caller"] = t.stackTrace[0].toString()
102101
}
103102
}
104103
override val analyticsScope = CoroutineScope(SupervisorJob() + exceptionHandler)

core/src/main/java/com/segment/analytics/kotlin/core/HTTPClient.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ class HTTPClient(
3939
it["error"] = e.toString()
4040
it["writekey"] = writeKey
4141
it["message"] = "Malformed url"
42-
it["caller"] = e.stackTrace[0].toString()
4342
}
4443
throw error
4544
}

core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ internal fun Analytics.fetchSettings(
124124
it["error"] = ex.toString()
125125
it["writekey"] = writeKey
126126
it["message"] = "Error retrieving settings"
127-
it["caller"] = ex.stackTrace[0].toString()
128127
}
129128
configuration.defaultSettings
130129
}

core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ object Telemetry: Subscriber {
9393

9494
private val queue = ConcurrentLinkedQueue<RemoteMetric>()
9595
private var queueBytes = 0
96-
private var queueSizeExceeded = false
97-
private val seenErrors = mutableMapOf<String, Int>()
9896
private var started = false
9997
private var rateLimitEndTime: Long = 0
10098
private var flushFirstError = true
@@ -150,7 +148,6 @@ object Telemetry: Subscriber {
150148
fun reset() {
151149
telemetryJob?.cancel()
152150
resetQueue()
153-
seenErrors.clear()
154151
started = false
155152
rateLimitEndTime = 0
156153
}
@@ -169,7 +166,6 @@ object Telemetry: Subscriber {
169166
if (!metric.startsWith(METRICS_BASE_TAG)) return
170167
if (tags.isEmpty()) return
171168
if (Math.random() > sampleRate) return
172-
if (queue.size >= maxQueueSize) return
173169

174170
addRemoteMetric(metric, tags)
175171
}
@@ -188,7 +184,6 @@ object Telemetry: Subscriber {
188184
if (!enable || sampleRate == 0.0) return
189185
if (!metric.startsWith(METRICS_BASE_TAG)) return
190186
if (tags.isEmpty()) return
191-
if (queue.size >= maxQueueSize) return
192187
if (Math.random() > sampleRate) return
193188

194189
var filteredTags = if(sendWriteKeyOnError) {
@@ -235,7 +230,6 @@ object Telemetry: Subscriber {
235230
var queueCount = queue.size
236231
// Reset queue data size counter since all current queue items will be removed
237232
queueBytes = 0
238-
queueSizeExceeded = false
239233
val sendQueue = mutableListOf<RemoteMetric>()
240234
while (queueCount-- > 0 && !queue.isEmpty()) {
241235
val m = queue.poll()
@@ -303,6 +297,9 @@ object Telemetry: Subscriber {
303297
found.value += value
304298
return
305299
}
300+
if (queue.size >= maxQueueSize) {
301+
return
302+
}
306303

307304
val newMetric = RemoteMetric(
308305
type = METRIC_TYPE,
@@ -315,8 +312,6 @@ object Telemetry: Subscriber {
315312
if (queueBytes + newMetricSize <= maxQueueBytes) {
316313
queue.add(newMetric)
317314
queueBytes += newMetricSize
318-
} else {
319-
queueSizeExceeded = true
320315
}
321316
}
322317

@@ -345,6 +340,5 @@ object Telemetry: Subscriber {
345340
private fun resetQueue() {
346341
queue.clear()
347342
queueBytes = 0
348-
queueSizeExceeded = false
349343
}
350344
}

core/src/main/java/com/segment/analytics/kotlin/core/platform/Mediator.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
6363
}
6464
it["writekey"] = plugin.analytics.configuration.writeKey
6565
it["message"] = "Exception executing plugin"
66-
it["caller"] = t.stackTrace[0].toString()
6766
}
6867
}
6968
}
@@ -88,7 +87,6 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
8887
}
8988
it["writekey"] = plugin.analytics.configuration.writeKey
9089
it["message"] = "Exception executing plugin"
91-
it["caller"] = t.stackTrace[0].toString()
9290
}
9391
}
9492
}

core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import org.junit.jupiter.api.Test
77
import java.lang.reflect.Field
88
import java.net.HttpURLConnection
99
import java.util.concurrent.ConcurrentLinkedQueue
10+
import java.util.concurrent.CountDownLatch
11+
import java.util.concurrent.Executors
12+
import java.util.concurrent.TimeUnit
13+
import kotlin.random.Random
1014

1115
class TelemetryTest {
1216
fun TelemetryResetFlushFirstError() {
@@ -182,7 +186,7 @@ class TelemetryTest {
182186
Telemetry.start()
183187
for (i in 1..Telemetry.maxQueueSize + 1) {
184188
Telemetry.increment(Telemetry.INVOKE_METRIC) { it["test"] = "test" + i }
185-
Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["test"] = "test" + i }
189+
Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["error"] = "test" + i }
186190
}
187191
assertEquals(Telemetry.maxQueueSize, TelemetryQueueSize())
188192
}
@@ -195,6 +199,44 @@ class TelemetryTest {
195199
Telemetry.sendWriteKeyOnError = false
196200
Telemetry.sendErrorLogData = false
197201
Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, longString) { it["writekey"] = longString }
198-
assertTrue(TelemetryQueueSize() < 1000)
202+
assertTrue(TelemetryQueueBytes() < 1000)
203+
}
204+
205+
@Test
206+
fun testConcurrentErrorReportingWithQueuePressure() {
207+
val operationCount = 200
208+
val latch = CountDownLatch(operationCount)
209+
val executor = Executors.newFixedThreadPool(3)
210+
211+
try {
212+
// Launch operations across multiple threads
213+
repeat(operationCount) { i ->
214+
executor.submit {
215+
try {
216+
Telemetry.error(
217+
metric = Telemetry.INVOKE_ERROR_METRIC,
218+
log = "High pressure test $i"
219+
) {
220+
it["error"] = "pressure_test_key"
221+
it["iteration"] = "$i"
222+
}
223+
224+
// Add random delays to increase race condition probability
225+
if (i % 5 == 0) {
226+
Thread.sleep(Random.nextLong(1, 3))
227+
}
228+
} finally {
229+
latch.countDown()
230+
}
231+
}
232+
}
233+
234+
// Wait for all operations to complete
235+
latch.await(15, TimeUnit.SECONDS)
236+
237+
} finally {
238+
executor.shutdown()
239+
}
240+
assertTrue(TelemetryQueueSize() == Telemetry.maxQueueSize)
199241
}
200242
}

0 commit comments

Comments
 (0)