Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobserver #5912

Open
wants to merge 87 commits into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
55b2416
init jobserveR
kanwarujjaval Jan 14, 2025
e9bb5b8
lint and deps
kanwarujjaval Jan 14, 2025
db5fd7d
Merge branch 'next' of github.com:Countly/countly-server into jobserver
kanwarujjaval Jan 14, 2025
2e2e2f4
deepscan errors
kanwarujjaval Jan 14, 2025
8918bca
test config
kanwarujjaval Jan 14, 2025
3010da0
handle promise/callback/fn function for job run
kanwarujjaval Jan 14, 2025
807434e
Job runner improvements
kanwarujjaval Jan 14, 2025
05d3d4a
redundant log here, job baseclass already does this
kanwarujjaval Jan 14, 2025
3485290
hard fix the pulse version, expecting changes in the development
kanwarujjaval Jan 14, 2025
bdf6252
define node engine in packagejson
kanwarujjaval Jan 14, 2025
d1eb6a5
events dont exist :)
kanwarujjaval Jan 14, 2025
ed21279
remove obsolete
kanwarujjaval Jan 14, 2025
3d4201d
support enable/disable of jobs externally
kanwarujjaval Jan 14, 2025
24125d8
self contained better job scheduling
kanwarujjaval Jan 14, 2025
967fa24
Expanded job baseclass with documented example
kanwarujjaval Jan 14, 2025
7b4dc94
JobServer with oplog watcher to communicate with main countly process
kanwarujjaval Jan 14, 2025
372ee07
progress job
kanwarujjaval Jan 14, 2025
480cbc4
codacy fix
kanwarujjaval Jan 15, 2025
8179924
Merge branch 'next' of github.com:Countly/countly-server into jobserver
kanwarujjaval Jan 15, 2025
ef82822
complex job operations from oplog
kanwarujjaval Jan 15, 2025
cc12690
refactor for code cleanup because why not
kanwarujjaval Jan 15, 2025
7b21626
README update
kanwarujjaval Jan 15, 2025
984016c
fix docs
kanwarujjaval Jan 15, 2025
5acfa2f
remove new logger file :D
kanwarujjaval Jan 15, 2025
baf399f
progress with bookmark data
kanwarujjaval Jan 15, 2025
3c38282
so many tiny docs and logs
kanwarujjaval Jan 15, 2025
372ddd6
codacy fix
kanwarujjaval Jan 15, 2025
a3acc44
add design pattern conventions
kanwarujjaval Jan 16, 2025
e103818
wip mark in readme
kanwarujjaval Jan 16, 2025
4445d5a
Add simple example
kanwarujjaval Jan 16, 2025
97ed753
sane defaults for Job
kanwarujjaval Jan 16, 2025
b6a5421
sane defaults for global
kanwarujjaval Jan 16, 2025
46c77aa
Prevent job scheduler collision
kanwarujjaval Jan 16, 2025
36da81c
Merge branch 'next' of github.com:Countly/countly-server into jobserver
kanwarujjaval Jan 20, 2025
67f2588
migrate crashes jobs
kanwarujjaval Jan 23, 2025
d6eb77f
migrate hooks jobs
kanwarujjaval Jan 23, 2025
4a45e76
migrate logger jobs
kanwarujjaval Jan 23, 2025
b91fe81
migrate reports jobs
kanwarujjaval Jan 23, 2025
84fba28
migrate stats jobs
kanwarujjaval Jan 23, 2025
774822c
migrate views jobs
kanwarujjaval Jan 23, 2025
254a52c
disable api.js jobs
kanwarujjaval Jan 23, 2025
dbb0981
migrate core jobs
kanwarujjaval Jan 23, 2025
b87ae76
prevent double loading of jobs in old and new
kanwarujjaval Jan 23, 2025
b7edf02
fixes for runnow and priority/retry configs
kanwarujjaval Jan 23, 2025
2f33110
scan new job every 30 seconds
kanwarujjaval Jan 23, 2025
70f5b6a
refactoring and backward compatiblity injections in jobserver
kanwarujjaval Jan 23, 2025
b25f4db
basic api for jobs
kanwarujjaval Jan 23, 2025
ceaf8b9
init jobs fe
kanwarujjaval Jan 23, 2025
0edd2a7
refactor job view
kanwarujjaval Jan 23, 2025
3d70ec6
lint fix
kanwarujjaval Jan 25, 2025
880dc46
wip: fixes for frontend
kanwarujjaval Jan 25, 2025
b317dca
simplify server connections
kanwarujjaval Jan 25, 2025
290b4db
remove hooks from multi process and job scheduling
kanwarujjaval Jan 28, 2025
6be5c9b
remove job handler from fetch.js
kanwarujjaval Jan 28, 2025
9d13e0c
remove cluster from cache
kanwarujjaval Jan 28, 2025
6454c86
remove cluster from batcher
kanwarujjaval Jan 28, 2025
4a2d445
remove ipchandler from logger
kanwarujjaval Jan 28, 2025
3f27384
remove job handler from requestprocessor
kanwarujjaval Jan 28, 2025
c0ad9e9
remove cluster from tracker
kanwarujjaval Jan 28, 2025
bb7c836
cleaner import in jobserver
kanwarujjaval Jan 28, 2025
ee53851
much slimmer api.js
kanwarujjaval Jan 28, 2025
078b0c7
Merge branch 'next' of github.com:Countly/countly-server into jobserver
kanwarujjaval Jan 28, 2025
1c9abc0
remove cluster from pluginmanager
kanwarujjaval Jan 28, 2025
5e54286
missing dependency for jobs ui
kanwarujjaval Jan 29, 2025
7460d6e
resolve conflicts
kanwarujjaval Jan 29, 2025
79886ad
remove push from default json for temporarily
kanwarujjaval Jan 29, 2025
9015238
disable old jobs tests
kanwarujjaval Jan 29, 2025
2a882ac
update alerts to new job server
kanwarujjaval Jan 30, 2025
4b83b75
clean
kanwarujjaval Jan 30, 2025
a66d851
add dev deps
kanwarujjaval Jan 31, 2025
8f38f9c
fix alertProcessor job schedule
kanwarujjaval Jan 31, 2025
9038da4
remove tests for old jobs
kanwarujjaval Jan 31, 2025
a291aae
remove cluster from system-utility
kanwarujjaval Jan 31, 2025
a3ef55d
small updates system-utility
kanwarujjaval Jan 31, 2025
6910c61
update Alerts to check hourly,daily,monthly queries more robustly
kanwarujjaval Feb 1, 2025
0db8ab6
allow default disabled state in jobs and Manual only jobs
kanwarujjaval Feb 1, 2025
32bdb3b
migrate AppExpire but disabled by default
kanwarujjaval Feb 1, 2025
65b35ba
remove clear job job
kanwarujjaval Feb 1, 2025
774273f
remove clues to the old jobs
kanwarujjaval Feb 1, 2025
d08297c
literally 1 jsdoc added
kanwarujjaval Feb 1, 2025
771d3cf
some cleaning up
kanwarujjaval Feb 2, 2025
a6d2134
fixing cross countly comm
kanwarujjaval Feb 2, 2025
908609c
remove obsolete migration jobs
kanwarujjaval Feb 2, 2025
2e2b37b
fix per job logger
kanwarujjaval Feb 2, 2025
9763887
improvement to single run job
kanwarujjaval Feb 2, 2025
c04f249
try to handle funnel and flow jobs :)
kanwarujjaval Feb 2, 2025
3c93269
close server before flushall, and keepalive between node and nginx
kanwarujjaval Feb 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
complex job operations from oplog
  • Loading branch information
kanwarujjaval committed Jan 15, 2025
commit ef82822ed97905feb4a34e777326a32d76e648e4
41 changes: 41 additions & 0 deletions jobServer/Job.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const defaultLogger = {
e: console.error,
i: console.info
};
const { JOB_PRIORITIES } = require('./constants/JobPriorities');
/**
* Base class for creating jobs.
*
Expand Down Expand Up @@ -274,6 +275,46 @@ class Job {

this.logger?.d(`Progress reported for job "${this.jobName}":`, progressData);
}

/**
* Get job retry configuration
* @typedef {Object} RetryConfig
* @property {boolean} enabled - Whether retries are enabled
* @property {number} attempts - Number of retry attempts
* @property {number} delay - Delay between retry attempts in milliseconds, delay is by default exponentially increasing after each attempt
* @returns {RetryConfig|null} Retry configuration or null for default
*/
getRetryConfig() {
return {
enabled: true,
attempts: 3,
delay: 2000 // 2 seconds
};
}

/**
* Get job priority
* @returns {string} Priority level from JOB_PRIORITIES
*/
getPriority() {
return JOB_PRIORITIES.NORMAL;
}

/**
* Get job concurrency
* @returns {number|null} Maximum concurrent instances or null for default
*/
getConcurrency() {
return 1;
}

/**
* Get job lock lifetime in milliseconds
* @returns {number|null} Lock lifetime or null for default
*/
getLockLifetime() {
return 55 * 60 * 1000; // 55 minutes
}
}

module.exports = Job;
213 changes: 198 additions & 15 deletions jobServer/JobManager.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const {RUNNER_TYPES, createJobRunner} = require('./JobRunner');
const {RUNNER_TYPES, createJobRunner} = require('./JobRunner/index');
const config = require("./config");
const JobUtils = require('./JobUtils');

/**
* Manages job configurations and initialization.
Expand All @@ -25,6 +26,9 @@ class JobManager {
*/
#jobRunner = null;


#jobConfigsCollection;

/**
* Creates a new JobManager instance
* @param {Object} db Database connection
Expand All @@ -40,6 +44,199 @@ class JobManager {
const pulseConfig = config.PULSE;

this.#jobRunner = createJobRunner(this.#db, runnerType, pulseConfig, Logger);
this.#jobConfigsCollection = db.collection('jobConfigs');
this.#watchConfigs();
}

/**
* Watches for changes in job configurations
* @private
* @returns {Promise<void>} A promise that resolves once the watcher is started
*/
async #watchConfigs() {
const changeStream = this.#jobConfigsCollection.watch();

changeStream.on('change', async(change) => {
if (change.operationType === 'update' || change.operationType === 'insert') {
const jobConfig = change.fullDocument;
await this.#applyConfig(jobConfig);
}
});
}

/**
* Applies job configuration changes
* @private
* @param {Object} jobConfig The job configuration to apply
* @param {string} jobConfig.jobName The name of the job
* @param {boolean} [jobConfig.runNow] Whether to run the job immediately
* @param {Object} [jobConfig.schedule] Schedule configuration
* @param {Object} [jobConfig.retry] Retry configuration
* @param {boolean} [jobConfig.enabled] Whether the job is enabled
* @returns {Promise<void>} A promise that resolves once the configuration is applied
*/
async #applyConfig(jobConfig) {
try {
if (!this.#jobRunner) {
throw new Error('Job runner not initialized');
}

const { jobName } = jobConfig;

if (jobConfig.runNow === true) {
await this.#jobRunner.runJobNow(jobName);
await this.#jobConfigsCollection.updateOne(
{ jobName },
{ $unset: { runNow: "" } }
);
}

if (jobConfig.schedule) {
await this.#jobRunner.updateSchedule(jobName, jobConfig.schedule);
}

if (jobConfig.retry) {
await this.#jobRunner.configureRetry(jobName, jobConfig.retry);
}

if (typeof jobConfig.enabled === 'boolean') {
if (jobConfig.enabled) {
await this.#jobRunner.enableJob(jobName);
this.#log.i(`Job ${jobName} enabled via config`);
}
else {
await this.#jobRunner.disableJob(jobName);
this.#log.i(`Job ${jobName} disabled via config`);
}
}
}
catch (error) {
this.#log.e('Error applying job configuration:', error);
}
}

/**
* Loads job classes and manages their configurations
* @private
* @param {Object.<string, Function>} jobClasses - Object containing job class implementations keyed by job name
* @returns {Promise<void>} A promise that resolves once all jobs are loaded and configured
* @throws {Error} If job loading or configuration fails
*/
async #loadJobs(jobClasses) {
// Calculate checksums for all job definitions
const jobDefinitionChecksums = Object.entries(jobClasses).reduce((acc, [name, JobClass]) => {
acc[name] = JobUtils.calculateJobChecksum(JobClass);
return acc;
}, {});

// Initialize or update job configurations
await this.#initializeJobConfigs(jobClasses, jobDefinitionChecksums);

// Load and apply configurations
await this.#applyJobConfigurations(jobClasses, jobDefinitionChecksums);
}

/**
* Initializes or updates job configurations in the database
* @private
* @param {Object.<string, Function>} jobClasses - Job class implementations
* @param {Object.<string, string>} jobDefinitionChecksums - Checksums of job definitions
*/
async #initializeJobConfigs(jobClasses, jobDefinitionChecksums) {
await Promise.all(
Object.entries(jobClasses).map(async([jobName, JobClass ]) => {
this.#log.d(`Initializing job config for ${jobName}`);
const currentChecksum = jobDefinitionChecksums[jobName];
const existingConfigOverride = await this.#jobConfigsCollection.findOne({ jobName });

if (!existingConfigOverride) {
// Create new configuration for new job
await this.#createDefaultJobConfig(jobName, currentChecksum, JobClass);
}
else if (existingConfigOverride.checksum !== currentChecksum) {
// Reset configuration if job implementation has changed
await this.#resetJobConfig(jobName, currentChecksum);
}
})
);
}

/**
* Creates a default configuration for a new job
* @private
* @param {string} jobName - Name of the job
* @param {string} checksum - Checksum of the job definition
* @param {Function} JobClass - Job class implementation
*/
async #createDefaultJobConfig(jobName, checksum, JobClass) {
const instance = new JobClass(jobName);

await this.#jobConfigsCollection.insertOne({
jobName,
enabled: true,
checksum,
createdAt: new Date(),
defaultConfig: {
schedule: instance.getSchedule(),
retry: instance.getRetryConfig(),
priority: instance.getPriority(),
concurrency: instance.getConcurrency(),
lockLifetime: instance.getLockLifetime()
}
});
this.#log.d(`Created default configuration for new job: ${jobName}`);
}

/**
* Resets job configuration when implementation changes
* @private
* @param {string} jobName - Name of the job
* @param {string} newChecksum - New checksum of the job definition
*/
async #resetJobConfig(jobName, newChecksum) {
this.#log.w(`Job ${jobName} implementation changed, resetting configuration`);
await this.#jobConfigsCollection.updateOne(
{ jobName },
{
$set: {
enabled: true,
checksum: newChecksum,
updatedAt: new Date()
},
$unset: {
schedule: "",
retry: "",
runNow: ""
}
}
);
}

/**
* Applies configurations to jobs
* @private
* @param {Object.<string, Function>} jobClasses - Job class implementations
* @param {Object.<string, string>} jobDefinitionChecksums - Checksums of job definitions
*/
async #applyJobConfigurations(jobClasses, jobDefinitionChecksums) {
// Load all existing configuration overrides
const configOverrides = await this.#jobConfigsCollection.find({}).toArray();
const configOverridesMap = new Map(configOverrides.map(conf => [conf.jobName, conf]));

// Create and configure jobs
await Promise.all(
Object.entries(jobClasses).map(async([jobName, JobClass]) => {
// Create the job with default settings
await this.#jobRunner.createJob(jobName, JobClass);

// Apply configuration override if valid
const configOverride = configOverridesMap.get(jobName);
if (configOverride && configOverride.checksum === jobDefinitionChecksums[jobName]) {
await this.#applyConfig(configOverride);
this.#log.d(`Applied configuration override for job: ${jobName}`);
}
})
);
}

/**
Expand Down Expand Up @@ -82,20 +279,6 @@ class JobManager {
await this.#jobRunner.disableJob(jobName);
}

/**
* Loads the job classes into the job runner
* @param {Object.<string, Function>} jobClasses Object containing job classes keyed by job name
* @returns {Promise<void>} A promise that resolves once the jobs are loaded
*/
#loadJobs(jobClasses) {
return Promise.all(
Object.entries(jobClasses)
.map(([name, JobClass]) => {
return this.#jobRunner.createJob(name, JobClass);
})
);
}

/**
* Closes the JobManager and cleans up resources
* @returns {Promise<void>} A promise that resolves once cleanup is complete
Expand Down
36 changes: 0 additions & 36 deletions jobServer/JobServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ class JobServer {
// await this.#jobConfigsCollection.createIndex({ jobName: 1 }, /*{ unique: true }*/);

this.#setupSignalHandlers();
// Watch for changes in job configurations
this.#watchJobConfigs();

this.#log.i('Job process init successfully');
}
Expand Down Expand Up @@ -176,40 +174,6 @@ class JobServer {
});
}

/**
* Watch for changes in job configurations
* @private
*/
async #watchJobConfigs() {
const changeStream = this.#jobConfigsCollection.watch();

changeStream.on('change', async(change) => {
try {
if (change.operationType === 'update' || change.operationType === 'insert') {
const jobName = change.fullDocument.jobName;
const enabled = change.fullDocument.enabled;

if (enabled) {
await this.#jobManager.enableJob(jobName);
}
else {
await this.#jobManager.disableJob(jobName);
}

this.#log.i(`Job ${jobName} ${enabled ? 'enabled' : 'disabled'}`);
}
}
catch (error) {
this.#log.e('Error processing job config change:', error);
}
});

changeStream.on('error', (error) => {
this.#log.e('Error in job configs change stream:', error);
// Implement reconnection logic here
});
}

/**
* Shuts down the job process.
* @param {number} [exitCode=0] - The exit code to use when shutting down the process.
Expand Down
14 changes: 14 additions & 0 deletions jobServer/JobUtils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const Job = require('./Job');
const {isValidCron} = require('cron-validator');
const later = require('@breejs/later');
const crypto = require('crypto');

/**
* Class responsible for validating job classes.
Expand Down Expand Up @@ -235,6 +236,19 @@ class JobUtils {
return `${minutePart} ${hourPart} ${dayPart} ${monthPart} ${dayOfWeekPart}`;
}

/**
* Calculates checksum for a job class
* @param {Function} JobClass The job class to calculate checksum for
* @returns {string} The calculated checksum
*/
static calculateJobChecksum(JobClass) {
const jobString = JobClass.toString();
return crypto
.createHash('sha256')
.update(jobString)
.digest('hex');
}

}

module.exports = JobUtils;
Loading
Loading