Skip to content

Restart discovery after re-initializing client. #1167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fixup.
Signed-off-by: Akos Kitta <a.kitta@arduino.cc>
  • Loading branch information
Akos Kitta committed Jul 13, 2022
commit e9cd290ec7367896d2146ff8ccc3f1017fd805b3
124 changes: 79 additions & 45 deletions arduino-ide-extension/src/node/board-discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import {
AvailablePorts,
AttachedBoardsChangeEvent,
} from '../common/protocol';
import { Emitter } from '@theia/core/lib/common/event';
import { Emitter, Event } from '@theia/core/lib/common/event';
import { DisposableCollection } from '@theia/core/lib/common/disposable';
import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol';
import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb';
import { v4 } from 'uuid';
import { ServiceError } from './service-error';
import { BackendApplicationContribution } from '@theia/core/lib/node';
import { Deferred } from '@theia/core/lib/common/promise-util';

type Duplex = ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>;
interface StreamWrapper extends Disposable {
Expand All @@ -30,7 +31,8 @@ interface StreamWrapper extends Disposable {

/**
* Singleton service for tracking the available ports and board and broadcasting the
* changes to all connected frontend instances. \
* changes to all connected frontend instances.
*
* Unlike other services, this is not connection scoped.
*/
@injectable()
Expand All @@ -45,16 +47,16 @@ export class BoardDiscovery
@inject(NotificationServiceServer)
private readonly notificationService: NotificationServiceServer;

// Used to know if the board watch process is already running to avoid
// starting it multiple times
private watching: boolean;
private watching: Deferred<void> | undefined;
private stopping: Deferred<void> | undefined;
private wrapper: StreamWrapper | undefined;
private readonly onStreamDidEndEmitter = new Emitter<void>(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization.
private readonly onStreamDidCancelEmitter = new Emitter<void>(); // when the watcher is canceled by the IDE2
private readonly toDisposeOnStopWatch = new DisposableCollection();

/**
* Keys are the `address` of the ports.
*
* The `protocol` is ignored because the board detach event does not carry the protocol information,
* just the address.
* ```json
Expand All @@ -64,46 +66,57 @@ export class BoardDiscovery
* }
* ```
*/
private _state: AvailablePorts = {};
get state(): AvailablePorts {
return this._state;
private _availablePorts: AvailablePorts = {};
get availablePorts(): AvailablePorts {
return this._availablePorts;
}

onStart(): void {
this.start();
this.onClientDidRefresh(() => this.start());
this.onClientDidRefresh(() => this.restart());
}

private async restart(): Promise<void> {
this.logger.info('restarting before stop');
await this.stop();
this.logger.info('restarting after stop');
return this.start();
}

onStop(): void {
this.stop();
}

stop(): Promise<void> {
async stop(restart = false): Promise<void> {
this.logger.info('stop');
if (this.stopping) {
this.logger.info('stop already stopping');
return this.stopping.promise;
}
if (!this.watching) {
return;
}
this.stopping = new Deferred();
this.logger.info('>>> Stopping boards watcher...');
return new Promise<void>((resolve, reject) => {
const timeout = this.createTimeout(
BoardDiscovery.StopWatchTimeout,
reject
);
const timeout = this.createTimeout(10_000, reject);
const toDispose = new DisposableCollection();
toDispose.pushAll([
timeout,
this.onStreamDidEndEmitter.event(() => {
this.logger.info(
`<<< Received the end event from the stream. Boards watcher has been successfully stopped.`
);
this.watching = false;
const waitForEvent = (event: Event<unknown>) =>
event(() => {
this.logger.info('stop received event: either end or cancel');
toDispose.dispose();
this.stopping?.resolve();
this.stopping = undefined;
this.logger.info('stop stopped');
resolve();
}),
this.onStreamDidCancelEmitter.event(() => {
this.logger.info(
`<<< Received the cancel event from the stream. Boards watcher has been successfully stopped.`
);
this.watching = false;
toDispose.dispose();
resolve();
}),
if (restart) {
this.start();
}
});
toDispose.pushAll([
timeout,
waitForEvent(this.onStreamDidEndEmitter.event),
waitForEvent(this.onStreamDidCancelEmitter.event),
]);
this.logger.info('Canceling boards watcher...');
this.toDisposeOnStopWatch.dispose();
Expand Down Expand Up @@ -149,9 +162,14 @@ export class BoardDiscovery
}
const stream = client
.boardListWatch()
.on('end', () => this.onStreamDidEndEmitter.fire())
.on('end', () => {
this.logger.info('received end');
this.onStreamDidEndEmitter.fire();
})
.on('error', (error) => {
this.logger.info('error received');
if (ServiceError.isCancel(error)) {
this.logger.info('cancel error received!');
this.onStreamDidCancelEmitter.fire();
} else {
this.logger.error(
Expand All @@ -165,13 +183,21 @@ export class BoardDiscovery
stream,
uuid: v4(),
dispose: () => {
this.logger.info('disposing requesting cancel');
// Cancelling the stream will kill the discovery `builtin:mdns-discovery process`.
// The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI.
stream.cancel();
this.logger.info('disposing canceled');
this.wrapper = undefined;
},
};
this.toDisposeOnStopWatch.pushAll([wrapper]);
this.toDisposeOnStopWatch.pushAll([
wrapper,
Disposable.create(() => {
this.watching?.reject(new Error(`Stopping watcher.`));
this.watching = undefined;
}),
]);
return wrapper;
}

Expand All @@ -188,17 +214,25 @@ export class BoardDiscovery
}

async start(): Promise<void> {
this.logger.info('start');
if (this.stopping) {
this.logger.info('start is stopping wait');
await this.stopping.promise;
this.logger.info('start stopped');
}
if (this.watching) {
// We want to avoid starting the board list watch process multiple
// times to meet unforeseen consequences
return;
this.logger.info('start already watching');
return this.watching.promise;
}
this.watching = new Deferred();
this.logger.info('start new deferred');
const { client, instance } = await this.coreClient;
const wrapper = await this.createWrapper(client);
wrapper.stream.on('data', async (resp: BoardListWatchResponse) => {
this.logger.info('onData', this.toJson(resp));
if (resp.getEventType() === 'quit') {
await this.stop();
this.logger.info('quit received');
this.stop();
return;
}

Expand All @@ -217,8 +251,8 @@ export class BoardDiscovery
throw new Error(`Unexpected event type: '${resp.getEventType()}'`);
}

const oldState = deepClone(this._state);
const newState = deepClone(this._state);
const oldState = deepClone(this._availablePorts);
const newState = deepClone(this._availablePorts);

const address = (detectedPort as any).getPort().getAddress();
const protocol = (detectedPort as any).getPort().getProtocol();
Expand Down Expand Up @@ -286,18 +320,21 @@ export class BoardDiscovery
},
};

this._state = newState;
this._availablePorts = newState;
this.notificationService.notifyAttachedBoardsDidChange(event);
}
});
this.logger.info('start request start watch');
await this.requestStartWatch(
new BoardListWatchRequest().setInstance(instance),
wrapper.stream
);
this.watching = true;
this.logger.info('start requested start watch');
this.watching.resolve();
this.logger.info('start resolved watching');
}

getAttachedBoards(state: AvailablePorts = this.state): Board[] {
getAttachedBoards(state: AvailablePorts = this.availablePorts): Board[] {
const attachedBoards: Board[] = [];
for (const portID of Object.keys(state)) {
const [, boards] = state[portID];
Expand All @@ -306,7 +343,7 @@ export class BoardDiscovery
return attachedBoards;
}

getAvailablePorts(state: AvailablePorts = this.state): Port[] {
getAvailablePorts(state: AvailablePorts = this.availablePorts): Port[] {
const availablePorts: Port[] = [];
for (const portID of Object.keys(state)) {
const [port] = state[portID];
Expand All @@ -315,6 +352,3 @@ export class BoardDiscovery
return availablePorts;
}
}
export namespace BoardDiscovery {
export const StopWatchTimeout = 10_000;
}
2 changes: 1 addition & 1 deletion arduino-ide-extension/src/node/boards-service-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class BoardsServiceImpl
protected readonly boardDiscovery: BoardDiscovery;

async getState(): Promise<AvailablePorts> {
return this.boardDiscovery.state;
return this.boardDiscovery.availablePorts;
}

async getAttachedBoards(): Promise<Board[]> {
Expand Down