Skip to content

Commit 5b367f2

Browse files
authored
Merge pull request #607 from NordicSemiconductor/bugfix/flows
Sending notifications as Flow more reliably
2 parents 9504515 + 0ee79e6 commit 5b367f2

File tree

1 file changed

+26
-5
lines changed

1 file changed

+26
-5
lines changed

ble-ktx/src/main/java/no/nordicsemi/android/ble/ktx/ValueChangedCallbackExt.kt

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22

33
package no.nordicsemi.android.ble.ktx
44

5+
import android.util.Log
56
import kotlinx.coroutines.ExperimentalCoroutinesApi
67
import kotlinx.coroutines.channels.awaitClose
8+
import kotlinx.coroutines.channels.onFailure
9+
import kotlinx.coroutines.channels.trySendBlocking
710
import kotlinx.coroutines.flow.Flow
11+
import kotlinx.coroutines.flow.buffer
812
import kotlinx.coroutines.flow.callbackFlow
913
import no.nordicsemi.android.ble.ValueChangedCallback
1014
import no.nordicsemi.android.ble.callback.profile.ProfileReadResponse
@@ -17,6 +21,11 @@ import no.nordicsemi.android.ble.response.ReadResponse
1721
* Usage:
1822
*
1923
* val hrmMeasurementsData = setNotificationCallback(hrmCharacteristic).asFlow() // Flow<Data>
24+
*
25+
* Use the [buffer] operator on the resulting flow to specify a user-defined value and to control
26+
* what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
27+
*
28+
* val hrmMeasurementsData = setNotificationCallback(hrmCharacteristic).asFlow().buffer()
2029
* @return The flow.
2130
* @since 2.3.0
2231
*/
@@ -25,7 +34,8 @@ fun ValueChangedCallback.asFlow(): Flow<Data> = callbackFlow {
2534
// Make sure the callbacks are called without unnecessary delay.
2635
setHandler(null)
2736
with { _, data ->
28-
trySend(data)
37+
trySendBlocking(data)
38+
.onFailure { t -> Log.w("ValueChangeCallback", "Sending data to Flow failed with: $t") }
2939
}
3040
awaitClose {
3141
// There's no way to unregister the callback from here.
@@ -41,6 +51,9 @@ fun ValueChangedCallback.asFlow(): Flow<Data> = callbackFlow {
4151
* val hrmMeasurementsData: Flow<HeartRateMeasurementResponse> =
4252
* setNotificationCallback(hrmCharacteristic)
4353
* .asResponseFlow()
54+
*
55+
* Use the [buffer] operator on the resulting flow to specify a user-defined value and to control
56+
* what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
4457
* @return The flow.
4558
* @since 2.4.0
4659
*/
@@ -49,7 +62,10 @@ inline fun <reified T: ReadResponse> ValueChangedCallback.asResponseFlow(): Flow
4962
// Make sure the callbacks are called without unnecessary delay.
5063
setHandler(null)
5164
with { device, data ->
52-
trySend(T::class.java.getDeclaredConstructor().newInstance().apply { onDataReceived(device, data) })
65+
val response = T::class.java.getDeclaredConstructor().newInstance()
66+
.apply { onDataReceived(device, data) }
67+
trySendBlocking(response)
68+
.onFailure { t -> Log.w("ValueChangeCallback", "Sending response to Flow failed with: $t") }
5369
}
5470
awaitClose {
5571
// There's no way to unregister the callback from here.
@@ -66,6 +82,9 @@ inline fun <reified T: ReadResponse> ValueChangedCallback.asResponseFlow(): Flow
6682
* val hrmMeasurementsData: Flow<HeartRateMeasurementResponse> =
6783
* setNotificationCallback(hrmCharacteristic)
6884
* .asValidResponseFlow()
85+
*
86+
* Use the [buffer] operator on the resulting flow to specify a user-defined value and to control
87+
* what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
6988
* @return The flow.
7089
* @since 2.4.0
7190
*/
@@ -74,10 +93,12 @@ inline fun <reified T: ProfileReadResponse> ValueChangedCallback.asValidResponse
7493
// Make sure the callbacks are called without unnecessary delay.
7594
setHandler(null)
7695
with { device, data ->
77-
T::class.java.getDeclaredConstructor().newInstance()
96+
val response = T::class.java.getDeclaredConstructor().newInstance()
7897
.apply { onDataReceived(device, data) }
79-
.takeIf { it.isValid }
80-
?.let { trySend(it) }
98+
if (response.isValid) {
99+
trySendBlocking(response)
100+
.onFailure { t -> Log.w("ValueChangeCallback", "Sending response to Flow failed with: $t") }
101+
}
81102
}
82103
awaitClose {
83104
// There's no way to unregister the callback from here.

0 commit comments

Comments
 (0)