Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread calling the Native API changed from main to background. #255

Merged
merged 2 commits into from
Aug 3, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rxandroidble/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ android {

dependencies {
compile rootProject.ext.libs.rxjava
compile rootProject.ext.libs.rxandroid
compile rootProject.ext.libs.rxrelay
compile rootProject.ext.libs.support_annotations
compile rootProject.ext.libs.dagger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import android.location.LocationManager;
import android.os.Build;
import android.support.annotation.Nullable;
import android.support.annotation.RestrictTo;
import com.polidea.rxandroidble.helpers.LocationServicesOkObservable;
import com.polidea.rxandroidble.internal.DeviceComponent;
import com.polidea.rxandroidble.internal.scan.InternalToExternalScanResultConverter;
Expand All @@ -33,7 +34,6 @@
import javax.inject.Provider;
import rx.Observable;
import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

Expand All @@ -43,11 +43,10 @@ public interface ClientComponent {

class NamedSchedulers {

public static final String MAIN_THREAD = "main-thread";
public static final String COMPUTATION = "computation";
public static final String RADIO_OPERATIONS = "callback-emitter";
public static final String TIMEOUT = "timeout";
public static final String GATT_CALLBACK = "callback";
public static final String BLUETOOTH_INTERACTION = "bluetooth_interaction";
public static final String BLUETOOTH_CALLBACKS = "bluetooth_callbacks";
private NamedSchedulers() {

}
Expand Down Expand Up @@ -119,28 +118,50 @@ static int provideDeviceSdk() {
}

@Provides
@Named(NamedSchedulers.GATT_CALLBACK)
@Named(NamedSchedulers.BLUETOOTH_INTERACTION)
@ClientScope
static ExecutorService provideGattCallbackExecutorService() {
static ExecutorService provideBluetoothInteractionExecutorService() {
return Executors.newSingleThreadExecutor();
}

@Provides
@Named(NamedSchedulers.GATT_CALLBACK)
@Named(NamedSchedulers.BLUETOOTH_CALLBACKS)
@ClientScope
static Scheduler provideGattCallbackScheduler(@Named(NamedSchedulers.GATT_CALLBACK) ExecutorService executorService) {
return Schedulers.from(executorService);
static ExecutorService provideBluetoothCallbacksExecutorService() {
return Executors.newSingleThreadExecutor();
}

@Provides
LocationManager provideLocationManager() {
return (LocationManager) context.getSystemService(Context.LOCATION_SERVICE);
@Named(NamedSchedulers.BLUETOOTH_INTERACTION)
@ClientScope
static Scheduler provideBluetoothInteractionScheduler(@Named(NamedSchedulers.BLUETOOTH_INTERACTION) ExecutorService service) {
return Schedulers.from(service);
}

@Provides
@Named(NamedSchedulers.BLUETOOTH_CALLBACKS)
@ClientScope
static Scheduler provideBluetoothCallbacksScheduler(@Named(NamedSchedulers.BLUETOOTH_CALLBACKS) ExecutorService service) {
return Schedulers.from(service);
}

@Provides
@Named(NamedSchedulers.MAIN_THREAD)
static Scheduler provideMainThreadScheduler() {
return AndroidSchedulers.mainThread();
static ClientComponentFinalizer provideFinalizationCloseable(
@Named(NamedSchedulers.BLUETOOTH_INTERACTION) final ExecutorService interactionExecutorService,
@Named(NamedSchedulers.BLUETOOTH_CALLBACKS) final ExecutorService callbacksExecutorService
) {
return new ClientComponentFinalizer() {
@Override
public void onFinalize() {
interactionExecutorService.shutdown();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this is incorrect. Dagger will provide you a new instance of the executor each time you ask for it. You seems to be closing an instance that wasn't even started and you're leaving started service as is.

Provided executors should be scoped.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I see the executors are scoped. And .shutdown() is executed in RxBleClient.finalize(). Or am I missing something?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct, I missed the annotation @ClientScope

callbacksExecutorService.shutdown();
}
};
}

@Provides
LocationManager provideLocationManager() {
return (LocationManager) context.getSystemService(Context.LOCATION_SERVICE);
}

@Provides
Expand Down Expand Up @@ -227,10 +248,6 @@ abstract class ClientModuleBinder {
@ClientScope
abstract RxBleRadio bindRxBleRadio(RxBleRadioImpl rxBleRadio);

@Binds
@Named(NamedSchedulers.RADIO_OPERATIONS)
abstract Scheduler bindCallbackScheduler(@Named(NamedSchedulers.MAIN_THREAD) Scheduler mainThreadScheduler);

@Binds
@Named(NamedSchedulers.TIMEOUT)
abstract Scheduler bindTimeoutScheduler(@Named(NamedSchedulers.COMPUTATION) Scheduler computationScheduler);
Expand All @@ -242,4 +259,10 @@ abstract class ClientModuleBinder {
LocationServicesOkObservable locationServicesOkObservable();

RxBleClient rxBleClient();

@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
interface ClientComponentFinalizer {

void onFinalize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import javax.inject.Inject;
import javax.inject.Named;
Expand All @@ -50,8 +49,8 @@ class RxBleClientImpl extends RxBleClient {
private final ScanSetupBuilder scanSetupBuilder;
private final ScanPreconditionsVerifier scanPreconditionVerifier;
private final Func1<RxBleInternalScanResult, ScanResult> internalToExternalScanResultMapFunction;
private final ExecutorService executorService;
private final Scheduler mainThreadScheduler;
private final ClientComponent.ClientComponentFinalizer clientComponentFinalizer;
private final Scheduler bluetoothInteractionScheduler;
private final Map<Set<UUID>, Observable<RxBleScanResult>> queuedScanOperations = new HashMap<>();
private final RxBleAdapterWrapper rxBleAdapterWrapper;
private final Observable<BleAdapterState> rxBleAdapterStateObservable;
Expand All @@ -69,8 +68,8 @@ class RxBleClientImpl extends RxBleClient {
ScanSetupBuilder scanSetupBuilder,
ScanPreconditionsVerifier scanPreconditionVerifier,
Func1<RxBleInternalScanResult, ScanResult> internalToExternalScanResultMapFunction,
@Named(ClientComponent.NamedSchedulers.GATT_CALLBACK) ExecutorService executorService,
@Named(ClientComponent.NamedSchedulers.MAIN_THREAD) Scheduler mainThreadScheduler) {
@Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler bluetoothInteractionScheduler,
ClientComponent.ClientComponentFinalizer clientComponentFinalizer) {
this.uuidUtil = uuidUtil;
this.rxBleRadio = rxBleRadio;
this.rxBleAdapterWrapper = rxBleAdapterWrapper;
Expand All @@ -81,14 +80,14 @@ class RxBleClientImpl extends RxBleClient {
this.scanSetupBuilder = scanSetupBuilder;
this.scanPreconditionVerifier = scanPreconditionVerifier;
this.internalToExternalScanResultMapFunction = internalToExternalScanResultMapFunction;
this.executorService = executorService;
this.mainThreadScheduler = mainThreadScheduler;
this.bluetoothInteractionScheduler = bluetoothInteractionScheduler;
this.clientComponentFinalizer = clientComponentFinalizer;
}

@Override
protected void finalize() throws Throwable {
clientComponentFinalizer.onFinalize();
super.finalize();
executorService.shutdown();
}

@Override
Expand Down Expand Up @@ -118,7 +117,7 @@ public Observable<ScanResult> call() {
final ScanSetup scanSetup = scanSetupBuilder.build(scanSettings, scanFilters);
final Operation<RxBleInternalScanResult> scanOperation = scanSetup.scanOperation;
return rxBleRadio.queue(scanOperation)
.unsubscribeOn(mainThreadScheduler)
.unsubscribeOn(bluetoothInteractionScheduler)
.compose(scanSetup.scanOperationBehaviourEmulatorTransformer)
.map(internalToExternalScanResultMapFunction)
.mergeWith(RxBleClientImpl.this.<ScanResult>bluetoothAdapterOffExceptionObservable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;

/**
* Represents a custom operation that will be enqueued for future execution within the client instance.
Expand Down Expand Up @@ -41,7 +40,6 @@ public interface RxBleRadioOperationCustom<T> {
* @param bluetoothGatt The Android API GATT instance
* @param rxBleGattCallback The internal Rx ready bluetooth gatt callback to be notified of GATT operations
* @param scheduler The RxBleRadio scheduler used to asObservable operation
* (currently {@link AndroidSchedulers#mainThread()}
* @throws Throwable Any exception that your custom operation might throw
*/
@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public RxBleConnectionImpl(
DescriptorWriter descriptorWriter,
OperationsProvider operationProvider,
Provider<LongWriteOperationBuilder> longWriteOperationBuilderProvider,
@Named(ClientComponent.NamedSchedulers.RADIO_OPERATIONS) Scheduler callbackScheduler
@Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler callbackScheduler
) {
this.rxBleRadio = rxBleRadio;
this.gattCallback = gattCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Object call(Pair<BluetoothGatt, RxBleConnectionState> bluetoothGattRxBleC
.autoConnect(0);

@Inject
public RxBleGattCallback(@Named(ClientComponent.NamedSchedulers.GATT_CALLBACK) Scheduler callbackScheduler,
public RxBleGattCallback(@Named(ClientComponent.NamedSchedulers.BLUETOOTH_CALLBACKS) Scheduler callbackScheduler,
BluetoothGattProvider bluetoothGattProvider,
NativeCallbackDispatcher nativeCallbackDispatcher) {
this.callbackScheduler = callbackScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class OperationsProviderImpl implements OperationsProvider {
private final RxBleGattCallback rxBleGattCallback;
private final BluetoothGatt bluetoothGatt;
private final TimeoutConfiguration timeoutConfiguration;
private final Scheduler mainThreadScheduler;
private final Scheduler bluetoothInteractionScheduler;
private final Scheduler timeoutScheduler;
private final Provider<RxBleRadioOperationReadRssi> rssiReadOperationProvider;

Expand All @@ -34,13 +34,13 @@ public class OperationsProviderImpl implements OperationsProvider {
RxBleGattCallback rxBleGattCallback,
BluetoothGatt bluetoothGatt,
@Named(DeviceModule.OPERATION_TIMEOUT) TimeoutConfiguration timeoutConfiguration,
@Named(ClientComponent.NamedSchedulers.MAIN_THREAD) Scheduler mainThreadScheduler,
@Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler bluetoothInteractionScheduler,
@Named(ClientComponent.NamedSchedulers.TIMEOUT) Scheduler timeoutScheduler,
Provider<RxBleRadioOperationReadRssi> rssiReadOperationProvider) {
this.rxBleGattCallback = rxBleGattCallback;
this.bluetoothGatt = bluetoothGatt;
this.timeoutConfiguration = timeoutConfiguration;
this.mainThreadScheduler = mainThreadScheduler;
this.bluetoothInteractionScheduler = bluetoothInteractionScheduler;
this.timeoutScheduler = timeoutScheduler;
this.rssiReadOperationProvider = rssiReadOperationProvider;
}
Expand All @@ -54,7 +54,7 @@ public RxBleRadioOperationCharacteristicLongWrite provideLongWriteOperation(

return new RxBleRadioOperationCharacteristicLongWrite(bluetoothGatt,
rxBleGattCallback,
mainThreadScheduler,
bluetoothInteractionScheduler,
timeoutConfiguration,
bluetoothGattCharacteristic,
maxBatchSizeProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class RxBleRadioOperationCharacteristicLongWrite extends RxBleRadioOperat

private final BluetoothGatt bluetoothGatt;
private final RxBleGattCallback rxBleGattCallback;
private final Scheduler mainThreadScheduler;
private final Scheduler bluetoothInteractionScheduler;
private final TimeoutConfiguration timeoutConfiguration;
private final BluetoothGattCharacteristic bluetoothGattCharacteristic;
private final PayloadSizeLimitProvider batchSizeProvider;
Expand All @@ -49,15 +49,15 @@ public class RxBleRadioOperationCharacteristicLongWrite extends RxBleRadioOperat
RxBleRadioOperationCharacteristicLongWrite(
BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
@Named(ClientComponent.NamedSchedulers.MAIN_THREAD) Scheduler mainThreadScheduler,
@Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler bluetoothInteractionScheduler,
@Named(DeviceModule.OPERATION_TIMEOUT) TimeoutConfiguration timeoutConfiguration,
BluetoothGattCharacteristic bluetoothGattCharacteristic,
PayloadSizeLimitProvider batchSizeProvider,
WriteOperationAckStrategy writeOperationAckStrategy,
byte[] bytesToWrite) {
this.bluetoothGatt = bluetoothGatt;
this.rxBleGattCallback = rxBleGattCallback;
this.mainThreadScheduler = mainThreadScheduler;
this.bluetoothInteractionScheduler = bluetoothInteractionScheduler;
this.timeoutConfiguration = timeoutConfiguration;
this.bluetoothGattCharacteristic = bluetoothGattCharacteristic;
this.batchSizeProvider = batchSizeProvider;
Expand All @@ -79,7 +79,7 @@ protected void protectedRun(final Emitter<byte[]> emitter, final RadioReleaseInt

final RadioReleasingEmitterWrapper<byte[]> emitterWrapper = new RadioReleasingEmitterWrapper<>(emitter, radioReleaseInterface);
writeBatchAndObserve(batchSize, byteBuffer)
.subscribeOn(mainThreadScheduler)
.subscribeOn(bluetoothInteractionScheduler)
.takeFirst(writeResponseForMatchingCharacteristic(bluetoothGattCharacteristic))
.timeout(
timeoutConfiguration.timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import rx.Emitter;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

import static com.polidea.rxandroidble.RxBleConnection.RxBleConnectionState.DISCONNECTED;
Expand All @@ -38,7 +38,7 @@ public class RxBleRadioOperationDisconnect extends RxBleRadioOperation<Void> {
private final BluetoothGattProvider bluetoothGattProvider;
private final String macAddress;
private final BluetoothManager bluetoothManager;
private final Scheduler mainThreadScheduler;
private final Scheduler bluetoothInteractionScheduler;
private final TimeoutConfiguration timeoutConfiguration;
private final ConnectionStateChangeListener connectionStateChangeListener;

Expand All @@ -48,14 +48,14 @@ public class RxBleRadioOperationDisconnect extends RxBleRadioOperation<Void> {
BluetoothGattProvider bluetoothGattProvider,
@Named(DeviceModule.MAC_ADDRESS) String macAddress,
BluetoothManager bluetoothManager,
@Named(ClientComponent.NamedSchedulers.MAIN_THREAD) Scheduler mainThreadScheduler,
@Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler bluetoothInteractionScheduler,
@Named(DeviceModule.DISCONNECT_TIMEOUT) TimeoutConfiguration timeoutConfiguration,
ConnectionStateChangeListener connectionStateChangeListener) {
this.rxBleGattCallback = rxBleGattCallback;
this.bluetoothGattProvider = bluetoothGattProvider;
this.macAddress = macAddress;
this.bluetoothManager = bluetoothManager;
this.mainThreadScheduler = mainThreadScheduler;
this.bluetoothInteractionScheduler = bluetoothInteractionScheduler;
this.timeoutConfiguration = timeoutConfiguration;
this.connectionStateChangeListener = connectionStateChangeListener;
}
Expand All @@ -71,30 +71,26 @@ protected void protectedRun(final Emitter<Void> emitter, final RadioReleaseInter
emitter.onCompleted();
} else {
(isDisconnected(bluetoothGatt) ? just(bluetoothGatt) : disconnect(bluetoothGatt))
.observeOn(mainThreadScheduler)
.subscribe(
new Action1<BluetoothGatt>() {
@Override
public void call(BluetoothGatt bluetoothGatt) {
bluetoothGatt.close();
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
radioReleaseInterface.release();
emitter.onError(throwable);
}
},
new Action0() {
@Override
public void call() {
connectionStateChangeListener.onConnectionStateChange(DISCONNECTED);
radioReleaseInterface.release();
emitter.onCompleted();
}
}
);
.observeOn(bluetoothInteractionScheduler)
.subscribe(new Observer<BluetoothGatt>() {
@Override
public void onNext(BluetoothGatt bluetoothGatt) {
bluetoothGatt.close();
}

@Override
public void onError(Throwable throwable) {
radioReleaseInterface.release();
emitter.onError(throwable);
}

@Override
public void onCompleted() {
connectionStateChangeListener.onConnectionStateChange(DISCONNECTED);
radioReleaseInterface.release();
emitter.onCompleted();
}
});
}
}

Expand All @@ -109,7 +105,7 @@ private boolean isDisconnected(BluetoothGatt bluetoothGatt) {
* 2. The same BluetoothGatt - in this situation we should probably cancel the pending BluetoothGatt.close() call
*/
private Observable<BluetoothGatt> disconnect(BluetoothGatt bluetoothGatt) {
return new DisconnectGattObservable(bluetoothGatt, rxBleGattCallback, mainThreadScheduler)
return new DisconnectGattObservable(bluetoothGatt, rxBleGattCallback, bluetoothInteractionScheduler)
.timeout(timeoutConfiguration.timeout, timeoutConfiguration.timeoutTimeUnit, just(bluetoothGatt),
timeoutConfiguration.timeoutScheduler);
}
Expand All @@ -126,13 +122,12 @@ private static class DisconnectGattObservable extends Observable<BluetoothGatt>
public void call(Subscriber<? super BluetoothGatt> subscriber) {
rxBleGattCallback
.getOnConnectionStateChange()
.filter(new Func1<RxBleConnection.RxBleConnectionState, Boolean>() {
.takeFirst(new Func1<RxBleConnection.RxBleConnectionState, Boolean>() {
@Override
public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) {
return rxBleConnectionState == DISCONNECTED;
}
})
.take(1)
.map(new Func1<RxBleConnection.RxBleConnectionState, BluetoothGatt>() {
@Override
public BluetoothGatt call(RxBleConnection.RxBleConnectionState rxBleConnectionState) {
Expand Down
Loading