Skip to content

Commit

Permalink
feat: update message handling (#614)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh authored Feb 1, 2025
1 parent 4e29af6 commit bee1440
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 21 deletions.
50 changes: 50 additions & 0 deletions src/lib/probe-validator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import config from 'config';
import TTLCache from '@isaacs/ttlcache';
import { type RedisCluster, getMeasurementRedisClient } from './redis/measurement-client.js';

export class ProbeValidator {
private readonly measurementIdToTests = new TTLCache<string, Map<string, string>>({
ttl: (config.get<number>('measurement.timeout') + 30) * 1000,
});

constructor (private readonly redis: RedisCluster) {}

addValidIds (measurementId: string, testId: string, probeUuid: string): void {
const measurement = this.measurementIdToTests.get(measurementId);

if (!measurement) {
this.measurementIdToTests.set(measurementId, new Map([ [ testId, probeUuid ] ]));
} else {
measurement.set(testId, probeUuid);
}
}

async validateProbe (measurementId: string, testId: string, probeUuid: string): Promise<void> {
const measurement = this.measurementIdToTests.get(measurementId);
let probeId = measurement && measurement.get(testId);

if (!probeId) {
probeId = await this.getProbeIdFromRedis(measurementId, testId);
}

if (!probeId) {
throw new Error(`Probe ID not found for measurement ID: ${measurementId}, test ID: ${testId}`);
} else if (probeId !== probeUuid) {
throw new Error(`Probe ID is wrong for measurement ID: ${measurementId}, test ID: ${testId}. Expected: ${probeId}, actual: ${probeUuid}`);
}
}

async getProbeIdFromRedis (measurementId: string, testId: string) {
return this.redis.hGet('gp:test-to-probe', `${measurementId}_${testId}`);
}
}

let probeValidator: ProbeValidator;

export const getProbeValidator = () => {
if (!probeValidator) {
probeValidator = new ProbeValidator(getMeasurementRedisClient());
}

return probeValidator;
};
6 changes: 4 additions & 2 deletions src/lib/ws/gateway.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { getMetricsAgent } from '../metrics.js';
import { listenMeasurementRequest } from '../../measurement/handler/request.js';
import { handleMeasurementAck } from '../../measurement/handler/ack.js';
import { handleMeasurementResult } from '../../measurement/handler/result.js';
import { handleMeasurementProgress } from '../../measurement/handler/progress.js';
Expand Down Expand Up @@ -41,9 +42,10 @@ io
socket.on('probe:isIPv4Supported:update', handleIsIPv4SupportedUpdate(probe));
socket.on('probe:dns:update', handleDnsUpdate(probe));
socket.on('probe:stats:report', handleStatsReport(probe));
socket.onAnyOutgoing(listenMeasurementRequest(probe));
subscribeWithHandler(socket, 'probe:measurement:ack', handleMeasurementAck(probe));
subscribeWithHandler(socket, 'probe:measurement:progress', handleMeasurementProgress);
subscribeWithHandler(socket, 'probe:measurement:result', handleMeasurementResult);
subscribeWithHandler(socket, 'probe:measurement:progress', handleMeasurementProgress(probe));
subscribeWithHandler(socket, 'probe:measurement:result', handleMeasurementResult(probe));

socket.on('disconnect', (reason) => {
logger.debug(`Probe disconnected. (reason: ${reason}) [${socket.id}][${probe.ipAddress}]`);
Expand Down
3 changes: 1 addition & 2 deletions src/measurement/handler/ack.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Probe } from '../../probe/types.js';
import type { MeasurementAckMessage } from '../types.js';

// eslint-disable-next-line @typescript-eslint/no-unused-vars
export const handleMeasurementAck = (_probe: Probe) => async (_data: MeasurementAckMessage, ack: () => void): Promise<void> => {
export const handleMeasurementAck = (_probe: Probe) => async (_data: null, ack: () => void): Promise<void> => {
ack();
};
7 changes: 5 additions & 2 deletions src/measurement/handler/progress.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { getMeasurementRunner } from '../runner.js';
import type { Probe } from '../../probe/types.js';
import type { MeasurementProgressMessage } from '../types.js';
import { getMeasurementRunner } from '../runner.js';
import { getProbeValidator } from '../../lib/probe-validator.js';

const runner = getMeasurementRunner();

export const handleMeasurementProgress = async (data: MeasurementProgressMessage): Promise<void> => {
export const handleMeasurementProgress = (probe: Probe) => async (data: MeasurementProgressMessage): Promise<void> => {
await getProbeValidator().validateProbe(data.measurementId, data.testId, probe.uuid);
await runner.recordProgress(data);
};
12 changes: 12 additions & 0 deletions src/measurement/handler/request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { Probe } from '../../probe/types.js';
import { getProbeValidator } from '../../lib/probe-validator.js';
import { MeasurementRequestMessage } from '../types.js';

export const listenMeasurementRequest = (probe: Probe) => (event: string, data: unknown) => {
if (event !== 'probe:measurement:request') {
return;
}

const message = data as MeasurementRequestMessage;
getProbeValidator().addValidIds(message.measurementId, message.testId, probe.uuid);
};
5 changes: 4 additions & 1 deletion src/measurement/handler/result.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import type { Probe } from '../../probe/types.js';
import type { MeasurementResultMessage } from '../types.js';
import { getMeasurementRunner } from '../runner.js';
import { getProbeValidator } from '../../lib/probe-validator.js';

const runner = getMeasurementRunner();

export const handleMeasurementResult = async (data: MeasurementResultMessage): Promise<void> => {
export const handleMeasurementResult = (probe: Probe) => async (data: MeasurementResultMessage): Promise<void> => {
await getProbeValidator().validateProbe(data.measurementId, data.testId, probe.uuid);
await runner.recordResult(data);
};
7 changes: 4 additions & 3 deletions src/measurement/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { Probe } from '../probe/types.js';
import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js';
import type { MeasurementStore } from './store.js';
import { getMeasurementStore } from './store.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage, UserRequest } from './types.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage, UserRequest, MeasurementRequestMessage } from './types.js';
import { rateLimit } from '../lib/rate-limiter/rate-limiter-post.js';
import type { ExtendedContext } from '../types.js';

Expand Down Expand Up @@ -64,7 +64,7 @@ export class MeasurementRunner {
const maxInProgressTests = config.get<number>('measurement.maxInProgressTests');
onlineProbesMap.forEach((probe, index) => {
const inProgressUpdates = request.inProgressUpdates && inProgressTests++ < maxInProgressTests;
this.io.of(PROBES_NAMESPACE).to(probe.client).emit('probe:measurement:request', {
const requestMessage: MeasurementRequestMessage = {
measurementId,
testId: index.toString(),
measurement: {
Expand All @@ -73,7 +73,8 @@ export class MeasurementRunner {
target: request.target,
inProgressUpdates,
},
});
};
this.io.of(PROBES_NAMESPACE).to(probe.client).emit('probe:measurement:request', requestMessage);
});
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/measurement/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ export class MeasurementStore {
async createMeasurement (request: MeasurementRequest, onlineProbesMap: Map<number, Probe>, allProbes: (Probe | OfflineProbe)[]): Promise<string> {
const id = cryptoRandomString({ length: 16, type: 'alphanumeric' });
const key = getMeasurementKey(id);

const probesAwaitingTtl = config.get<number>('measurement.timeout') + 5;
const startTime = new Date();
const results = this.probesToResults(allProbes, request.type);

Expand All @@ -73,14 +71,17 @@ export class MeasurementStore {
results,
};
const measurementWithoutDefaults = this.removeDefaults(measurement, request);
const testsToProbes = Object.fromEntries(Array.from(onlineProbesMap, ([ testId, probe ]) => [ `${id}_${testId}`, probe.uuid ]));

await Promise.all([
this.redis.hSet('gp:in-progress', id, startTime.getTime()),
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), onlineProbesMap.size, { EX: probesAwaitingTtl }),
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), onlineProbesMap.size, { EX: config.get<number>('measurement.timeout') + 30 }),
this.redis.json.set(key, '$', measurementWithoutDefaults),
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', allProbes.map(probe => probe.ipAddress)),
this.redis.expire(key, config.get<number>('measurement.resultTTL')),
this.redis.expire(getMeasurementKey(id, 'ips'), config.get<number>('measurement.resultTTL')),
!_.isEmpty(testsToProbes) && this.redis.hSet('gp:test-to-probe', testsToProbes),
!_.isEmpty(testsToProbes) && this.redis.hExpire('gp:test-to-probe', Object.keys(testsToProbes), config.get<number>('measurement.timeout') + 30),
]);

return id;
Expand Down
12 changes: 9 additions & 3 deletions src/measurement/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,16 @@ export type MeasurementRecord = {
/**
* Probe Messages
*/
export type MeasurementAckMessage = {
id: string;

export type MeasurementRequestMessage = {
testId: string;
measurementId: string;
};
measurement: MeasurementOptions & {
type: MeasurementRequest['type'];
target: MeasurementRequest['target'];
inProgressUpdates: MeasurementRequest['inProgressUpdates'];
}
}

export type MeasurementProgressMessage = {
testId: string;
Expand Down
2 changes: 0 additions & 2 deletions test/tests/integration/measurement/create-measurement.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,6 @@ describe('Create measurement', () => {
.send({
type: 'ping',
target: 'example.com',
limit: 2,
locations: [{
continent: 'NA',
}],
Expand Down Expand Up @@ -666,7 +665,6 @@ describe('Create measurement', () => {
await requestAgent.get(`/v1/measurements/${id2}`)
.expect(200)
.expect((response) => {
expect(response.body.limit).to.equal(2);
expect(response.body.locations).to.deep.equal([{ continent: 'NA' }]);
expect(response.body.results[0].result.status).to.equal('offline');
expect(response.body.results[0].result.rawOutput).to.equal('This probe is currently offline. Please try again later.');
Expand Down
29 changes: 26 additions & 3 deletions test/tests/unit/measurement/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { PingResult } from '../../../../src/measurement/types.js';

const getProbe = (id: string, ip: string) => ({
ipAddress: ip,
uuid: `${id}-${id}-${id}-${id}-${id}`,
altIpAddresses: [],
location: {
network: id,
Expand Down Expand Up @@ -37,6 +38,7 @@ describe('measurement store', () => {
hScan: sandbox.stub(),
hDel: sandbox.stub(),
hSet: sandbox.stub(),
hExpire: sandbox.stub(),
set: sandbox.stub(),
expire: sandbox.stub(),
del: sandbox.stub(),
Expand Down Expand Up @@ -149,10 +151,11 @@ describe('measurement store', () => {
[ getProbe('z', '1.1.1.1'), getProbe('10', '2.2.2.2'), getProbe('x', '3.3.3.3'), getProbe('0', '4.4.4.4') ],
);

expect(redisMock.hSet.callCount).to.equal(1);
expect(redisMock.hSet.callCount).to.equal(2);

expect(redisMock.hSet.args[0]).to.deep.equal([ 'gp:in-progress', 'measurementid', now ]);
expect(redisMock.set.callCount).to.equal(1);
expect(redisMock.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:probes_awaiting', 4, { EX: 35 }]);
expect(redisMock.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:probes_awaiting', 4, { EX: 60 }]);
expect(redisMock.json.set.callCount).to.equal(2);

expect(redisMock.json.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:results', '$', {
Expand Down Expand Up @@ -235,6 +238,26 @@ describe('measurement store', () => {
expect(redisMock.json.set.args[1]).to.deep.equal([ 'gp:m:{measurementid}:ips', '$', [ '1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4' ] ]);

expect(redisMock.expire.args[1]).to.deep.equal([ 'gp:m:{measurementid}:ips', 604800 ]);

expect(redisMock.hSet.args[1]).to.deep.equal([ 'gp:test-to-probe', {
measurementid_0: 'z-z-z-z-z',
measurementid_1: '10-10-10-10-10',
measurementid_2: 'x-x-x-x-x',
measurementid_3: '0-0-0-0-0',
}]);

expect(redisMock.hExpire.callCount).to.equal(1);

expect(redisMock.hExpire.args[0]).to.deep.equal([
'gp:test-to-probe',
[
'measurementid_0',
'measurementid_1',
'measurementid_2',
'measurementid_3',
],
60,
]);
});

it('should initialize measurement object with the proper default values', async () => {
Expand Down Expand Up @@ -400,7 +423,7 @@ describe('measurement store', () => {
},
]);

expect(redisMock.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:probes_awaiting', 0, { EX: 35 }]);
expect(redisMock.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:probes_awaiting', 0, { EX: 60 }]);
});

it('should store non-default fields of the measurement request', async () => {
Expand Down
43 changes: 43 additions & 0 deletions test/tests/unit/probe-validator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { ProbeValidator } from '../../../src/lib/probe-validator.js';
import { RedisCluster } from '../../../src/lib/redis/shared.js';

describe('ProbeValidator', () => {
const sandbox = sinon.createSandbox();
const redis = { hGet: sandbox.stub() };
const probeValidator = new ProbeValidator(redis as unknown as RedisCluster);

beforeEach(() => {
redis.hGet.resolves(undefined);
});

it('should pass through valid probe id', async () => {
probeValidator.addValidIds('measurement-id', 'test-id-0', 'probe-uuid-0');
probeValidator.addValidIds('measurement-id', 'test-id-1', 'probe-uuid-1');
await probeValidator.validateProbe('measurement-id', 'test-id-0', 'probe-uuid-0');
await probeValidator.validateProbe('measurement-id', 'test-id-1', 'probe-uuid-1');
});

it('should throw for invalid probe id', async () => {
probeValidator.addValidIds('measurement-id', 'test-id', 'probe-uuid');
const error = await probeValidator.validateProbe('measurement-id', 'test-id', 'invalid-probe-uuid').catch(err => err);
expect(error.message).to.equal('Probe ID is wrong for measurement ID: measurement-id, test ID: test-id. Expected: probe-uuid, actual: invalid-probe-uuid');
});

it('should throw for missing key', async () => {
const error = await probeValidator.validateProbe('missing-measurement-id', 'test-id', 'probe-uuid').catch(err => err);
expect(error.message).to.equal('Probe ID not found for measurement ID: missing-measurement-id, test ID: test-id');
});

it('should search key in redis if not found locally', async () => {
redis.hGet.resolves('probe-uuid');
await probeValidator.validateProbe('only-redis-measurement-id', 'test-id', 'probe-uuid');
});

it('should throw if redis probe id is different', async () => {
redis.hGet.resolves('different-probe-uuid');
const error = await probeValidator.validateProbe('only-redis-measurement-id', 'test-id', 'probe-uuid').catch(err => err);
expect(error.message).to.equal('Probe ID is wrong for measurement ID: only-redis-measurement-id, test ID: test-id. Expected: different-probe-uuid, actual: probe-uuid');
});
});

0 comments on commit bee1440

Please sign in to comment.