Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
"groupId": "KAFKA_CONSUMER_GROUP_ID"
},
"kafkaTopics": {
"input": "KAFKA_INPUT_TOPIC"
"input": {
"__name": "KAFKA_INPUT_TOPIC",
"__format": "json"
}
},
"elastic": {
"node": "ELASTIC_URL",
Expand Down
2 changes: 1 addition & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"groupId": "123"
},
"kafkaTopics": {
"input": "topic"
"input": ["topic"]
},
"elastic": {
"node": "http://elasticsearch:9200",
Expand Down
9 changes: 8 additions & 1 deletion config/test.json
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
{}
{
"kafkaConsumer": {
"groupId": "123-test"
},
"kafkaTopics": {
"input": ["topic1-test", "topic2-test"]
}
}
5 changes: 3 additions & 2 deletions openapi3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ paths:
requestId:
type: string
chosenResultId:
type: number
oneOf:
- type: number
nullable: true
responseTime:
type: string
format: date-time
Expand Down Expand Up @@ -99,7 +101,6 @@ components:
geocodingResponse:
type: object
required:
- userId
- apiKey
- site
- response
Expand Down
14 changes: 7 additions & 7 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export interface IConfig {
}

export interface EnrichResponse {
user: {
user?: {
[key: string]: string | UserDataServiceResponse;
name: string;
};
Expand All @@ -16,13 +16,13 @@ export interface EnrichResponse {
language: string;
};
result: {
rank: number;
score: number;
rank: number | null;
score?: number;
source?: string;
layer?: string;
name: string;
name?: string;
location?: Feature<Point>;
region: string;
region?: string;
};
system: string;
site: string;
Expand All @@ -32,13 +32,13 @@ export interface EnrichResponse {

export interface FeedbackResponse {
requestId: string;
chosenResultId: number;
chosenResultId: number | null;
responseTime: Date; // from FeedbackApi
geocodingResponse: GeocodingResponse;
}

export interface GeocodingResponse {
userId: string;
userId?: string;
apiKey: string;
site: string;
response: QueryResult;
Expand Down
52 changes: 30 additions & 22 deletions src/process/models/processManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,16 @@ export class ProcessManager {
) {}

public async process(feedbackResponse: FeedbackResponse): Promise<EnrichResponse> {
let score = 0;

const selectedResponse = feedbackResponse.geocodingResponse.response.features[feedbackResponse.chosenResultId];
const token = JSON.parse(Buffer.from(feedbackResponse.geocodingResponse.apiKey.split('.')[1], 'base64').toString()) as { sub: string };
const text = this.getQueryText(feedbackResponse);

if (selectedResponse.properties._score) {
score = selectedResponse.properties._score;
}

const { endpoint, queryParams, headers } = this.appConfig.userDataService;
const fetchedUserData = await fetchUserDataService(endpoint, feedbackResponse.geocodingResponse.userId, queryParams, headers);

return {
user: {
name: feedbackResponse.geocodingResponse.userId,
...fetchedUserData,
},
const enrichedResponse: EnrichResponse = {
query: {
language: arabicRegex.test(text) ? 'ar' : 'he',
text,
},
result: {
rank: feedbackResponse.chosenResultId,
score,
source: selectedResponse.properties.matches[0].source,
layer: selectedResponse.properties.matches[0].layer,
name: selectedResponse.properties.names.default,
region: selectedResponse.properties.regions[0].region,
location: center(selectedResponse),
rank: null,
},
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
system: token?.sub,
Expand All @@ -54,6 +34,10 @@ export class ProcessManager {
duration: new Date(feedbackResponse.responseTime) - new Date(feedbackResponse.geocodingResponse.respondedAt),
timestamp: new Date(),
};
if (feedbackResponse.chosenResultId === null) {
return enrichedResponse;
}
return this.enrichData(feedbackResponse, enrichedResponse);
}

public getQueryText(feedbackResponse: FeedbackResponse): string {
Expand All @@ -66,4 +50,28 @@ export class ProcessManager {

return text;
}

public async enrichData(feedbackResponse: FeedbackResponse, enrichedResponse: EnrichResponse): Promise<EnrichResponse> {
const chosenResult: number = feedbackResponse.chosenResultId as number;
const selectedResponse = feedbackResponse.geocodingResponse.response.features[chosenResult];

const { endpoint, queryParams, headers } = this.appConfig.userDataService;
const fetchedUserData = await fetchUserDataService(endpoint, feedbackResponse.geocodingResponse.userId as string, queryParams, headers);

enrichedResponse.user = {
name: feedbackResponse.geocodingResponse.userId as string,
...fetchedUserData,
};
enrichedResponse.result = {
rank: chosenResult,
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
score: selectedResponse.properties?._score ?? 0,
source: selectedResponse.properties.matches[0].source,
layer: selectedResponse.properties.matches[0].layer,
name: selectedResponse.properties.names.default,
region: selectedResponse.properties.regions[0].region,
location: center(selectedResponse),
};
return enrichedResponse;
}
}
5 changes: 3 additions & 2 deletions src/streamerBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { FeedbackResponse, IConfig, KafkaOptions } from './common/interfaces';
import { ProcessManager } from './process/models/processManager';

interface KafkaTopics {
input: string;
input: string[];
}

const elasticIndex = 'elastic.properties.index';
Expand Down Expand Up @@ -53,7 +53,8 @@ export class StreamerBuilder {
const { input: inputTopic } = this.config.get<KafkaTopics>('kafkaTopics');

await this.consumer.connect();
await this.consumer.subscribe({ topics: inputTopic.split(',') });
await this.consumer.subscribe({ topics: inputTopic });
this.logger.info(`Kafka consumer subscribed successfully to ${inputTopic.toString()}`);

await this.consumer.run({
eachMessage: async ({ message }) => {
Expand Down
115 changes: 111 additions & 4 deletions tests/integration/process/process.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ProcessRequestSender } from './helpers/requestSender';
import { mockApiKey } from './utils';

let currentKafkaTopics = {
input: 'topic1-test',
input: ['topic1-test'],
};

jest.mock('config', () => {
Expand Down Expand Up @@ -163,7 +163,7 @@ describe('process', function () {
expect(response.status).toBe(httpStatusCodes.OK);

const output = response.body as EnrichResponse;
expect(output.user.name).toBe(userId);
expect(output.user?.name).toBe(userId);
expect(output.query.text).toBe('query-name');
expect(output.query.language).toBe('he');
expect(output.result.rank).toBe(1);
Expand All @@ -187,7 +187,7 @@ describe('process', function () {
it('should return 200 status code and the resource when given multiple kafka topics', async function () {
const userId = 'avi@mapcolonies.net';
currentKafkaTopics = {
input: 'topic1-test,topic2-test',
input: ['topic1-test', 'topic2-test'],
};
const userDataServiceScope = nock(config.get<IApplication>('application').userDataService.endpoint, {
reqheaders: {
Expand Down Expand Up @@ -290,7 +290,7 @@ describe('process', function () {
expect(response.status).toBe(httpStatusCodes.OK);

const output = response.body as EnrichResponse;
expect(output.user.name).toBe(userId);
expect(output.user?.name).toBe(userId);
expect(output.query.text).toBe('tile-name');
expect(output.query.language).toBe('he');
expect(output.result.rank).toBe(0);
Expand All @@ -310,6 +310,113 @@ describe('process', function () {

userDataServiceScope.done();
});

it('should return 200 status code when getting missing data - meaning user did not choose a response', async function () {
currentKafkaTopics = {
input: ['topic1-test', 'topic2-test'],
};
const input: FeedbackResponse = {
requestId: 'req-id',
chosenResultId: null,
responseTime: new Date(10000 + 500),
geocodingResponse: {
apiKey: mockApiKey,
site: 'site-name',
respondedAt: new Date(10000),
response: {
type: 'FeatureCollection',
geocoding: {
version: 'version',
query: {
query: 'query-name',
// eslint-disable-next-line @typescript-eslint/naming-convention
geo_context: 'geo_context',
},
},
features: [
{
type: 'Feature',
geometry: {
coordinates: [28.008903004732502, 19.752611840282086],
type: 'Point',
},
properties: {
type: 'Point',
matches: [
{
layer: 'not-layer-name',
source: 'not-source-name',
// eslint-disable-next-line @typescript-eslint/naming-convention
source_id: ['not-some-source-id'],
},
],
names: {
default: 'not-default-name',
},
regions: [
{
region: 'not-region-name',
// eslint-disable-next-line @typescript-eslint/naming-convention
sub_region_names: [],
},
],
// eslint-disable-next-line @typescript-eslint/naming-convention
_score: 10,
},
},
{
type: 'Feature',
geometry: {
coordinates: [29.008903004732502, 30.752611840282086],
type: 'Point',
},
properties: {
type: 'Point',
matches: [
{
layer: 'layer-name',
source: 'source-name',
// eslint-disable-next-line @typescript-eslint/naming-convention
source_id: ['some-source-id'],
},
],
names: {
default: 'default-name',
},
regions: [
{
region: 'region-name',
// eslint-disable-next-line @typescript-eslint/naming-convention
sub_region_names: [],
},
],
// eslint-disable-next-line @typescript-eslint/naming-convention
_score: 5,
},
},
],
},
},
};
const response = await requestSender.process(input);

expect(response.status).toBe(httpStatusCodes.OK);

const output = response.body as EnrichResponse;
expect(output.user?.name).toBeUndefined();
expect(output.query.text).toBe('query-name');
expect(output.query.language).toBe('he');
expect(output.result.rank).toBeNull();
expect(output.result.score).toBeUndefined();
expect(output.result.source).toBeUndefined();
expect(output.result.layer).toBeUndefined();
expect(output.result.name).toBeUndefined();
expect(output.result.region).toBeUndefined();
expect(output.result.location).toBeUndefined();
expect(output.system).toBe('map-colonies-test');
expect(output.site).toBe('site-name');
expect(output.duration).toBe(500);
});
});
describe('Bad Path', function () {
// All requests with status code of 400
Expand Down
Loading