Skip to content

Commit

Permalink
feat: create workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Jadiscke committed Aug 18, 2022
1 parent 3295481 commit 570c598
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 13 deletions.
64 changes: 51 additions & 13 deletions bull-script.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ const URL = process.env.URL;
const START_PAGE = Number(process.env.START_PAGE);
const END_PAGE = Number(process.env.END_PAGE);
const COOKIE = process.env.COOKIE || '';
const OPT = process.env.OPT || 'DEFAULT';

console.log('Starting...')
console.log('Configs: ', URL, ' / ', START_PAGE, ' - ', END_PAGE)
// console.log('Starting...')
// console.log('Configs: ', URL, ' / ', START_PAGE, ' - ', END_PAGE)

const headers = new Headers({
cookie: COOKIE
cookie: COOKIE,
})

async function retryJobs(jobsArray) {
Expand All @@ -27,6 +28,12 @@ async function cleanJobs(jobsArray) {
}))
}

async function retryAll() {
console.log('Start Retry All')
const result = await fetch(`${URL}/api/queues/UseCaseJob/retry`, {method: 'put', headers: headers, redirect: "follow", follow: 20})
console.log(result)
}

async function wait(waitingTime) {
await new Promise((resolve, reject) => {
if ( waitingTime < 0 ){
Expand All @@ -41,18 +48,49 @@ process.on('uncaughtException', function (exception) {
console.log(exception)
});


for (let i = START_PAGE; i >= END_PAGE; i--) {
if( i % 100 === 0) {
console.log(i)
async function bullScript(start, end) {
for (let i = start; i >= end; i--) {
if( i % 100 === 0) {
console.log(i)
}

const response = await fetch(
`${URL}/api/queues?activeQueue=UseCaseJob&status=failed&page=${i}`,
{ method: "get", headers: headers }
)

const jobsIds = (await response.json()).queues[0].jobs.map(element => element.id)
retryJobs(jobsIds).catch((error) => { console.log(error.code)})

}
}

export function getStartEndArray(startValue, endValue, n) {
const startEndArray = new Array(n).fill({start:0, end: 0})
const dividedStartValue = Number((startValue / n).toFixed(0))
return startEndArray.map((element, index) => {
const start = dividedStartValue * (index + 1)
const end = endValue + dividedStartValue * index
return {start, end}
})
}

export async function parallelSend(startEndArray,index) {
return bullScript(startEndArray[index].start, startEndArray[index].end)
}

const response = await fetch(
`${URL}/api/queues?activeQueue=UseCaseJob&status=completed&page=${i}`,
{ method: "get", headers: headers }
)
export async function main(opt) {

const jobsIds = (await response.json()).queues[0].jobs.map(element => element.id)
cleanJobs(jobsIds).catch((error) => { console.log(error)})
if (opt === 'PARALLEL') {
const startEndArray = getStartEndArray(START_PAGE, END_PAGE, n)
console.log(startEndArray)
await Promise.all(startEndArray.map(async (element) => {
bullScript(element.start, element.end)
}))
}

if (opt === 'DEFAULT') {
bullScript(START_PAGE, END_PAGE);
}

}
12 changes: 12 additions & 0 deletions bull-threads.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Worker } from 'worker_threads'
import { getStartEndArray } from './bull-script.mjs'
const START_PAGE = Number(process.env.START_PAGE);
const END_PAGE = Number(process.env.END_PAGE);
const n = 100
const startEndArray = getStartEndArray(START_PAGE,END_PAGE, n);
const array = new Array(n).fill(0);
const workerArray = array.map(() => new Worker('./bull-worker.mjs'))
workerArray.map((worker,index) => worker.postMessage({index, startEndArray}))
workerArray.map((worker) => worker.on('message', (result) => {
console.log('Sent By Worker: ', result)
}))
11 changes: 11 additions & 0 deletions bull-worker.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { parentPort } from 'worker_threads'
import { parallelSend } from './bull-script.mjs'

parentPort.on("message", (contents) => {
const {index, startEndArray } = contents

parallelSend(startEndArray, index)
parentPort.postMessage(
`Started: ${index}, ${startEndArray[index]}`
)
})

0 comments on commit 570c598

Please sign in to comment.