Mukeshdixena/csv-processing-pipeline

🚀 CSV Upload & Processing System

A scalable, fault-tolerant CSV ingestion pipeline designed to handle large datasets with high accuracy, modular processing, and real-time progress tracking.


🧠 System Overview

This system is designed to:

  • Process large CSV files efficiently using streaming & batching
  • Ensure data consistency with atomic DB operations
  • Support modular business logic (Vehicle, Insurance, VAS, Offers)
  • Provide real-time progress updates
  • Maintain fault tolerance with retry mechanisms and row-level isolation

🔄 Row Processing Pipeline (Detailed Flow)

                                ┌───────────────────────────┐
                                │      ROW (JSON)           │
                                │   (from the worker)       │
                                └────────────┬──────────────┘
                                             │
                                             ▼
                          ┌──────────────────────────────────┐
                          │    STEP 1: KEY NORMALIZER        │
                          └────┬────────────────────────┬────┘
                               │                        │
         [Header Mapping]      │  - Maps "VC Code" → "vc_code"
                               │  - Maps "branch_1_(Hub)" → "branch_1"
                               │  - Maps "VAS_Name1" → "vas_name1"

                               ▼
                          ┌──────────────────────────────────┐
                          │    STEP 2: SECTION DETECTION     │
                          │     (VAS, insurance, etc.)       │
                          └────┬────────────────────────┬────┘
                               │                        │
        [Module Triggering]    │  - if 'vc_code'     → Set hasVehicle
                               │  - if 'offer1'      → Set hasOffers
                               │  - if 'insurance1'  → Set hasInsurance

                               ▼
                          ┌──────────────────────────────────┐
                          │   STEP 3: MODEL CREATE/UPDATE    │
                          └────┬────────────────────────┬────┘
                               │                        │
       [Atomic Operation]      │  - look up the model by:
                               │    {vc_code, model, variant, color, year}
                               │  - Action: findOne → Update OR Create

                               ▼
                  ┌──────────────────────────────────────────────┐
                  │  STEP 4: SUBTASKS (REQUIRE model_id)         │
                  └───┬────────────────────┬─────────────────────┘
                      │                    │
  ┌───────────────────┴────────────────────┼─────────────────────┐
  │                   │                    │                     │
  ▼                   ▼                    ▼                     ▼
INSURANCE           ACCESSORIES          VAS                   OFFERS

- findOrCreate      - findOrCreate Acc   - findOrCreate VAS    - STRICT FIND Offer:
  (LOWER match)       (master table)       (master table)        NO auto-create
- upsert Base Price - upsert Model link  - upsert Model link   - If missing: Log Error
- upsert Addon      - store model-spec   - parse duration      - upsert Model link
  (ModelInsAddon)     price                                    - status: visible=true

  │                   │                    │                     │
  └──────────┬────────┴────────────────────┴──────────┬──────────┘
             │                                        │
             ▼                                        ▼
      [ACCURACY AUDIT]                       [FINAL CONSOLIDATION]
      - Stats: {inserted, updated}           - return stats object
      - Row-level isolation:                 - push parsing/logic errors
        (Offer error ≠ Job failure)            to errors array (max 1000)

                                 │
                                 ▼
                    ┌───────────────────────────┐
                    │       DB COMMITMENT       │
                    │ - Seq (ORM) transactions  │
                    │ - Real-time progress %    │
                    └───────────────────────────┘
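Steps 1 and 2 can be sketched as follows. This is a hypothetical JavaScript sketch, not the project's code: `normalizeKey`, `normalizeRow`, `detectSections` and `SECTION_TRIGGERS` are illustrative names, the normalization rules were chosen only to reproduce the three example mappings in the diagram, and trimming string values is an added assumption.

```javascript
// Hypothetical sketch of STEP 1 (key normalizer) and STEP 2 (section
// detection). The rules below only aim to reproduce the example mappings in
// the diagram; the project's real mapping table may differ.
function normalizeKey(header) {
  return String(header)
    .trim()
    .replace(/\([^)]*\)/g, "") // drop parenthesised notes such as "(Hub)"
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, "_") // runs of spaces/punctuation -> one underscore
    .replace(/^_+|_+$/g, ""); // strip leading/trailing underscores
}

// Rebuilds a parsed CSV row with normalized keys; string values are trimmed
// (an assumption, not documented behaviour).
function normalizeRow(row) {
  const out = {};
  for (const [key, value] of Object.entries(row)) {
    out[normalizeKey(key)] = typeof value === "string" ? value.trim() : value;
  }
  return out;
}

// Trigger columns from the diagram. Other modules (VAS, accessories) would be
// added the same way, but their trigger columns are not documented.
const SECTION_TRIGGERS = {
  hasVehicle: "vc_code",
  hasOffers: "offer1",
  hasInsurance: "insurance1",
};

// A flag is set when its trigger column is present in the normalized row.
function detectSections(normalizedRow) {
  const flags = {};
  for (const [flag, column] of Object.entries(SECTION_TRIGGERS)) {
    flags[flag] = Object.prototype.hasOwnProperty.call(normalizedRow, column);
  }
  return flags;
}

const row = normalizeRow({ "VC Code": " VC001 ", "branch_1_(Hub)": "Hub A", Offer1: "O-1" });
console.log(row); // { vc_code: 'VC001', branch_1: 'Hub A', offer1: 'O-1' }
console.log(detectSections(row)); // { hasVehicle: true, hasOffers: true, hasInsurance: false }
```

A regex covers these three examples; headers that cannot be derived mechanically would need an explicit alias table instead.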

📁 File Upload Architecture (End-to-End Flow)

                                ┌───────────────────────────┐
                                │        FRONTEND           │
                                │    (CsvUploader.jsx)      │
                                └────────────┬──────────────┘
                                             │
                     ┌───────────────────────┼────────────────────────┐
                     │                       │                        │
                     ▼                       ▼                        ▼

        [1] User selects file     [2] Upload in parts       [3] Start process
        GET /presigned-url        POST /proxy-upload        POST /start

                     │                       │                        │
                     └──────────────┬────────┴──────────────┬─────────┘
                                    ▼                       ▼

                          ┌───────────────────────────┐
                          │        BACKEND API        │
                          │   (import.controller)     │
                          └────────────┬──────────────┘
                                       │
        ┌──────────────────────────────┼──────────────────────────────┐
        │                              │                              │
        ▼                              ▼                              ▼

   [File Row Logic]          [System Check]              [Create Job & Send to Queue]
   - count total parts       - check Redis               - queue: csv-import
   - create uploadId         - check DB                  - send:
                             - check worker                { jobId, fileKey }

  =>
┌───────────────────────────┐
│      REDIS (BullMQ)       │
│  - job queue              │
│  - retry if failed (3x)   │
│  - track progress         │
└────────────┬──────────────┘
             │
             ▼

┌───────────────────────────┐
│         WORKER            │
│   (Background Processor)  │
└────────────┬──────────────┘
             │
   ┌─────────┼───────────────────────────────────────────────┐
   │         │                                               │
   ▼         ▼                                               ▼

 [Get File]   [Read + Detect]                      [Process in Batches]
 - stream file  - read headers                     for each batch:
 - don't load   - detect sections:                 - clean data
   full file      Vehicles / Offers / VAS          - validate data
                                                   - insert/update DB

   │
   ▼

[Save to Database]
┌───────────────────────────┐
│         DATABASE          │
│  - insert or update       │
│  - avoid duplicates       │
└────────────┬──────────────┘
             │
             ▼

[Progress Update]
- count successful / failed rows
- calculate %
- store in Redis
- push updates to the UI via WebSocket
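The batch loop, duplicate-avoiding write and progress calculation can be sketched together. This is a hypothetical sketch under stated assumptions: a `Map` stands in for the database, the composite key reuses the STEP 3 lookup fields, and the 1000-error cap comes from the row pipeline diagram. `createJobState`, `processBatch` and `modelKey` are illustrative names; the README's ORM transactions and Redis-backed progress are not reproduced here.

```javascript
// Hypothetical sketch of "Process in Batches" -> "Save to Database" ->
// "Progress Update". A Map stands in for the database; the real worker would
// use ORM calls inside a transaction and publish progress via Redis/WebSocket.
const MAX_ERRORS = 1000; // error-array cap shown in the row pipeline diagram

// Composite lookup key built from the STEP 3 fields.
function modelKey(row) {
  return [row.vc_code, row.model, row.variant, row.color, row.year].join("|");
}

function createJobState(totalRows) {
  return {
    totalRows,
    processed: 0,
    percent: 0,
    stats: { inserted: 0, updated: 0, failed: 0 },
    errors: [],
  };
}

function processBatch(batch, store, state) {
  for (const row of batch) {
    state.processed += 1;
    try {
      if (!row.vc_code) throw new Error("missing vc_code");
      const key = modelKey(row);
      if (store.has(key)) {
        store.set(key, { ...store.get(key), ...row }); // update existing model
        state.stats.updated += 1;
      } else {
        store.set(key, { ...row }); // create a new model
        state.stats.inserted += 1;
      }
    } catch (err) {
      // Row-level isolation: record the failure and continue with the batch.
      state.stats.failed += 1;
      if (state.errors.length < MAX_ERRORS) {
        state.errors.push({ row: state.processed, message: err.message });
      }
    }
  }
  // Progress counts every handled row, successful or failed.
  state.percent =
    state.totalRows === 0 ? 100 : Math.floor((state.processed / state.totalRows) * 100);
  return state;
}

// Usage: one new row, one duplicate (becomes an update), one invalid row.
const store = new Map();
const state = createJobState(3);
const car = { vc_code: "VC001", model: "M1", variant: "Base", color: "Red", year: 2024 };
processBatch([car, { ...car }, { model: "M1" }], store, state);
console.log(state.stats, state.percent); // { inserted: 1, updated: 1, failed: 1 } 100
```

Catching errors per row rather than per batch is what keeps one bad line from failing the whole job; the error cap keeps the job record bounded for very dirty files.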

⚙️ Deployment

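Redis connection settings (environment variables):
REDIS_HOST=127.0.0.1
REDIS_PORT=6379

Start the API server and the background worker with pm2:
pm2 start src/server.js --name "quotation-api"
pm2 start index.js --name "quotation-worker"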

💡 Key Highlights

  • Scalable architecture for large CSV ingestion
  • Memory-efficient streaming (no full file load)
  • Retry mechanism (3x) using BullMQ
  • Strong data consistency with transactional operations
  • Modular processing (Insurance, VAS, Offers, Accessories)
  • Real-time progress tracking via WebSocket
  • Robust error isolation (row-level fault tolerance)

About

Distributed CSV ingestion and processing system featuring streaming uploads, BullMQ workers, Redis-backed queues, transactional database operations, and real-time progress tracking.
