Skip to content

Commit

Permalink
[#165] Added support for native BluetoothGattCallback usage in custom…
Browse files Browse the repository at this point in the history
… operations. (#237)
  • Loading branch information
uKL authored Jun 29, 2017
1 parent acb38c4 commit bad5797
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
Change Log
==========

Version 1.4.0-SNAPSHOT
* Added native callback usage support in custom operations. You may consider this API if your implementation is performance critical. (https://github.com/Polidea/RxAndroidBle/issues/165)

Version 1.3.2
* Fixed completing the `Observable<byte[]>` emitted by `RxBleConnection.setupNotification()`/`RxBleConnection.setupIndication()` when unsubscribed (https://github.com/Polidea/RxAndroidBle/issues/231)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.polidea.rxandroidble.internal.connection;

import android.annotation.TargetApi;
import android.bluetooth.BluetoothGatt;
import android.bluetooth.BluetoothGattCallback;
import android.bluetooth.BluetoothGattCharacteristic;
import android.bluetooth.BluetoothGattDescriptor;
import android.os.Build;

import javax.inject.Inject;

class NativeCallbackDispatcher {

private BluetoothGattCallback nativeCallback;

@Inject
NativeCallbackDispatcher() {

}

public void notifyNativeChangedCallback(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic) {
if (nativeCallback != null) {
nativeCallback.onCharacteristicChanged(gatt, characteristic);
}
}

public void notifyNativeConnectionStateCallback(BluetoothGatt gatt, int status, int newState) {
if (nativeCallback != null) {
nativeCallback.onConnectionStateChange(gatt, status, newState);
}
}

public void notifyNativeDescriptorReadCallback(BluetoothGatt gatt, BluetoothGattDescriptor descriptor, int status) {
if (nativeCallback != null) {
nativeCallback.onDescriptorRead(gatt, descriptor, status);
}
}

public void notifyNativeDescriptorWriteCallback(BluetoothGatt gatt, BluetoothGattDescriptor descriptor, int status) {
if (nativeCallback != null) {
nativeCallback.onDescriptorWrite(gatt, descriptor, status);
}
}

@TargetApi(Build.VERSION_CODES.LOLLIPOP)
public void notifyNativeMtuChangedCallback(BluetoothGatt gatt, int mtu, int status) {
if (nativeCallback != null) {
nativeCallback.onMtuChanged(gatt, mtu, status);
}
}

public void notifyNativeReadRssiCallback(BluetoothGatt gatt, int rssi, int status) {
if (nativeCallback != null) {
nativeCallback.onReadRemoteRssi(gatt, rssi, status);
}
}

public void notifyNativeReliableWriteCallback(BluetoothGatt gatt, int status) {
if (nativeCallback != null) {
nativeCallback.onReliableWriteCompleted(gatt, status);
}
}

public void notifyNativeServicesDiscoveredCallback(BluetoothGatt gatt, int status) {
if (nativeCallback != null) {
nativeCallback.onServicesDiscovered(gatt, status);
}
}

public void notifyNativeWriteCallback(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic, int status) {
if (nativeCallback != null) {
nativeCallback.onCharacteristicWrite(gatt, characteristic, status);
}
}

void setNativeCallback(BluetoothGattCallback callback) {
this.nativeCallback = callback;
}

void notifyNativeReadCallback(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic, int status) {
if (nativeCallback != null) {
nativeCallback.onCharacteristicRead(gatt, characteristic, status);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import rx.Emitter;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

Expand Down Expand Up @@ -316,7 +317,22 @@ protected void protectedRun(Emitter<T> emitter, RadioReleaseInterface radioRelea
throw new IllegalArgumentException("The custom operation asObservable method must return a non-null observable");
}

operationObservable.subscribe(emitterWrapper);
operationObservable
.doOnTerminate(clearNativeCallbackReferenceAction())
.subscribe(emitterWrapper);
}

/**
* The Native Callback abstractions is intended to be used only in a custom operation, therefore, to make sure
* that we won't leak any references it's a good idea to clean it.
*/
private Action0 clearNativeCallbackReferenceAction() {
return new Action0() {
@Override
public void call() {
gattCallback.setNativeCallback(null);
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.jakewharton.rxrelay.PublishRelay;
import com.jakewharton.rxrelay.SerializedRelay;
import com.polidea.rxandroidble.ClientComponent;

import com.polidea.rxandroidble.RxBleConnection.RxBleConnectionState;
import com.polidea.rxandroidble.RxBleDeviceServices;
import com.polidea.rxandroidble.exceptions.BleDisconnectedException;
Expand All @@ -35,6 +34,7 @@ public class RxBleGattCallback {

private final Scheduler callbackScheduler;
private final BluetoothGattProvider bluetoothGattProvider;
private final NativeCallbackDispatcher nativeCallbackDispatcher;
private final Output<Pair<BluetoothGatt, RxBleConnectionState>> gattAndConnectionStateOutput = new Output<>();
private final Output<RxBleDeviceServices> servicesDiscoveredOutput = new Output<>();
private final Output<ByteAssociation<UUID>> readCharacteristicOutput = new Output<>();
Expand Down Expand Up @@ -70,9 +70,11 @@ public Object call(Pair<BluetoothGatt, RxBleConnectionState> bluetoothGattRxBleC

@Inject
public RxBleGattCallback(@Named(ClientComponent.NamedSchedulers.GATT_CALLBACK) Scheduler callbackScheduler,
BluetoothGattProvider bluetoothGattProvider) {
BluetoothGattProvider bluetoothGattProvider,
NativeCallbackDispatcher nativeCallbackDispatcher) {
this.callbackScheduler = callbackScheduler;
this.bluetoothGattProvider = bluetoothGattProvider;
this.nativeCallbackDispatcher = nativeCallbackDispatcher;
}

private boolean isDisconnectedOrDisconnecting(Pair<BluetoothGatt, RxBleConnectionState> pair) {
Expand All @@ -85,6 +87,7 @@ private boolean isDisconnectedOrDisconnecting(Pair<BluetoothGatt, RxBleConnectio
@Override
public void onConnectionStateChange(BluetoothGatt gatt, int status, int newState) {
RxBleLog.d("onConnectionStateChange newState=%d status=%d", newState, status);
nativeCallbackDispatcher.notifyNativeConnectionStateCallback(gatt, status, newState);
super.onConnectionStateChange(gatt, status, newState);
bluetoothGattProvider.updateBluetoothGatt(gatt);

Expand All @@ -95,19 +98,22 @@ public void onConnectionStateChange(BluetoothGatt gatt, int status, int newState
@Override
public void onServicesDiscovered(BluetoothGatt gatt, int status) {
RxBleLog.d("onServicesDiscovered status=%d", status);
nativeCallbackDispatcher.notifyNativeServicesDiscoveredCallback(gatt, status);
super.onServicesDiscovered(gatt, status);

if (!propagateErrorIfOccurred(servicesDiscoveredOutput, gatt, status, BleGattOperationType.SERVICE_DISCOVERY)) {
if (servicesDiscoveredOutput.hasObservers()
&& !propagateErrorIfOccurred(servicesDiscoveredOutput, gatt, status, BleGattOperationType.SERVICE_DISCOVERY)) {
servicesDiscoveredOutput.valueRelay.call(new RxBleDeviceServices(gatt.getServices()));
}
}

@Override
public void onCharacteristicRead(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic, int status) {
RxBleLog.d("onCharacteristicRead characteristic=%s status=%d", characteristic.getUuid(), status);
nativeCallbackDispatcher.notifyNativeReadCallback(gatt, characteristic, status);
super.onCharacteristicRead(gatt, characteristic, status);

if (!propagateErrorIfOccurred(
if (readCharacteristicOutput.hasObservers() && !propagateErrorIfOccurred(
readCharacteristicOutput, gatt, characteristic, status, BleGattOperationType.CHARACTERISTIC_READ
)) {
readCharacteristicOutput.valueRelay.call(new ByteAssociation<>(characteristic.getUuid(), characteristic.getValue()));
Expand All @@ -117,9 +123,10 @@ public void onCharacteristicRead(BluetoothGatt gatt, BluetoothGattCharacteristic
@Override
public void onCharacteristicWrite(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic, int status) {
RxBleLog.d("onCharacteristicWrite characteristic=%s status=%d", characteristic.getUuid(), status);
nativeCallbackDispatcher.notifyNativeWriteCallback(gatt, characteristic, status);
super.onCharacteristicWrite(gatt, characteristic, status);

if (!propagateErrorIfOccurred(
if (writeCharacteristicOutput.hasObservers() && !propagateErrorIfOccurred(
writeCharacteristicOutput, gatt, characteristic, status, BleGattOperationType.CHARACTERISTIC_WRITE
)) {
writeCharacteristicOutput.valueRelay.call(new ByteAssociation<>(characteristic.getUuid(), characteristic.getValue()));
Expand All @@ -129,64 +136,76 @@ public void onCharacteristicWrite(BluetoothGatt gatt, BluetoothGattCharacteristi
@Override
public void onCharacteristicChanged(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic) {
RxBleLog.d("onCharacteristicChanged characteristic=%s", characteristic.getUuid());
nativeCallbackDispatcher.notifyNativeChangedCallback(gatt, characteristic);
super.onCharacteristicChanged(gatt, characteristic);

/*
* It is important to call changedCharacteristicSerializedPublishRelay as soon as possible because a quick changing
* characteristic could lead to out-of-order execution since onCharacteristicChanged may be called on arbitrary
* threads.
*/
changedCharacteristicSerializedPublishRelay.call(
new CharacteristicChangedEvent(
characteristic.getUuid(),
characteristic.getInstanceId(),
characteristic.getValue()
)
);
if (changedCharacteristicSerializedPublishRelay.hasObservers()) {
changedCharacteristicSerializedPublishRelay.call(
new CharacteristicChangedEvent(
characteristic.getUuid(),
characteristic.getInstanceId(),
characteristic.getValue()
)
);
}
}

@Override
public void onDescriptorRead(BluetoothGatt gatt, BluetoothGattDescriptor descriptor, int status) {
RxBleLog.d("onCharacteristicRead descriptor=%s status=%d", descriptor.getUuid(), status);
nativeCallbackDispatcher.notifyNativeDescriptorReadCallback(gatt, descriptor, status);
super.onDescriptorRead(gatt, descriptor, status);

if (!propagateErrorIfOccurred(readDescriptorOutput, gatt, descriptor, status, BleGattOperationType.DESCRIPTOR_READ)) {
if (readDescriptorOutput.hasObservers()
&& !propagateErrorIfOccurred(readDescriptorOutput, gatt, descriptor, status, BleGattOperationType.DESCRIPTOR_READ)) {
readDescriptorOutput.valueRelay.call(new ByteAssociation<>(descriptor, descriptor.getValue()));
}
}

@Override
public void onDescriptorWrite(BluetoothGatt gatt, BluetoothGattDescriptor descriptor, int status) {
RxBleLog.d("onDescriptorWrite descriptor=%s status=%d", descriptor.getUuid(), status);
nativeCallbackDispatcher.notifyNativeDescriptorWriteCallback(gatt, descriptor, status);
super.onDescriptorWrite(gatt, descriptor, status);

if (!propagateErrorIfOccurred(writeDescriptorOutput, gatt, descriptor, status, BleGattOperationType.DESCRIPTOR_WRITE)) {
if (writeDescriptorOutput.hasObservers()
&& !propagateErrorIfOccurred(writeDescriptorOutput, gatt, descriptor, status, BleGattOperationType.DESCRIPTOR_WRITE)) {
writeDescriptorOutput.valueRelay.call(new ByteAssociation<>(descriptor, descriptor.getValue()));
}
}

@Override
public void onReliableWriteCompleted(BluetoothGatt gatt, int status) {
RxBleLog.d("onReliableWriteCompleted status=%d", status);
nativeCallbackDispatcher.notifyNativeReliableWriteCallback(gatt, status);
super.onReliableWriteCompleted(gatt, status);
}

@Override
public void onReadRemoteRssi(BluetoothGatt gatt, int rssi, int status) {
RxBleLog.d("onReadRemoteRssi rssi=%d status=%d", rssi, status);
nativeCallbackDispatcher.notifyNativeReadRssiCallback(gatt, rssi, status);
super.onReadRemoteRssi(gatt, rssi, status);

if (!propagateErrorIfOccurred(readRssiOutput, gatt, status, BleGattOperationType.READ_RSSI)) {
if (readRssiOutput.hasObservers()
&& !propagateErrorIfOccurred(readRssiOutput, gatt, status, BleGattOperationType.READ_RSSI)) {
readRssiOutput.valueRelay.call(rssi);
}
}

@Override
public void onMtuChanged(BluetoothGatt gatt, int mtu, int status) {
RxBleLog.d("onMtuChanged mtu=%d status=%d", mtu, status);
nativeCallbackDispatcher.notifyNativeMtuChangedCallback(gatt, mtu, status);
super.onMtuChanged(gatt, mtu, status);

if (!propagateErrorIfOccurred(changedMtuOutput, gatt, status, BleGattOperationType.ON_MTU_CHANGED)) {
if (changedMtuOutput.hasObservers()
&& !propagateErrorIfOccurred(changedMtuOutput, gatt, status, BleGattOperationType.ON_MTU_CHANGED)) {
changedMtuOutput.valueRelay.call(mtu);
}
}
Expand Down Expand Up @@ -327,13 +346,31 @@ public Observable<Integer> getOnRssiRead() {
return withDisconnectionHandling(readRssiOutput).observeOn(callbackScheduler);
}

/**
* A native callback allows to omit RxJava's abstraction on the {@link BluetoothGattCallback}.
* It's intended to be used only with a {@link com.polidea.rxandroidble.RxBleRadioOperationCustom} in a performance
* critical implementations. If you don't know if your operation is performance critical it's likely that you shouldn't use this API
* and stick with the RxJava.
*
* The callback reference will be automatically released after the operation is terminated. The main drawback of this API is that
* we can't assure you the thread on which it will be executed. Please keep this in mind as the system may execute it on a main thread.
*/
public void setNativeCallback(BluetoothGattCallback callback) {
nativeCallbackDispatcher.setNativeCallback(callback);
}

private static class Output<T> {

final PublishRelay<T> valueRelay;
final PublishRelay<BleGattException> errorRelay;

Output() {
this.valueRelay = PublishRelay.create();
this.errorRelay = PublishRelay.create();
}

boolean hasObservers() {
return valueRelay.hasObservers() || valueRelay.hasObservers();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.polidea.rxandroidble.internal.connection

import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCallback
import android.bluetooth.BluetoothGattCharacteristic
import android.bluetooth.BluetoothGattDescriptor
import android.bluetooth.BluetoothGattService
Expand All @@ -21,6 +22,7 @@ import rx.subjects.PublishSubject
import spock.lang.Specification
import spock.lang.Unroll

import static rx.Observable.error
import static rx.Observable.from
import static rx.Observable.just

Expand Down Expand Up @@ -312,6 +314,48 @@ class RxBleConnectionTest extends Specification {
testSubscriber.assertError(RuntimeException.class)
}

def "should clear native gatt callback after custom operation is finished"() {
given:
def nativeCallback = Mock BluetoothGattCallback
def radioOperationCustom = new RxBleRadioOperationCustom<Boolean>() {

@Override
Observable<Boolean> asObservable(BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
Scheduler scheduler) throws Throwable {
rxBleGattCallback.setNativeCallback(nativeCallback)
return just(true)
}
}

when:
objectUnderTest.queue(radioOperationCustom).subscribe(testSubscriber)

then:
1 * gattCallback.setNativeCallback(null)
}

def "should clear native gatt callback after custom operation failed"() {
given:
def nativeCallback = Mock BluetoothGattCallback
def radioOperationCustom = new RxBleRadioOperationCustom<Boolean>() {

@Override
Observable<Boolean> asObservable(BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
Scheduler scheduler) throws Throwable {
rxBleGattCallback.setNativeCallback(nativeCallback)
return error(new IllegalArgumentException("Oh no, da error!"))
}
}

when:
objectUnderTest.queue(radioOperationCustom).subscribe(testSubscriber)

then:
1 * gattCallback.setNativeCallback(null)
}

def "should pass error and release the radio if observable returned from RxBleRadioOperationCustom.asObservable() will emit error"() {
given:
def radioOperationCustom = customRadioOperationWithOutcome { Observable.error(new RuntimeException()) }
Expand Down
Loading

0 comments on commit bad5797

Please sign in to comment.