Skip to content

Conversation

@fahimfaisaal
Copy link

@fahimfaisaal fahimfaisaal commented Jul 6, 2025

Improvements

  1. Eliminated Repetition and Boilerplate - Single Source of Truth
    Before: Each of the 7+ workers contained 30-40 lines of identical boilerplate code for queue setup, Redis connections, job options, and error handling
    After: All common functionality consolidated into BaseQueue and BaseWorker base classes
    Impact: Reduced codebase by ~200+ lines of duplicated code, with all configuration centralized in the base classes
  2. Significantly Less Error-Prone
    Consistent Behavior: All workers now have identical error handling, logging patterns, and job management
    Type Safety: Generic base classes (BaseQueue, BaseWorker) provide compile-time type checking
    Reduced Human Error: No more copy-paste mistakes when creating new workers
  3. Streamlined Future Worker Integration
    Minimal Implementation: New workers only need to extend base classes and implement the handler method
    Plug-and-Play: Consistent constructor patterns and method signatures across all workers
    Template Pattern: Clear structure guides developers on how to implement new workers correctly
    Example: Adding a new worker now requires ~15 lines instead of ~60 lines
  4. Enhanced Long-term Maintainability
    Centralized Updates: Changes to queue behavior, error handling, or job options only need modification in one place
    Easier Debugging: Consistent logging and error patterns across all workers
    Simplified Testing: Base class functionality can be tested once, reducing test complexity
    Code Readability: Worker classes now focus purely on business logic, making them easier to understand and review

Summary by CodeRabbit

  • Refactor

    • Consolidated queue/worker behavior into shared base classes and migrated from factory-style workers to class-based workers with a unified .create() API.
  • New Features

    • Added background jobs to delete organization audit logs and to reactivate organizations.
  • Chores

    • Standardized worker/queue module names, removed legacy queue/worker interfaces, and cleaned up related imports and public API.

✏️ Tip: You can customize this high-level summary in your review settings.

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

…ueue

- Replaced individual worker and queue interfaces with abstract  base classes for better code reuse and maintainability.
- Updated worker and queue classes to extend from the new base abstract classes, simplifying their constructors and error handling and log managements.
- Removed deprecated worker creation functions in favor of the new base worker & queue pattern.
- Adjusted imports and cleaned up unused code across multiple worker files.
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 6, 2025

Walkthrough

Replaces factory-style worker creation with class-based workers built on new BaseQueue/BaseWorker abstractions, adds DeleteOrganizationAuditLogs and ReactivateOrganization modules, removes legacy IQueue/IWorker interfaces and factory files, and updates imports/bootstrap/CLI to use new ...(...).create().

Changes

Cohort / File(s) Change Summary
Core base classes
controlplane/src/core/workers/base/Queue.ts, controlplane/src/core/workers/base/Worker.ts, controlplane/src/core/workers/base/index.ts
Added BaseQueue and BaseWorker abstractions plus index re-export to centralize BullMQ queue/worker creation, logging, and event handling.
Migrated worker modules
controlplane/src/core/workers/AIGraphReadme.ts, controlplane/src/core/workers/CacheWarmer.ts, controlplane/src/core/workers/DeactivateOrganization.ts, controlplane/src/core/workers/DeleteOrganization.ts, controlplane/src/core/workers/DeleteUser.ts, controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts
Converted queue/worker implementations to extend BaseQueue/BaseWorker; removed direct BullMQ instantiation and factory functions; changed handler visibility to protected; updated imports and constructor parameter types (use generic DB).
New worker modules
controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts, controlplane/src/core/workers/ReactivateOrganization.ts
Added class-based queue and worker modules with input interfaces, implemented using BaseQueue/BaseWorker.
Removed / replaced files
controlplane/src/core/workers/Worker.ts
Deleted legacy IQueue/IWorker interfaces (replaced by base classes).
Build/server bootstrap changes
controlplane/src/core/build-server.ts
Switched imports from factory functions to class-based worker modules and replaced usages with new ...(...).create(); removed MetricsPluginOptions import and adjusted metrics import.
Repository & route import updates
controlplane/src/core/repositories/OrganizationRepository.ts, controlplane/src/core/repositories/UserRepository.ts, controlplane/src/core/routes.ts
Updated import paths to new worker module filenames (removed "Worker" suffix); no logic changes.
CLI scripts import updates
controlplane/src/bin/deactivate-org.ts, controlplane/src/bin/delete-user.ts, controlplane/src/bin/reactivate-org.ts
Updated imports to reference renamed worker modules; no other changes.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately reflects the main change: introducing abstract base classes to refactor and improve workers and queue implementations throughout the codebase.
✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
controlplane/src/core/workers/base/Queue.ts (1)

18-18: Remove trailing whitespace

Line 18 has trailing whitespace that should be removed.

-  },
-};
-  
+  },
+};
+
controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts (1)

50-63: Use consistent logger instance

Consider using the BaseWorker's child logger for consistency, as it includes worker context.

     } catch (err) {
-      this.input.logger.error(
+      this.logger.error(
         { jobId: job.id, organizationId: job.data.organizationId, err },
         'Failed to delete audit logs for organization',
       );
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ae21c5d and 30e4be4.

📒 Files selected for processing (12)
  • controlplane/src/core/build-server.ts (3 hunks)
  • controlplane/src/core/workers/AIGraphReadmeWorker.ts (4 hunks)
  • controlplane/src/core/workers/CacheWarmerWorker.ts (3 hunks)
  • controlplane/src/core/workers/DeactivateOrganizationWorker.ts (3 hunks)
  • controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts (3 hunks)
  • controlplane/src/core/workers/DeleteOrganizationWorker.ts (5 hunks)
  • controlplane/src/core/workers/DeleteUserQueue.ts (5 hunks)
  • controlplane/src/core/workers/ReactivateOrganizationWorker.ts (3 hunks)
  • controlplane/src/core/workers/Worker.ts (0 hunks)
  • controlplane/src/core/workers/base/Queue.ts (1 hunks)
  • controlplane/src/core/workers/base/Worker.ts (1 hunks)
  • controlplane/src/core/workers/base/index.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • controlplane/src/core/workers/Worker.ts
🧰 Additional context used
🧬 Code Graph Analysis (1)
controlplane/src/core/workers/ReactivateOrganizationWorker.ts (1)
controlplane/src/core/workers/DeleteOrganizationWorker.ts (1)
  • DeleteOrganizationQueue (20-39)
🔇 Additional comments (20)
controlplane/src/core/workers/base/index.ts (1)

1-2: Clean barrel export pattern!

The module correctly provides a single entry point for the base classes.

controlplane/src/core/workers/base/Worker.ts (1)

14-26: Well-structured worker lifecycle management

The create method properly initializes the BullMQ worker with appropriate event handlers for monitoring and debugging.

controlplane/src/core/workers/base/Queue.ts (1)

5-17: Production-ready job configuration defaults

The default job options provide excellent balance between reliability (6 retry attempts with exponential backoff) and resource management (90-day retention).

controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts (1)

15-34: Clean queue implementation with proper job deduplication

The queue correctly extends BaseQueue and uses organizationId as jobId to prevent duplicate jobs per organization.

controlplane/src/core/build-server.ts (1)

314-375: Consistent worker instantiation pattern successfully implemented

All workers now follow the standardized pattern of class instantiation followed by .create() method call, with proper lifecycle management through the bullWorkers array.

controlplane/src/core/workers/DeleteOrganizationWorker.ts (3)

1-1: Imports look good

The import of ConnectionOptions, Job, JobsOptions from bullmq is appropriate for the refactored implementation.


20-23: Queue constructor implementation is correct

The constructor properly delegates to the base class with the required parameters.


53-56: Worker constructor properly delegates to base class

The constructor correctly passes all required parameters to the base class constructor, including the concurrency setting of 10.

controlplane/src/core/workers/AIGraphReadmeWorker.ts (2)

21-34: Queue constructor with custom job options is well-structured

The constructor properly configures default job options including retry logic with exponential backoff, which is a good practice for AI-related operations that might be rate-limited.


69-77: Worker initialization maintains proper separation of concerns

The constructor correctly delegates to the base class while maintaining the OpenAI client initialization, preserving the original functionality.

controlplane/src/core/workers/CacheWarmerWorker.ts (2)

22-27: Queue constructor follows standard pattern

The constructor properly delegates to the base class with the minimal required configuration.


60-63: Worker constructor correctly configured

The constructor properly passes all parameters to the base class with appropriate concurrency settings.

controlplane/src/core/workers/ReactivateOrganizationWorker.ts (2)

6-6: Cross-worker dependency is appropriate

The import of DeleteOrganizationQueue from another worker file is valid and maintains proper separation of concerns.


47-50: Worker constructor maintains consistency

The constructor follows the same pattern as other workers with identical concurrency settings, ensuring consistent behavior across the worker pool.

controlplane/src/core/workers/DeleteUserQueue.ts (3)

24-24: Queue constructor is consistent with other implementations

The constructor follows the established pattern for base class delegation.


56-59: Worker configuration aligns with other workers

The constructor maintains consistency with the concurrency setting of 10, matching all other refactored workers.


43-105: Excellent refactoring to reduce code duplication

This refactoring successfully achieves the PR objectives by:

  • Eliminating 30-40 lines of boilerplate code per worker
  • Standardizing error handling and logging through base classes
  • Maintaining type safety with generics
  • Ensuring consistent concurrency settings across all workers

The implementation aligns perfectly with the template pattern, allowing workers to focus solely on their business logic.

controlplane/src/core/workers/DeactivateOrganizationWorker.ts (3)

1-1: Clean import refactoring!

The import changes correctly reflect the move to base classes while retaining necessary BullMQ types.

Also applies to: 8-8


19-22: Excellent refactoring to extend BaseQueue!

The constructor correctly delegates to the base class, eliminating boilerplate code while maintaining type safety through generics.


41-56: Well-structured worker refactoring with centralized configuration!

The constructor properly extends BaseWorker and consolidates all dependencies into a single input object. This aligns perfectly with the PR's goal of reducing boilerplate.

Please verify that the concurrency setting of 10 is appropriate for deactivation operations. Consider if this could lead to race conditions or excessive load on dependent services (Keycloak, database).

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
controlplane/src/core/workers/base/Queue.ts (2)

12-16: Consider reducing the initial backoff delay.

The exponential backoff delay of 112 seconds (112,000ms) seems quite high for the initial retry attempt. Typically, exponential backoff starts with a smaller delay (e.g., 1-10 seconds) to allow for faster recovery from transient failures.

Consider adjusting the delay:

  backoff: {
    type: 'exponential',
-    delay: 112_000,
+    delay: 2_000, // Start with 2 seconds
  },

41-45: Consider improving type safety in Job type parameters.

The abstract method signatures use any types in the Job type parameters (Job<T, any, string>), which reduces type safety. Consider using more specific types or generics for better compile-time checking.

For example, you could introduce additional generic parameters:

- public abstract addJob(job: T, opts?: Omit<JobsOptions, 'jobId'>): Promise<Job<T, any, string> | undefined>;
+ public abstract addJob(job: T, opts?: Omit<JobsOptions, 'jobId'>): Promise<Job<T, unknown, string> | undefined>;

- public abstract getJob(job: T): Promise<Job<T, any, string> | undefined>;
+ public abstract getJob(job: T): Promise<Job<T, unknown, string> | undefined>;

Or introduce a generic for the result type if it's known by concrete implementations.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 30e4be4 and 7752503.

📒 Files selected for processing (3)
  • controlplane/src/core/workers/DeleteOrganizationWorker.ts (5 hunks)
  • controlplane/src/core/workers/base/Queue.ts (1 hunks)
  • controlplane/src/core/workers/base/Worker.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • controlplane/src/core/workers/base/Worker.ts
  • controlplane/src/core/workers/DeleteOrganizationWorker.ts
🔇 Additional comments (3)
controlplane/src/core/workers/base/Queue.ts (3)

1-2: LGTM - Clean imports.

The imports are well-structured and include the necessary dependencies for queue management and logging.


18-23: LGTM - Well-structured type definition.

The BaseQueueParams type is properly defined with appropriate required and optional properties.


25-39: LGTM - Excellent abstract class design.

The constructor properly initializes the queue and logger with contextual information, and the error handling setup ensures consistent logging across all queue implementations. This design effectively centralizes common queue functionality.

@StarpTech
Copy link
Contributor

StarpTech commented Jul 6, 2025

Hi @fahimfaisaal, thanks for the PR. That looks like a useful refactor. We're gonna take a look soon.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
controlplane/src/core/workers/DeleteUser.ts (1)

27-41: Add validation for userId before using as jobId.

The methods use job.userId as the job identifier without validation. Consider adding a check to ensure userId is defined and valid.

 public addJob(job: DeleteUserInput, opts?: Omit<JobsOptions, 'jobId'>) {
+  if (!job.userId) {
+    throw new Error('userId is required for job identification');
+  }
   return this.queue.add(job.userId, job, {
     ...opts,
     jobId: job.userId,
   });
 }

 public removeJob(job: DeleteUserInput) {
+  if (!job.userId) {
+    throw new Error('userId is required for job identification');
+  }
   return this.queue.remove(job.userId);
 }

 public getJob(job: DeleteUserInput) {
+  if (!job.userId) {
+    throw new Error('userId is required for job identification');
+  }
   return this.queue.getJob(job.userId);
 }
🧹 Nitpick comments (3)
controlplane/src/core/workers/ReactivateOrganization.ts (1)

38-47: Consider making concurrency configurable.

The worker class refactoring looks good overall. However, the concurrency is hardcoded to 10 which reduces flexibility.

Consider accepting concurrency as a constructor parameter:

 export class ReactivateOrganizationWorker extends BaseWorker<ReactivateOrganizationInput> {
   constructor(
     private input: {
       redisConnection: ConnectionOptions;
       db: PostgresJsDatabase<typeof schema>;
       logger: pino.Logger;
       deleteOrganizationQueue: DeleteOrganizationQueue;
+      concurrency?: number;
     },
   ) {
-    super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger);
+    super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: input.concurrency ?? 10 }, input.logger);
   }
controlplane/src/core/workers/CacheWarmer.ts (1)

49-61: Consider making concurrency configurable.

The worker refactoring looks good and properly extends BaseWorker. However, the concurrency value is hardcoded to 10.

Consider making concurrency configurable to allow performance tuning across different deployments:

 export class CacheWarmerWorker extends BaseWorker<CacheWarmerInput> {
   constructor(
     private input: {
       redisConnection: ConnectionOptions;
       db: PostgresJsDatabase<typeof schema>;
       logger: pino.Logger;
       chClient: ClickHouseClient | undefined;
       blobStorage: BlobStorage;
       cacheWarmerQueue: CacheWarmerQueue;
+      concurrency?: number;
     },
   ) {
-    super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger);
+    super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: input.concurrency ?? 10 }, input.logger);
   }
controlplane/src/core/workers/DeleteUser.ts (1)

56-56: Consider making concurrency configurable.

The concurrency is hardcoded to 10. Consider making this configurable through the input parameters for better flexibility across different environments.

 private input: {
   db: PostgresJsDatabase<typeof schema>;
   redisConnection: ConnectionOptions;
   logger: pino.Logger;
   keycloakClient: Keycloak;
   keycloakRealm: string;
   blobStorage: BlobStorage;
   platformWebhooks: PlatformWebhookService;
   deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue;
+  concurrency?: number;
 },
) {
- super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger);
+ super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: input.concurrency ?? 10 }, input.logger);
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a859b2f and f1e5eb8.

📒 Files selected for processing (14)
  • controlplane/src/bin/deactivate-org.ts (1 hunks)
  • controlplane/src/bin/delete-user.ts (1 hunks)
  • controlplane/src/bin/reactivate-org.ts (1 hunks)
  • controlplane/src/core/build-server.ts (3 hunks)
  • controlplane/src/core/repositories/OrganizationRepository.ts (1 hunks)
  • controlplane/src/core/repositories/UserRepository.ts (1 hunks)
  • controlplane/src/core/routes.ts (1 hunks)
  • controlplane/src/core/workers/AIGraphReadme.ts (5 hunks)
  • controlplane/src/core/workers/CacheWarmer.ts (3 hunks)
  • controlplane/src/core/workers/DeactivateOrganization.ts (3 hunks)
  • controlplane/src/core/workers/DeleteOrganization.ts (5 hunks)
  • controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts (1 hunks)
  • controlplane/src/core/workers/DeleteUser.ts (5 hunks)
  • controlplane/src/core/workers/ReactivateOrganization.ts (3 hunks)
✅ Files skipped from review due to trivial changes (6)
  • controlplane/src/bin/deactivate-org.ts
  • controlplane/src/core/repositories/UserRepository.ts
  • controlplane/src/bin/delete-user.ts
  • controlplane/src/bin/reactivate-org.ts
  • controlplane/src/core/routes.ts
  • controlplane/src/core/repositories/OrganizationRepository.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • controlplane/src/core/build-server.ts
🧰 Additional context used
🧬 Code Graph Analysis (2)
controlplane/src/core/workers/ReactivateOrganization.ts (1)
controlplane/src/core/workers/DeleteOrganization.ts (1)
  • DeleteOrganizationQueue (20-39)
controlplane/src/core/workers/DeactivateOrganization.ts (2)
controlplane/src/core/services/Keycloak.ts (1)
  • Keycloak (8-431)
controlplane/src/core/workers/DeleteOrganization.ts (1)
  • DeleteOrganizationQueue (20-39)
🔇 Additional comments (25)
controlplane/src/core/workers/DeleteOrganization.ts (4)

1-1: Import statements look correct.

The imports properly include the necessary BullMQ types and the new base classes from the refactored architecture.

Also applies to: 10-11


20-39: Queue class refactoring is well-implemented.

The DeleteOrganizationQueue class correctly extends BaseQueue<DeleteOrganizationInput> with proper generic typing. The constructor appropriately delegates to the base class with the required configuration parameters, and all public methods maintain their original functionality.


41-54: Worker class constructor follows proper template pattern.

The DeleteOrganizationWorker class correctly extends BaseWorker<DeleteOrganizationInput> and the constructor properly delegates to the base class with all required parameters (worker name, queue name, options, and logger). The concurrency setting of 10 is preserved from the original implementation.


56-110: Handler method refactoring maintains functionality with proper encapsulation.

The handler method visibility change from public to protected is appropriate for the template pattern, as the base class will manage the public interface. The method signature correctly uses the generic type Job<DeleteOrganizationInput>, and all business logic including error handling is preserved intact.

controlplane/src/core/workers/DeactivateOrganization.ts (3)

1-8: Imports are well-organized and appropriate.

The imports correctly bring in the new base classes and maintain necessary dependencies.


19-39: Queue implementation follows consistent patterns.

The refactored queue class properly extends BaseQueue and maintains the same public interface. The implementation aligns with other queue implementations like DeleteOrganizationQueue.


41-86: Worker refactoring maintains functionality while improving structure.

The worker class properly extends BaseWorker and correctly delegates initialization to the base class. The handler method visibility change to protected is appropriate as it enforces proper encapsulation.

controlplane/src/core/workers/ReactivateOrganization.ts (3)

1-7: Import changes look good!

The imports correctly include the new base classes and the required DeleteOrganizationQueue dependency.


17-36: Queue class refactoring is well implemented!

The class correctly extends BaseQueue and maintains the same public API while delegating queue initialization to the base class.


50-75: Handler method visibility change is appropriate!

The change from public to protected correctly implements the template method pattern, where the base class manages the worker lifecycle and calls the handler internally.

controlplane/src/core/workers/CacheWarmer.ts (3)

1-1: Imports correctly updated for base class usage.

The import changes properly reflect the refactoring to use BaseQueue and BaseWorker while retaining necessary type imports from bullmq.

Also applies to: 9-9


20-27: Clean refactoring to extend BaseQueue.

The queue class properly extends BaseQueue<CacheWarmerInput> and delegates initialization to the base class, aligning with the PR's goal of reducing boilerplate code.


63-63: Appropriate visibility change for template method pattern.

Changing the handler to protected correctly implements the template method pattern where the base class manages the worker lifecycle and calls the handler internally.

controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts (4)

1-6: LGTM! Clean imports and proper ES module syntax.

The imports are well-organized and follow proper ES module conventions with .js extensions. The dependency injection pattern is correctly established through the imported types.


8-9: Good practice using descriptive constants.

The queue and worker names are well-defined and follow consistent naming conventions.


11-13: Well-defined interface for job input.

The interface is appropriately scoped and provides clear type safety for the worker's input requirements.


15-34: Excellent implementation of the queue class.

The queue class effectively leverages the base class architecture with proper generic typing. Using organizationId as both the job name and jobId ensures job uniqueness per organization, which is appropriate for this use case.

controlplane/src/core/workers/DeleteUser.ts (4)

22-22: Good use of generics for type safety!

The refactoring to extend BaseQueue<DeleteUserInput> properly leverages TypeScript's type system.


23-25: Clean constructor delegation!

The simplified constructor properly delegates queue initialization to the base class, reducing boilerplate.


43-47: Consistent refactoring pattern and improved flexibility!

The worker class properly extends BaseWorker<DeleteUserInput> and uses ConnectionOptions for better flexibility in Redis configuration.


59-59: Correct visibility modifier for template method pattern!

Changing the handler method to protected properly encapsulates it within the class hierarchy, ensuring it's only called by the base class.

controlplane/src/core/workers/AIGraphReadme.ts (4)

2-2: Clean import refactoring!

The imports correctly reflect the new architecture - importing only the necessary types from bullmq while the actual Queue and Worker implementations are now handled by the base classes.

Also applies to: 8-8


58-74: Excellent worker class refactoring!

The class properly extends BaseWorker with correct generic type and delegates worker setup to the base class. The concurrency setting (10) is preserved, and the OpenAI GraphQL client initialization is appropriately placed after the base class constructor.


120-136: Appropriate visibility change for the handler method!

Changing the handler to protected correctly implements the template method pattern, ensuring it can only be called by the base class orchestration logic while maintaining proper async/await handling and error propagation.


19-35: Confirmed: BaseQueue exposes the queue property
The protected readonly queue: Queue<T> in controlplane/src/core/workers/base/Queue.ts ensures that addJob, removeJob, and getJob on AIGraphReadmeQueue will work as intended.

All set—no further changes needed.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8584730 and fd8310f.

📒 Files selected for processing (7)
  • controlplane/src/core/workers/AIGraphReadme.ts (4 hunks)
  • controlplane/src/core/workers/CacheWarmer.ts (3 hunks)
  • controlplane/src/core/workers/DeactivateOrganization.ts (3 hunks)
  • controlplane/src/core/workers/DeleteOrganization.ts (3 hunks)
  • controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts (1 hunks)
  • controlplane/src/core/workers/DeleteUser.ts (4 hunks)
  • controlplane/src/core/workers/ReactivateOrganization.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts
  • controlplane/src/core/workers/DeactivateOrganization.ts
  • controlplane/src/core/workers/AIGraphReadme.ts
  • controlplane/src/core/workers/CacheWarmer.ts
  • controlplane/src/core/workers/DeleteOrganization.ts
  • controlplane/src/core/workers/DeleteUser.ts
🔇 Additional comments (1)
controlplane/src/core/workers/ReactivateOrganization.ts (1)

37-74: Well-structured worker implementation following the new base class pattern.

The worker implementation correctly extends the BaseWorker class and properly handles errors by logging with context before re-throwing. The use of async/await aligns with the PR's goal of fixing asynchronous handling.

Comment on lines +28 to +34
public removeJob(job: ReactivateOrganizationInput) {
return this.queue.remove(job.organizationId);
}

public getJob(job: ReactivateOrganizationInput) {
return this.queue.getJob(job.organizationId);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Rename misleading parameter names in removeJob and getJob methods.

The parameter name job in these methods is misleading as it's not a Job instance but rather the input data. This could cause confusion for developers.

-  public removeJob(job: ReactivateOrganizationInput) {
-    return this.queue.remove(job.organizationId);
+  public removeJob(input: ReactivateOrganizationInput) {
+    return this.queue.remove(input.organizationId);
   }

-  public getJob(job: ReactivateOrganizationInput) {
-    return this.queue.getJob(job.organizationId);
+  public getJob(input: ReactivateOrganizationInput) {
+    return this.queue.getJob(input.organizationId);
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public removeJob(job: ReactivateOrganizationInput) {
return this.queue.remove(job.organizationId);
}
public getJob(job: ReactivateOrganizationInput) {
return this.queue.getJob(job.organizationId);
}
public removeJob(input: ReactivateOrganizationInput) {
return this.queue.remove(input.organizationId);
}
public getJob(input: ReactivateOrganizationInput) {
return this.queue.getJob(input.organizationId);
}
🤖 Prompt for AI Agents
In controlplane/src/core/workers/ReactivateOrganization.ts around lines 28 to
34, rename the parameter named 'job' in the removeJob and getJob methods to a
more descriptive name such as 'input' or 'reactivateInput' to accurately reflect
that it is input data, not a Job instance. Update all references to this
parameter within these methods accordingly to avoid confusion.

Comment on lines +49 to +65
protected async handler(job: Job<ReactivateOrganizationInput>) {
try {
const orgRepo = new OrganizationRepository(this.input.logger, this.input.db);

const org = await orgRepo.bySlug(job.data.organizationSlug);
if (!org) {
throw new Error('Organization not found');
}

if (org.id !== job.data.organizationId) {
throw new Error('Id and slug mismatch');
}

await orgRepo.reactivateOrganization({
organizationId: job.data.organizationId,
deleteOrganizationQueue: this.input.deleteOrganizationQueue,
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify handling of already-active organizations.

The handler doesn't check if the organization is already active before attempting reactivation. Ensure that the reactivateOrganization repository method properly handles this scenario to avoid unnecessary processing or state inconsistencies.


🏁 Script executed:

#!/bin/bash
# Description: Check if the OrganizationRepository.reactivateOrganization method handles already-active organizations

# Search for the reactivateOrganization method implementation
ast-grep --pattern 'reactivateOrganization($_) {
  $$$
}'

# Also search with async keyword
ast-grep --pattern 'async reactivateOrganization($_) {
  $$$
}'

# Search for any status checks in the implementation
rg -A 10 "reactivateOrganization.*\{" --type ts

Length of output: 4280


Add a guard for already-active organizations before reactivation

The reactivateOrganization method in OrganizationRepository always unsets the deactivated fields, even if the org is already active. Add an explicit check on isDeactivated and throw or short-circuit when it’s false to avoid redundant updates or hidden bugs.

• File: controlplane/src/core/repositories/OrganizationRepository.ts
– Before running the UPDATE, fetch the current isDeactivated flag for organizationId.
– If isDeactivated is already false, throw an error (or simply return) to indicate no action is needed.

Suggested diff (inside reactivateOrganization):

 public async reactivateOrganization(input: {
   organizationId: string;
   deleteOrganizationQueue: DeleteOrganizationQueue;
 }) {
-  await this.db.transaction(async (tx) => {
+  await this.db.transaction(async (tx) => {
+    // Guard: don’t reactivate an already-active organization
+    const current = await tx
+      .select({ isDeactivated: schema.organizations.isDeactivated })
+      .from(schema.organizations)
+      .where({ id: input.organizationId })
+      .first();
+
+    if (current && !current.isDeactivated) {
+      throw new Error('Organization is already active');
+    }
+
     await tx
       .update(schema.organizations)
       .set({
         isDeactivated: false,
         deactivatedAt: null,
         deactivationReason: null,
       })
       .where({ id: input.organizationId });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected async handler(job: Job<ReactivateOrganizationInput>) {
try {
const orgRepo = new OrganizationRepository(this.input.logger, this.input.db);
const org = await orgRepo.bySlug(job.data.organizationSlug);
if (!org) {
throw new Error('Organization not found');
}
if (org.id !== job.data.organizationId) {
throw new Error('Id and slug mismatch');
}
await orgRepo.reactivateOrganization({
organizationId: job.data.organizationId,
deleteOrganizationQueue: this.input.deleteOrganizationQueue,
});
// File: controlplane/src/core/repositories/OrganizationRepository.ts
public async reactivateOrganization(input: {
organizationId: string;
deleteOrganizationQueue: DeleteOrganizationQueue;
}) {
await this.db.transaction(async (tx) => {
// Guard: don’t reactivate an already-active organization
const current = await tx
.select({ isDeactivated: schema.organizations.isDeactivated })
.from(schema.organizations)
.where({ id: input.organizationId })
.first();
if (current && !current.isDeactivated) {
throw new Error('Organization is already active');
}
await tx
.update(schema.organizations)
.set({
isDeactivated: false,
deactivatedAt: null,
deactivationReason: null,
})
.where({ id: input.organizationId });
});
// …any remaining logic (e.g. enqueueing deleteOrganizationQueue)…
}
🤖 Prompt for AI Agents
In controlplane/src/core/repositories/OrganizationRepository.ts around the
reactivateOrganization method, add a check before executing the update query to
fetch the current isDeactivated status for the given organizationId. If
isDeactivated is already false, either throw an error or return early to prevent
unnecessary updates and signal that the organization is already active. This
avoids redundant database operations and potential hidden bugs.

@github-actions
Copy link

github-actions bot commented Oct 6, 2025

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Oct 6, 2025
@github-actions github-actions bot removed the Stale label Oct 8, 2025
@github-actions
Copy link

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Oct 22, 2025
@github-actions github-actions bot removed the Stale label Oct 26, 2025
@github-actions
Copy link

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Nov 10, 2025
@github-actions github-actions bot removed the Stale label Nov 11, 2025
@github-actions
Copy link

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Nov 25, 2025
@github-actions github-actions bot removed the Stale label Nov 26, 2025
@github-actions
Copy link

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Dec 10, 2025
@github-actions github-actions bot removed the Stale label Dec 11, 2025
@github-actions
Copy link

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Dec 25, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
controlplane/src/core/build-server.ts (1)

321-332: Consider conditional queue creation to match worker instantiation.

The readmeQueue is created unconditionally (line 321), but the readmeWorker is only instantiated when opts.openaiAPIKey is present. This means jobs could be enqueued without any worker to process them when the API key is not configured, potentially leading to jobs accumulating indefinitely.

Consider moving the queue instantiation inside the conditional block to maintain consistency between queue and worker lifecycle.

🔎 Proposed refactor
-  const readmeQueue = new AIGraphReadmeQueue(logger, fastify.redisForQueue);
-
+  let readmeQueue: AIGraphReadmeQueue | undefined;
+  
   if (opts.openaiAPIKey) {
+    readmeQueue = new AIGraphReadmeQueue(logger, fastify.redisForQueue);
     const readmeWorker = new AIGraphReadmeWorker({
       redisConnection: fastify.redisForWorker,
       db: fastify.db,
       logger,
       openAiApiKey: opts.openaiAPIKey,
     });
 
     bullWorkers.push(readmeWorker.create());
   }

Note: This would also require updating the queues object passed to routes (line 486) to handle the optional queue.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0de038 and f92252c.

📒 Files selected for processing (1)
  • controlplane/src/core/build-server.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-20T14:54:43.904Z
Learnt from: pepol
Repo: wundergraph/cosmo PR: 2347
File: controlplane/src/core/plugins/redis.ts:70-75
Timestamp: 2025-11-20T14:54:43.904Z
Learning: For BullMQ workers using ioredis v5, only `maxRetriesPerRequest: null` is required in the Redis connection configuration. The `enableReadyCheck: false` setting is not required per current BullMQ documentation for ioredis v5.

Applied to files:

  • controlplane/src/core/build-server.ts
🔇 Additional comments (5)
controlplane/src/core/build-server.ts (5)

334-340: LGTM! Clean migration to class-based worker.

The instantiation follows the new class-based pattern consistently with appropriate parameters for audit log deletion.


342-352: LGTM! Proper dependency composition.

The worker correctly receives the deleteOrganizationAuditLogsQueue as a dependency, enabling proper orchestration between related background jobs.


354-363: LGTM! Consistent class-based instantiation.

The deactivation worker properly depends on deleteOrganizationQueue, following the established pattern.


365-372: LGTM! Clean worker implementation.

The reactivation worker is properly configured with the necessary queue dependency.


374-385: LGTM! Comprehensive worker configuration.

The delete user worker is properly configured with all necessary dependencies including Keycloak, blob storage, webhooks, and the audit logs queue. The Redis configuration in the plugin already includes the required maxRetriesPerRequest: null setting for BullMQ workers with ioredis v5, and the worker correctly receives this pre-configured connection.

Comment on lines +39 to +50
import { AIGraphReadmeQueue, AIGraphReadmeWorker } from './workers/AIGraphReadme.js';
import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName, isGoogleCloudStorageUrl } from './util.js';

import { ApiKeyRepository } from './repositories/ApiKeyRepository.js';
import { createDeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js';
import { DeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganization.js';
import {
createDeleteOrganizationAuditLogsWorker,
DeleteOrganizationAuditLogsWorker,
DeleteOrganizationAuditLogsQueue,
} from './workers/DeleteOrganizationAuditLogsWorker.js';
import {
createDeactivateOrganizationWorker,
DeactivateOrganizationQueue,
} from './workers/DeactivateOrganizationWorker.js';
import { createDeleteUserWorker, DeleteUserQueue } from './workers/DeleteUserQueue.js';
import {
createReactivateOrganizationWorker,
ReactivateOrganizationQueue,
} from './workers/ReactivateOrganizationWorker.js';
} from './workers/DeleteOrganizationAuditLogs.js';
import { DeactivateOrganizationWorker, DeactivateOrganizationQueue } from './workers/DeactivateOrganization.js';
import { DeleteUserWorker, DeleteUserQueue } from './workers/DeleteUser.js';
import { ReactivateOrganizationWorker, ReactivateOrganizationQueue } from './workers/ReactivateOrganization.js';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for old factory function calls and imports that should have been migrated

echo "=== Searching for factory function calls ==="
rg -n 'create(AIGraphReadme|DeleteOrganization|DeactivateOrganization|DeleteUser|ReactivateOrganization|DeleteOrganizationAuditLogs)Worker' --type-list

echo ""
echo "=== Searching for factory function calls (broader search) ==="
rg -n 'create[A-Z].*Worker\(' -A 2

echo ""
echo "=== Checking for 'createXxxWorker' pattern imports ==="
rg -n "import.*\{.*create.*Worker" --type ts --type js

echo ""
echo "=== Checking entire build-server.ts for old patterns ==="
rg -n "create.*Worker" controlplane/src/core/build-server.ts

Repository: wundergraph/cosmo

Length of output: 50782


🏁 Script executed:

#!/bin/bash
# Search for old factory function calls

echo "=== Searching for factory function calls (createXxxWorker) ==="
rg -n 'create[A-Z][a-zA-Z]*Worker\(' --type ts --type js controlplane/src/core/

echo ""
echo "=== Searching for old factory imports in build-server.ts ==="
rg -n 'import.*create.*Worker' controlplane/src/core/build-server.ts

echo ""
echo "=== Checking for any remaining 'createXxxWorker' patterns in entire controlplane directory ==="
rg -n 'create(AIGraphReadme|DeleteOrganization|DeactivateOrganization|DeleteUser|ReactivateOrganization|DeleteOrganizationAuditLogs)Worker' --type ts --type js controlplane/

echo ""
echo "=== Verify new pattern is used - check for 'new XxxWorker' instantiations ==="
rg -n 'new [A-Z][a-zA-Z]*Worker\(' controlplane/src/core/build-server.ts | head -20

Repository: wundergraph/cosmo

Length of output: 2609


🏁 Script executed:

#!/bin/bash
# Check if the factory functions are still exported from worker files

echo "=== Check DeleteOrganizationWorker.ts for factory function export ==="
rg -A 5 'export.*createDeleteOrganizationWorker' controlplane/src/core/workers/DeleteOrganizationWorker.ts | head -20

echo ""
echo "=== Check DeleteOrganizationAuditLogsWorker.ts for factory function export ==="
rg -A 5 'export.*createDeleteOrganizationAuditLogsWorker' controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts | head -20

echo ""
echo "=== Check ReactivateOrganizationWorker.ts for factory function export ==="
rg -A 5 'export.*createReactivateOrganizationWorker' controlplane/src/core/workers/ReactivateOrganizationWorker.ts | head -20

echo ""
echo "=== Summary: Check if any factory functions remain in worker exports ==="
fd -e ts -e js controlplane/src/core/workers/ -x grep -l 'export.*create.*Worker' {} \;

Repository: wundergraph/cosmo

Length of output: 1109


🏁 Script executed:

#!/bin/bash
# Find where worker files actually exist

echo "=== Find all worker-related files ==="
find controlplane -name "*Worker*" -type f | head -20

echo ""
echo "=== Check what's in workers directory ==="
find controlplane -type d -name "workers" | head -5

echo ""
echo "=== List files in workers directories ==="
find controlplane -type d -name "workers" -exec ls -la {} \;

Repository: wundergraph/cosmo

Length of output: 929


🏁 Script executed:

#!/bin/bash
# Check the exports and structure of the worker files

echo "=== Check DeleteOrganization.ts exports ==="
rg -A 2 '^export' controlplane/src/core/workers/DeleteOrganization.ts | head -30

echo ""
echo "=== Check AIGraphReadme.ts exports ==="
rg -A 2 '^export' controlplane/src/core/workers/AIGraphReadme.ts | head -30

echo ""
echo "=== Check if any factory function still exists anywhere in worker files ==="
rg 'export.*function.*create.*Worker' controlplane/src/core/workers/ | wc -l

echo ""
echo "=== Look for 'create' pattern in any worker file ==="
rg 'export.*create' controlplane/src/core/workers/ --type ts

Repository: wundergraph/cosmo

Length of output: 1027


Update test files to use the new class-based worker pattern.

The migration to class-based workers in build-server.ts is complete, but old factory function imports remain in test files and will fail at runtime. Update controlplane/test/restore-organization.test.ts, delete-organization.test.ts, delete-audit-logs.test.ts, and deactivate-org.test.ts to use the new new XxxWorker({...}) instantiation pattern instead of createXxxWorker() factory functions, which no longer exist in the worker module exports.

@github-actions github-actions bot removed the Stale label Dec 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants