Commit 510cc29
Implemented data collection of scraped data. (#481)
* Added listen on all interfaces to bolt configuration for neo4j in the analysis ms
* Allowed more auth failure attempts
* Allowed more auth failure attempts
* Allowed more auth failure attempts
* Added default listen address and advertised address
* Disabling auth so that password can be reset
* Disabling auth so that password can be reset
* Adding auth back in to trigger password reset.
* Testing usage of neo4j
* Testing usage of neo4j
* Changed tweet fetch interval to each minute, implemented deletion of cronjobs by default.
* Setting collection pod count to zero to trigger cronjob destruction
* Reinstated 3 collection ms replicas
* Updated lerna-lite version
* Updated lerna-lite version
* Added environment variables to analysis ms template, updated values.yaml files and updated circle config.
* Added environment variables to analysis ms template, updated values.yaml files and updated circle config.
* Added environment variables to analysis ms template, updated values.yaml files and updated circle config.
* Added environment variables to analysis ms template, updated values.yaml files and updated circle config.
* Added quotes around config values
* Updated values to override
* Updated values to override
* Updated values to override
* Removed quotes from config fields
* Testing access to complex value
* Testing access to complex value
* Testing access to complex value
* Testing access to complex value
* Testing access to complex value
* Testing access to complex value
* Testing access to complex value
* Testing access to complex value
* Testing access to complex value
* Updated apollo-datasource-neo4j version
* Started using neo4j ssc
* Started using neo4j defaults
* Added map reduce to compute sentiment.
* Added map reduce to compute sentiment.
* Updated tweet addition commands to merge with nodes already present. Testing to see if unique trees are formed in db.
* Updated memory and cpu usage for neo4j
* Updated query
* Added more memory to neo4j
* Reverted memory configuration as a result of DO limits
* Removed recreate pods.
* Bumped version of apollo-datasource-neo4j
* Bumped version of apollo-datasource-neo4j
* Updated queries to include named relationships
* Updated logging statement in analysis service
* Updated moment to use milliseconds, not seconds
* Added memory to resources of analysis neo4j
* Added some additional jvm arguments.
* Updated heap size for analysis neo4j
* Set heap sizes manually for analysis neo4j
* Made heap memory 3gb on analysis neo4j
* Testing something
* Reduced memory size to that in documentation. Upgraded size of volume connected for data storage.
* Changed cron job schedule to every 6 hours
* Updated timeline insertion query
* Modified neo4j queries
* Began addition of expected data model to front-end.
* Continued addition of expected data model to front-end.
* Continued implementing handling of new data model.
* Enabled http through defaults.
* Added TODO to data collector
* Modified front end to use specific lib in moment, modified analysis ms to add sentiment to neo4j
* Modified front end to use specific lib in moment, modified analysis ms to add sentiment to neo4j
* Updated stuff
* Updated subscription resolvers
* Updated analysis service to use sentiment score, not object
* Updated analysis service to use sentiment comparative score, not object
* Updated sentiment check
* Updated tweet check in neo4j data source
* Added logging statement to sentiment addition in analysis service
* Made start data collection async so execution of commands could be awaited. Updated necessary functions in collection service.
* Bumped version of KafkaJS used.
* Testing whether waiting causes correct reception of kafka messages.
* Testing whether waiting causes correct reception of kafka messages.
* Trying something
* Added logging statements to inspect issue with addition of sentiment node
* Added deployment strategy to test result.
* Testing use of maxUnavailable and maxSurge for rolling updates.
* Testing use of maxUnavailable and maxSurge for rolling updates.
* Added minReadySeconds to deployment for testing
* More deployment tests
* Moved processing of economic entity memo in collection ms to immediate function
* Modified data collector to see if that's causing the issue
* Modified data collector to see if that's causing the issue
* Updated bitnami
* Eliminated fromBeginning for testing
* Testing
* Started using addSentiments instead of addSentiment. Made addSentiment private due to requirement of creating dates, economicEntity first.
* Modified collection ms to only emit most recent tweet recording, modified edge and coll + analysis ms to use utcDateTime string instead of unix timestamp, modified neo4j operations to use supplied datetime(...) object. Standardizing on UTC date time strings.
* Instead of storing string in mongodb using date object.
* Adding recommended new Date(...). Seems strange but testing the result.
* Added logging statement to collection ms for failure to create tweets in datastore.
* Added logging statement to collection ms for failure to create tweets in datastore.
* Modified reduction operation to use utcDateTime in collection ms
* Removed tweet extraction from analysis ms compute sentiment.
* Used tweet text instead of tweet object in analysis service
* Used text in sentiment calculation instead of tweet object in analysis service
* Using 128 Gi instead of 64 after hanging of neo4j UI
* Got rid of 'not' sign in Number.isNaN(...) for neo4j store in analysis ms.
* Added logging statement to read function
* Mapped neo4j results to sentiment object.
* Updated front-end and back-end. Testing results.
* Added fetch sentiments in neo4j store, mapped recent and arbitrary sentiment results to that expected by the client
* Switched Number out with Float in analysis ms graphql schema
* Updated storage to be 128 gigs for analysis neo4j
* Added utcDateTime to rows returned from readSentiment query for compatibility with reducer, eliminated seemingly unnecessary database operations after modification of apollo-datasource-neo4j
* Modified readSentiment query to include utcDateTime instead of dateTime.
* Modified readMostRecentSentiment query in neo4j store of analysis ms
* Changed neo4j store query executed based on neo4j docs
* Added logging statement for troubleshooting
* Made modifications based on query results.
* Made modifications to query.
* Updated neo4j store query in analysis ms
* Updated neo4j store query in analysis ms
* Enabled apoc procedures.
* Modified query
* Modified query
* Modified query
* Modified graphql scalar date to accept strings instead of ints
* Testing analysis ms schema using double array in graphql
* Renamed resolver in analysis ms to match schema
* Modified query
* Modified query
* Removed null default from neo4j store reduction of database data in analysis ms.
* Ordered sentiment match ascending, modified front-end sentiment graph and summary to operate with new data. Still have to do subscription.
* Ordered sentiment match ascending, modified front-end sentiment graph and summary to operate with new data. Still have to do subscription.
* Modified get sentiment query for neo4j store of analysis ms
* Modified get sentiment query for neo4j store of analysis ms
* Made some modifications to front-end, eliminated some TODOs
* Added formatting to start date for start and end date date pickers, fixed issue where null wasn't correctly processed.
* Modified collection cron to occur every minute.
* Got rid of extend in schema def for subscription ms, also added filter based on endDate being null
* Commented check on null date for testing purposes.
* Updated filter function of subscription ms, updated analysis ms to send economicEntity object instead of name and type separately.
* Modified subscription to return payload data and not entire payload.
* Added logging statement to trouble-shoot subscription ms.
* Modified neo4j store query to return standard UTC format, set most recent data in analysis service of analysis ms to return data from and not array
* Added logging statement to analysis ms for troubleshooting purposes.
* Eliminated overwhelming logging statement in analysis ms
* Added parentheses around recent sentiment fetch in analysis ms
* Testing neo4j aggregate function collect(...)
* Testing neo4j aggregate function collect(...)
* Testing neo4j aggregate function collect(...)
* Testing neo4j aggregate function collect(...)
* Testing neo4j aggregate function collect(...)
* Testing neo4j aggregate function collect(...)
* Testing neo4j aggregate function collect(...)
* Testing neo4j aggregate function collect(...)
* Modified neo4j query to collect tweets and average sentiment causing group by utcDateTime, updated analysis ms
* Updated queries.
* Added filter to sentiment subscription to require a null end-date.
* Changed cronjob tweet fetch interval in collection ms back to every 6 hours.
* Eliminated some sub-todos in summary page code. Moved markup into functions to make render function more readable.
* Added time to front-end values sent back so that start and end date would be included.
* Added limit functionality to fetch sentiments
* Backed out limit changes
* Centralizing types for consistency and ease of maintenance.
* Added factory method for economic entity type, tied type into graphql schema definitions for change broadcasting, updated various components.
* Added factory method for economic entity type, tied type into graphql schema definitions for change broadcasting, updated various components.
* Added factory method for economic entity type, tied type into graphql schema definitions for change broadcasting, updated various components.
* Updated various functions to use economic entity object for shared definitions.
* Testing copy of type directory into docker image.
* Testing copy of type directory into docker image.
* Testing copy of type directory into docker image.
* Added creation of symbolic link to keep docker build context constraint during docker build.
* Added creation of symbolic link to keep docker build context constraint during docker build.
* Added creation of symbolic link to keep docker build context constraint during docker build.
* Attempting to run npm install in project and copy into image.
* Attempting to run npm install in project and copy into image.
* Attempting to run npm install in project and copy into image.
* Updated remaining circle ci config builds, updated remaining dockerfiles.
* Updated remaining circle ci config builds, updated remaining dockerfiles.
* Modified circle config ms build to only install one file, modified dockerfiles to copy the one file to the necessary directory.
* Testing use of relative path in file add
* Updated lock file, reverted circle ms build to use yarn install instead of add.
* Modified economic entity type validity check.
* Updated cert ids
* Changed neo4j block storage allocation for analysis ms to 64Gi
* Added toggling of dependency installs
* Made modifications due to readonly objects
* Updated some of the collection ms to operate on economic entity type.
* Modified configuration ms
* Updated analysis ms end date check to allow for nullability.
* Returned valid case for end date instead of when it's invalid.
* Updated tweet sentiment computed event to be sentiment computed, eliminated dead code, modified analysis ms to use economic entity objects.
* Updated tweet sentiment computed event to be sentiment computed, eliminated dead code, modified analysis ms to use economic entity objects.
* Updated much of the collection ms to use thinkdeep types
* Updated remaining portion of collection ms to use thinkdeep type
* Added logging statement to collection ms economic entity memo for testing purposes.
* Added filter to read all to see if error subsides.
* Modified collection ms to return null if no results found in _readMemo
* Added await to retrieve array
* Separated await on toArray from filter method
* Updated validity check in filter function to be correct.
* Updated config ms to use thinkdeep type
* Updated subscription ms to use thinkdeep type
* Modified type to be compatible with front-end as well.
* Created util package to separate utilities such as validity checking, renamed thinkdeep type to model
* Updated necessary dockerfiles to include copy of util package.
* Modified front-end graphql files to be js to incorporate thinkdeep model, added tests to data collector.
* Implemented a couple tests.
* Implemented a couple tests.
* Added some tests around economic analyzer
* Added more tests.
* Added more tests.
* Added more tests
* Updated validEconomicEntity in type
* Fixed broken fe tests.
* Made some changes
* Made some changes
* Updated dependencies.
* Added tests around _computeSentiment of analysis ms
* Stubbed out analysis ms neo4j store tests
* Added tests around analysis ms
* Reintroduced collection ms tests and fixed them.
* Added/modified more tests. Updated model dependencies to include those related to testing.
* Added more tests
* Updated tests to be recursive, fixed broken tests in config and coll ms, fixed some code smells
* Updated tests to be recursive, fixed broken tests in analysis ms
* Modified circleci configs to include caching of node modules download for packages, updated environment variables.
* Updated neo4j helm chart version in analysis ms
* Updated e2e tests
* Updated neo4j cert id
* Used correct moment import value for front-end
* Updated test finish timeout for web test runner
* Eliminated some code smells.
* Eliminated some code smells.
* Updated cron jobs to run minute to minute for subscription verification.
* Modified the subscription ms
* Modified subscription ms with logging statements to test failure of subscriptions on fe
* Modified subscription ms to create economic entity from payload, return payload.
* Modified subscription ms to create economic entity from payload, return payload.
* Backed out cron every minute.
* Added default advertised address to prod
* Added economic sector, sector type and factory to the model. Started using TDD for data-collector modifications.
* Made progress with site scraping. Added check for robots.txt to ensure sites allow it.
* Wrote tests around search engine interaction and robots.txt check.
* Made some naming changes
* Finished simple scraping of public allowed web pages. Added web site object to model.
* Switched to scraping branch
* Merging master into branch
* Extended timeout expectation
* Updated to newest neo4j helm chart for analysis ms
* Updated resource constraints
* Added changes to economic analyzer tests
* Updated prod config to include neo4j
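The scraping commits above reference a robots.txt check before any page is fetched. The data-scraper.js internals aren't shown in this commit, so the helper below is only a rough sketch of how that check might look using the robots-parser and axios dependencies this commit adds; the function name and user-agent string are illustrative assumptions.

import axios from 'axios';
import robotsParser from 'robots-parser';

// Illustrative helper: fetch a site's robots.txt and ask whether the given
// URL may be crawled. Treats a missing robots.txt as permission to scrape.
async function allowedByRobotsTxt(url, userAgent = 'predecos-data-collector') {
  const robotsUrl = new URL('/robots.txt', url).toString();
  try {
    const response = await axios.get(robotsUrl);
    const robots = robotsParser(robotsUrl, response.data);
    // isAllowed returns true, false, or undefined; only an explicit false blocks scraping.
    return robots.isAllowed(url, userAgent) !== false;
  } catch (e) {
    // No robots.txt (or it couldn't be fetched): treat the page as allowed.
    return true;
  }
}

// Example usage: only scrape the page if the site permits it.
if (await allowedByRobotsTxt('https://example.com/about')) {
  // ... fetch and scrape the page ...
}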
1 parent: dca2077 · commit: 510cc29

File tree: 57 files changed, +4698 −1758 lines


.circleci/config.yml

Lines changed: 4 additions & 4 deletions
@@ -417,9 +417,9 @@ jobs:
       namespace: "production"
       release-name: "v1"
       atomic: true
-      values-to-override: 'gateway.service.certificateId=$PROD_MICROSERVICE_GATEWAY_CERTIFICATE_ID,subscription.service.certificateId=$PROD_MICROSERVICE_SUBSCRIPTION_CERTIFICATE_ID,auth.audience=$PREDECOS_AUTH_AUDIENCE,auth.jwksUri=$PREDECOS_AUTH_JWKS_URI,auth.issuer=$PREDECOS_AUTH_ISSUER,analysis.neo4j.neo4j.username=$PROD_MICROSERVICE_ANALYSIS_NEO4J_USERNAME,analysis.neo4j.neo4j.password=$PROD_MICROSERVICE_ANALYSIS_NEO4J_PASSWORD,analysis.neo4j.config.dbms\.default_database=$PROD_MICROSERVICE_ANALYSIS_NEO4J_DATABASE,analysis.neo4j.config.dbms\.default_advertised_address=$PROD_MICROSERVICE_ANALYSIS_NEO4J_ADVERTISED_ADDRESS,collection.replicas=3,collection.twitter.bearer=$PROD_PREDECOS_TWITTER_BEARER,collection.data.collector.image=thinkdeeptech/data-collector:latest,replicas=1,global.docker.secretName=docker-secret,global.nodeEnv=production'
+      values-to-override: "gateway.service.certificateId=$PROD_MICROSERVICE_GATEWAY_CERTIFICATE_ID,subscription.service.certificateId=$PROD_MICROSERVICE_SUBSCRIPTION_CERTIFICATE_ID,auth.audience=$PREDECOS_AUTH_AUDIENCE,auth.jwksUri=$PREDECOS_AUTH_JWKS_URI,auth.issuer=$PREDECOS_AUTH_ISSUER,analysis.neo4j.neo4j.username=$PROD_MICROSERVICE_ANALYSIS_NEO4J_USERNAME,analysis.neo4j.neo4j.password=$PROD_MICROSERVICE_ANALYSIS_NEO4J_PASSWORD,analysis.neo4j.services.neo4j.enabled=true,analysis.neo4j.services.neo4j.annotations.service\\.beta\\.kubernetes\\.io/do-loadbalancer-certificate-id=$PROD_ANALYSIS_NEO4J_LOADBALANCER_CERTIFICATE_ID,analysis.neo4j.config.dbms\\.default_database=$PROD_MICROSERVICE_ANALYSIS_NEO4J_DATABASE,analysis.neo4j.config.dbms\\.default_advertised_address=$PROD_MICROSERVICE_ANALYSIS_NEO4J_ADVERTISED_ADDRESS,collection.replicas=3,collection.twitter.bearer=$PROD_PREDECOS_TWITTER_BEARER,collection.data.collector.image=thinkdeeptech/data-collector:latest,replicas=1,global.docker.secretName=docker-secret,global.nodeEnv=production"
       update-repositories: true
-      timeout: "480s"
+      timeout: "960s"
       wait: true

@@ -465,9 +465,9 @@ jobs:
       namespace: "development"
       release-name: "v1"
       atomic: true
-      values-to-override: 'gateway.service.certificateId=$DEV_MICROSERVICE_GATEWAY_CERTIFICATE_ID,subscription.service.certificateId=$DEV_MICROSERVICE_SUBSCRIPTION_CERTIFICATE_ID,auth.audience=$PREDECOS_TEST_AUTH_AUDIENCE,auth.jwksUri=$PREDECOS_TEST_AUTH_JWKS_URI,auth.issuer=$PREDECOS_TEST_AUTH_ISSUER,collection.twitter.bearer=$DEV_PREDECOS_TWITTER_BEARER,replicas=1,global.docker.secretName=docker-secret,analysis.container.image=thinkdeeptech/deep-microservice-analysis-dev:latest,analysis.neo4j.neo4j.username=$DEV_MICROSERVICE_ANALYSIS_NEO4J_USERNAME,analysis.neo4j.neo4j.password=$DEV_MICROSERVICE_ANALYSIS_NEO4J_PASSWORD,analysis.neo4j.services.neo4j.enabled=true,analysis.neo4j.config.dbms\.default_database=$DEV_MICROSERVICE_ANALYSIS_NEO4J_DATABASE,analysis.neo4j.config.dbms\.default_advertised_address=$DEV_MICROSERVICE_ANALYSIS_NEO4J_ADVERTISED_ADDRESS,collection.replicas=3,collection.container.image=thinkdeeptech/deep-microservice-collection-dev:latest,collection.data.collector.image=thinkdeeptech/data-collector-dev:latest,configuration.container.image=thinkdeeptech/deep-microservice-configuration-dev:latest,gateway.container.image=thinkdeeptech/deep-microservice-gateway-dev:latest,subscription.container.image=thinkdeeptech/deep-microservice-subscription-dev:latest,global.nodeEnv=development'
+      values-to-override: "gateway.service.certificateId=$DEV_MICROSERVICE_GATEWAY_CERTIFICATE_ID,subscription.service.certificateId=$DEV_MICROSERVICE_SUBSCRIPTION_CERTIFICATE_ID,auth.audience=$PREDECOS_TEST_AUTH_AUDIENCE,auth.jwksUri=$PREDECOS_TEST_AUTH_JWKS_URI,auth.issuer=$PREDECOS_TEST_AUTH_ISSUER,collection.twitter.bearer=$DEV_PREDECOS_TWITTER_BEARER,replicas=1,global.docker.secretName=docker-secret,analysis.container.image=thinkdeeptech/deep-microservice-analysis-dev:latest,analysis.neo4j.neo4j.username=$DEV_MICROSERVICE_ANALYSIS_NEO4J_USERNAME,analysis.neo4j.neo4j.password=$DEV_MICROSERVICE_ANALYSIS_NEO4J_PASSWORD,analysis.neo4j.services.neo4j.enabled=true,analysis.neo4j.services.neo4j.annotations.service\\.beta\\.kubernetes\\.io/do-loadbalancer-certificate-id=$DEV_ANALYSIS_NEO4J_LOADBALANCER_CERTIFICATE_ID,analysis.neo4j.config.dbms\\.default_database=$DEV_MICROSERVICE_ANALYSIS_NEO4J_DATABASE,analysis.neo4j.config.dbms\\.default_advertised_address=$DEV_MICROSERVICE_ANALYSIS_NEO4J_ADVERTISED_ADDRESS,collection.replicas=3,collection.container.image=thinkdeeptech/deep-microservice-collection-dev:latest,collection.data.collector.image=thinkdeeptech/data-collector-dev:latest,configuration.container.image=thinkdeeptech/deep-microservice-configuration-dev:latest,gateway.container.image=thinkdeeptech/deep-microservice-gateway-dev:latest,subscription.container.image=thinkdeeptech/deep-microservice-subscription-dev:latest,global.nodeEnv=development"
       update-repositories: true
-      timeout: "480s"
+      timeout: "960s"
       wait: true

  deploy_ui_and_publish_coverage:

package.json

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
    "setup": "yarn run clean && yarn install",
    "lint": "eslint --fix 'packages/**/*.js'",
    "prettier": "prettier --write 'packages/**/*.js'",
-    "clean": "lerna exec --parallel -- rm -rf ./node_modules && rm -rf ./node_modules && rm yarn.lock",
+    "clean": "lerna exec --parallel -- rm -rf ./node_modules && rm -rf ./node_modules && rm -f yarn.lock",
    "build": "lerna run build --parallel",
    "tests": "yarn run tests:unit && yarn run tests:integration && yarn run tests:e2e",
    "tests:unit": "lerna run tests:unit",

packages/data-collector/package.json

Lines changed: 4 additions & 0 deletions
@@ -11,6 +11,7 @@
  "devDependencies": {
    "c8": "^7.11.3",
    "chai": "^4.3.4",
+    "chai-as-promised": "^7.1.1",
    "concat-stream": "^2.0.0",
    "mocha": "^9.1.3",
    "sinon-chai": "^3.7.0"
@@ -19,10 +20,13 @@
    "@thinkdeep/attach-exit-handler": "^1.0.0",
    "@thinkdeep/model": "file:./../model",
    "@thinkdeep/util": "file:./../util",
+    "axios": "^0.27.2",
    "commander": "^9.0.0",
+    "duck-duck-scrape": "^2.2.1",
    "kafkajs": "^2.1.0",
    "log4js": "^6.3.0",
    "moment": "^2.29.4",
+    "robots-parser": "^3.0.0",
    "sinon": "^12.0.1",
    "twitter-api-v2": "^1.11.0"
  },
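The new axios, duck-duck-scrape and robots-parser dependencies correspond to the commit messages about search engine interaction and scraping of publicly allowed pages. As a hedged sketch (not the actual data-scraper implementation), duck-duck-scrape's search(...) might be used to find candidate pages for an economic entity before the robots.txt-gated fetch; the helper name and query shape below are assumptions.

import {search} from 'duck-duck-scrape';

// Illustrative only: ask DuckDuckGo for pages mentioning the economic entity
// and return their URLs for later, robots.txt-gated scraping.
async function findCandidatePages(economicEntity, limit = 10) {
  const searchResults = await search(`${economicEntity.name} ${economicEntity.type}`);
  return searchResults.results.slice(0, limit).map((result) => result.url);
}

// Example usage with a business entity.
const urls = await findCandidatePages({name: 'Google', type: 'BUSINESS'});
console.log(urls);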

packages/data-collector/src/data-collector.js

Lines changed: 89 additions & 75 deletions
@@ -1,7 +1,8 @@
-import {EconomicEntityFactory, EconomicEntityType} from '@thinkdeep/model';
+import {EconomicEntityFactory, CollectionOperationType} from '@thinkdeep/model';
import {validString} from '@thinkdeep/util';
import {Client} from './client.js';
import {Command, Option} from 'commander';
+import {DataScraper} from './data-scraper.js';
import {Kafka} from 'kafkajs';
import log4js from 'log4js';
import moment from 'moment';
@@ -20,121 +21,103 @@ try {

  program.addOption(
    new Option(
-      '-n, --entity-name <entity name>',
-      'Specify the name of the economic entity for which the operation will be performed.'
+      '-e, --economic-entity <economic entity>',
+      `Specify the economic entity (i.e, '{ "name": "Google", "type": "BUSINESS"}').`
    )
  );

  program.addOption(
    new Option(
-      '-t, --entity-type <entity type>',
-      'Specify the type of the economic entity for which the operation will be performed.'
-    ).choices(['BUSINESS'])
+      '-o, --operation-type <operation type>',
+      'Specify the type of data collection operation you would like to execute.'
+    ).choices(CollectionOperationType.types)
  );

  program.addOption(
    new Option(
-      '-o, --operation-type <operation type>',
-      'Specify the type of data collection operation you would like to execute.'
-    ).choices(['fetch-tweets'])
+      '-l, --limit [limit]',
+      'Specify the limit associated with the operation.'
+    ).default(10, 'Defaults to 10.')
  );

  program.addOption(
    new Option(
-      '-m, --num-tweets [num tweets]',
-      'Specify the number of tweets to be fetched at once.'
-    ).default(10, 'The default number to fetch.')
+      '-m, --mock-data <mock data>',
+      'Trigger mocking of the cli.'
+    ).default({}, 'An empty object')
  );

-  program.addOption(new Option('--mock-run', 'Trigger mocking of the cli.'));
-
  program.parse(process.argv);

  const options = program.opts();

+  const economicEntity = EconomicEntityFactory.get(
+    JSON.parse(options.economicEntity)
+  );
+
  if (!validString(options.operationType))
    throw new Error('Operation type is required');

-  if (!validString(options.entityName))
-    throw new Error(`Entity name is required`);
-
-  if (!validString(options.entityType))
-    throw new Error(`Entity type is required`);
-
-  if (!EconomicEntityType.valid(options.entityType))
-    throw new Error(`Entity type ${options.entityType} is invalid.`);
+  const currentUtcDateTime = moment().utc().format();

-  const economicEntity = EconomicEntityFactory.economicEntity({
-    name: options.entityName,
-    type: options.entityType,
-  });
+  let kafkaClient;
+  let twitterClient;
+  if (!options.mockData || Object.keys(options.mockData).length <= 0) {
+    logger.info(`Creating kafka client.`);
+    kafkaClient = new Kafka({
+      clientId: 'collect-data',
+      brokers: [
+        `${process.env.PREDECOS_KAFKA_HOST}:${process.env.PREDECOS_KAFKA_PORT}`,
+      ],
+    });
+
+    twitterClient = new TwitterApi(process.env.PREDECOS_TWITTER_BEARER)
+      .readOnly;
+  } else {
+    logger.info(`Creating mock kafka client.`);
+    kafkaClient = {
+      admin: sinon.stub().returns({
+        connect: sinon.stub(),
+        createTopics: sinon.stub(),
+        disconnect: sinon.stub(),
+      }),
+      producer: sinon.stub().returns({
+        connect: sinon.stub(),
+        send: sinon.stub(),
+        disconnect: sinon.stub(),
+      }),
+    };
+
+    twitterClient = {
+      v2: {
+        get: sinon.stub().returns({
+          data: JSON.parse(options.mockData),
+        }),
+      },
+    };
+  }

-  const currentUtcDateTime = moment().utc().format();
+  const collectDataClient = new Client(twitterClient, kafkaClient, logger);

  switch (options.operationType) {
-    case 'fetch-tweets': {
+    case CollectionOperationType.FetchTweets: {
      logger.info('Fetching tweets.');
-      let twitterClient;
-      let kafkaClient;
-      if (!options.mockRun) {
-        twitterClient = new TwitterApi(process.env.PREDECOS_TWITTER_BEARER)
-          .readOnly;
-
-        kafkaClient = new Kafka({
-          clientId: 'collect-data',
-          brokers: [
-            `${process.env.PREDECOS_KAFKA_HOST}:${process.env.PREDECOS_KAFKA_PORT}`,
-          ],
-        });
-      } else {
-        twitterClient = {
-          v2: {
-            get: sinon.stub().returns({
-              data: [
-                {
-                  text: 'tweet 1',
-                },
-                {
-                  text: 'tweet 2',
-                },
-                {
-                  text: 'tweet 3',
-                },
-              ],
-            }),
-          },
-        };
-        kafkaClient = {
-          admin: sinon.stub().returns({
-            connect: sinon.stub(),
-            createTopics: sinon.stub(),
-            disconnect: sinon.stub(),
-          }),
-          producer: sinon.stub().returns({
-            connect: sinon.stub(),
-            send: sinon.stub(),
-            disconnect: sinon.stub(),
-          }),
-        };
-      }
-
-      const collectDataClient = new Client(twitterClient, kafkaClient, logger);

      (async () => {
        logger.info('Connecting to data collection client.');
        await collectDataClient.connect();

        const recentTweets = await collectDataClient.fetchRecentTweets({
          query: `${options.entityName} lang:en -is:retweet`,
-          max_results: options.numTweets,
+          max_results: options.limit,
        });
        logger.debug(
          `Retrieved the following tweets: ${JSON.stringify(recentTweets)}`
        );

        const data = {
          utcDateTime: currentUtcDateTime,
-          economicEntity: economicEntity.toObject(),
+          economicEntity,
          tweets: recentTweets,
        };

@@ -145,6 +128,37 @@ try {

      break;
    }
+    case CollectionOperationType.ScrapeData: {
+      (async () => {
+        logger.info('Connecting to data collection client.');
+        await collectDataClient.connect();
+
+        logger.info(
+          `Scraping data for ${economicEntity.type} ${economicEntity.name}.`
+        );
+        const scraper =
+          !options.mockData || Object.keys(options.mockData).length <= 0
+            ? new DataScraper(logger)
+            : sinon.createStubInstance(DataScraper);
+
+        const scrapedData = await scraper.scrapeData(economicEntity);
+
+        const data = {
+          utcDateTime: currentUtcDateTime,
+          economicEntity,
+          data: scrapedData,
+        };
+
+        await collectDataClient.emitEvent('DATA_SCRAPED', data);
+      })();
+
+      break;
+    }
+    default: {
+      throw new Error(
+        `The specified operation ${options.operationType} isn't yet supported.`
+      );
+    }
  }
} catch (e) {
  logger.error(e.message.toString());
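With the reworked options above, the collector now receives the whole economic entity as a JSON string and an operation type drawn from CollectionOperationType. Below is a hedged sketch of how the collection microservice's cron command might invoke the CLI; the relative script path and the assumption that the operation types are plain strings are illustrative and not taken from this diff.

import {execFile} from 'node:child_process';
import {promisify} from 'node:util';
import {CollectionOperationType} from '@thinkdeep/model';

const execFileAsync = promisify(execFile);

// The economic entity is passed as JSON and parsed by
// EconomicEntityFactory.get(...) inside the CLI.
const economicEntity = JSON.stringify({name: 'Google', type: 'BUSINESS'});

// Omitting --mock-data keeps the default {}, so the real Kafka and Twitter
// clients are constructed rather than the sinon stubs.
const {stdout} = await execFileAsync('node', [
  'src/data-collector.js',
  '--economic-entity',
  economicEntity,
  '--operation-type',
  CollectionOperationType.ScrapeData,
]);

console.log(stdout);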

0 commit comments