A scalable, fault-tolerant CSV ingestion pipeline designed to handle large datasets with high accuracy, modular processing, and real-time progress tracking.
This system is designed to:
- Process large CSV files efficiently using streaming & batching
- Ensure data consistency with atomic DB operations
- Support modular business logic (Vehicle, Insurance, VAS, Offers)
- Provide real-time progress updates
- Maintain fault tolerance with retry mechanisms and row-level isolation
   ┌───────────────────────────┐
   │        ROW (JSON)         │
   │  (received from worker)   │
   └─────────────┬─────────────┘
                 │
                 ▼
┌──────────────────────────────────┐
│      STEP 1: KEY NORMALIZER      │
└────┬────────────────────────┬────┘
     │                        │
[Header Mapping]              │ - Maps "VC Code" → "vc_code"
                              │ - Maps "branch_1_(Hub)" → "branch_1"
                              │ - Maps "VAS_Name1" → "vas_name1"
                              ▼
┌──────────────────────────────────┐
│    STEP 2: SECTION DETECTION     │
│      (VAS, insurance, etc.)      │
└────┬────────────────────────┬────┘
     │                        │
[Module Triggering]           │ - if 'vc_code' → set hasVehicle
                              │ - if 'offer1' → set hasOffers
                              │ - if 'insurance1' → set hasInsurance
                              ▼
┌──────────────────────────────────┐
│   STEP 3: MODEL CREATE/UPDATE    │
└────┬────────────────────────┬────┘
     │                        │
[Atomic Operation]            │ - try to find model by:
                              │   {vc_code, model, variant, color, year}
                              │ - Action: findOne → Update OR Create
                              ▼
                  ┌─────────────────────────────────────────────┐
                  │     STEP 4: SUBTASKS (REQUIRE model_id)     │
                  └───────────┬─────────────────────┬───────────┘
                              │                     │
        ┌─────────────────────┼─────────────────────┼─────────────────────┐
        │                     │                     │                     │
        ▼                     ▼                     ▼                     ▼
    INSURANCE            ACCESSORIES               VAS                 OFFERS
- findOrCreate        - findOrCreate Acc    - findOrCreate VAS    - STRICT FIND Offer:
  (LOWER match)         (master table)        (master table)        **NO auto-create**
- upsert Base Price   - upsert Model link   - upsert Model link   - If missing: Log Error
- upsert Addon        - store model-spec    - parse duration      - upsert Model link
  (ModelInsAddon)       price                                     - status: visible=true
        │                     │                     │                     │
        └──────────┬──────────┴─────────────────────┴──────────┬──────────┘
                   │                                           │
                   ▼                                           ▼
           [ACCURACY AUDIT]                          [FINAL CONSOLIDATION]
      - Stats: {inserted, updated}                - return stats object
      - Row-level isolation:                      - push parsing/logic errors
        (Offer error ≠ Job failure)                 to errors array (max 1000)
                                         │
                                         ▼
                           ┌───────────────────────────┐
                           │       DB COMMITMENT       │
                           │ - Seq (ORM) transactions  │
                           │ - Real-time progress %    │
                           └───────────────────────────┘
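
Steps 1 and 2 of the row flow above can be sketched in plain JavaScript. This is a minimal illustration rather than the project's actual code: `normalizeKey`, `normalizeRow`, and `detectSections` are hypothetical names, the regex rules are inferred only from the three mapping examples in the diagram, and treating a blank cell as "section absent" is an assumption.

```javascript
// Hypothetical helper: turn a raw CSV header into a snake_case key.
// Rules are inferred from the diagram's examples, not from the real mapping table.
function normalizeKey(header) {
  return String(header)
    .trim()
    .replace(/\([^)]*\)/g, "")      // drop parenthesised qualifiers such as "(Hub)"
    .replace(/[^A-Za-z0-9]+/g, "_") // collapse spaces and punctuation into "_"
    .replace(/^_+|_+$/g, "")        // strip leading and trailing underscores
    .toLowerCase();
}

// Apply normalizeKey to every column of one parsed row.
function normalizeRow(rawRow) {
  return Object.fromEntries(
    Object.entries(rawRow).map(([key, value]) => [normalizeKey(key), value])
  );
}

// Assumption: a section counts as present only when its trigger column is non-blank.
function hasValue(row, key) {
  const value = row[key];
  return value !== undefined && value !== null && String(value).trim() !== "";
}

// Set the module flags named in Step 2 from the normalized row.
function detectSections(row) {
  return {
    hasVehicle: hasValue(row, "vc_code"),
    hasOffers: hasValue(row, "offer1"),
    hasInsurance: hasValue(row, "insurance1"),
  };
}
```

A row would pass through `normalizeRow` first and `detectSections` second, so the flags are always computed against normalized keys.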
                        ┌───────────────────────────┐
                        │         FRONTEND          │
                        │     (CsvUploader.jsx)     │
                        └─────────────┬─────────────┘
                                      │
            ┌─────────────────────────┼─────────────────────────┐
            │                         │                         │
            ▼                         ▼                         ▼
  [1] User selects file      [2] Upload in parts        [3] Start process
      GET /presigned-url         POST /proxy-upload         POST /start
            │                         │                         │
            └─────────────────────────┼─────────────────────────┘
                                      │
                                      ▼
                        ┌───────────────────────────┐
                        │        BACKEND API        │
                        │    (import.controller)    │
                        └─────────────┬─────────────┘
                                      │
            ┌─────────────────────────┼─────────────────────────┐
            │                         │                         │
            ▼                         ▼                         ▼
    [File Row Logic]           [System Check]      [Create Job, Send to Queue]
    - count total parts        - check Redis       - queue: csv-import
    - create uploadId          - check DB          - send:
                               - check worker        { jobId, fileKey }
                                                                │
                                      ┌─────────────────────────┘
                                      ▼
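
The "count total parts" and "send to queue" steps above can be illustrated with a small sketch. Only the queue name `csv-import` and the `{ jobId, fileKey }` payload come from the diagram. The function names, the 5 MiB default part size, and the shape of the returned objects are assumptions made for the example.

```javascript
// Queue name taken from the diagram; everything else here is illustrative.
const QUEUE_NAME = "csv-import";

// Hypothetical default of 5 MiB per part. The real part size is not documented here.
const DEFAULT_PART_SIZE = 5 * 1024 * 1024;

// Split a file of `fileSize` bytes into byte ranges, one per upload part.
function planParts(fileSize, partSize = DEFAULT_PART_SIZE) {
  if (!Number.isInteger(fileSize) || fileSize <= 0) {
    throw new RangeError("fileSize must be a positive integer");
  }
  if (!Number.isInteger(partSize) || partSize <= 0) {
    throw new RangeError("partSize must be a positive integer");
  }
  const totalParts = Math.ceil(fileSize / partSize);
  const parts = [];
  for (let i = 0; i < totalParts; i += 1) {
    const start = i * partSize;
    const end = Math.min(start + partSize, fileSize); // exclusive upper bound
    parts.push({ partNumber: i + 1, start, end });
  }
  return { totalParts, parts };
}

// Build the message the controller would hand to the queue after the system checks pass.
function buildJobMessage(jobId, fileKey) {
  if (!jobId || !fileKey) {
    throw new Error("jobId and fileKey are required");
  }
  return { queue: QUEUE_NAME, data: { jobId, fileKey } };
}
```

The frontend could slice the file with the `start` and `end` offsets from `planParts`, and the backend would enqueue `buildJobMessage(...).data` once every part has arrived.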
                        ┌───────────────────────────┐
                        │      REDIS (BullMQ)       │
                        │ - job queue               │
                        │ - retry if failed (3x)    │
                        │ - track progress          │
                        └─────────────┬─────────────┘
                                      │
                                      ▼
                        ┌───────────────────────────┐
                        │          WORKER           │
                        │  (Background Processor)   │
                        └─────────────┬─────────────┘
                                      │
            ┌─────────────────────────┼─────────────────────────┐
            │                         │                         │
            ▼                         ▼                         ▼
       [Get File]              [Read + Detect]        [Process in Batches]
     - stream file          - read headers            for each batch:
     - don't load           - detect sections:        - clean data
       full file              Vehicles / Offers / VAS - validate data
                                                      - insert/update DB
                                                                │
                                                                ▼
                                                       [Save to Database]
                                                  ┌───────────────────────────┐
                                                  │         DATABASE          │
                                                  │ - insert or update        │
                                                  │ - avoid duplicates        │
                                                  └─────────────┬─────────────┘
                                                                │
                                                                ▼
                                                        [Progress Update]
                                                      - count success / fail
                                                      - calculate %
                                                      - store in Redis
                                                      - UI via WebSocket
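
The "Process in Batches" and "Progress Update" stages above can be sketched as follows. The sketch is synchronous so that it stays self-contained. A real worker reading from a file stream would more likely use an `async function*` with `for await`, and would write the progress object to Redis instead of calling a callback. All function names and the `{ success, failed }` result shape are assumptions.

```javascript
// Group rows from any iterable into arrays of at most `batchSize` rows.
// Only the current batch is held in memory, so the source is never fully loaded.
function* inBatches(rows, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  let batch = [];
  for (const row of rows) {
    batch.push(row);
    if (batch.length === batchSize) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // final partial batch
}

// Percentage of rows handled so far (successes plus failures), capped at 100.
function progressPercent(success, failed, totalRows) {
  if (totalRows <= 0) return 0;
  return Math.min(100, Math.floor(((success + failed) * 100) / totalRows));
}

// Run `handleBatch` on each batch and report cumulative progress after every batch.
// `handleBatch` is expected to return { success, failed } for the batch it was given.
function runImport(rows, totalRows, batchSize, handleBatch, onProgress) {
  let success = 0;
  let failed = 0;
  for (const batch of inBatches(rows, batchSize)) {
    const result = handleBatch(batch);
    success += result.success;
    failed += result.failed;
    onProgress({ success, failed, percent: progressPercent(success, failed, totalRows) });
  }
  return { success, failed };
}
```

Counting failed rows toward the percentage keeps the progress bar moving even when some rows are rejected, which matches the "count success / fail" step in the diagram.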
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
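
One way the two Redis variables above might be read at startup is sketched below. `redisConnectionFromEnv` is a hypothetical helper, and falling back to `127.0.0.1:6379` when a variable is missing is an assumption based on the values shown. The returned `{ host, port }` object has the shape commonly passed as a Redis connection option.

```javascript
// Hypothetical helper: build Redis connection settings from environment variables.
function redisConnectionFromEnv(env) {
  const host = env.REDIS_HOST || "127.0.0.1";
  const rawPort = env.REDIS_PORT || "6379";
  // Reject anything that is not a plain decimal port number in the valid range.
  const port = /^\d+$/.test(rawPort) ? Number.parseInt(rawPort, 10) : NaN;
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new RangeError(`Invalid REDIS_PORT: ${rawPort}`);
  }
  return { host, port };
}
```

Calling `redisConnectionFromEnv(process.env)` once in both the API and the worker would keep the two processes pointed at the same Redis instance.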
pm2 start src/server.js --name "quotation-api"
pm2 start index.js --name "quotation-worker"
- Scalable architecture for large CSV ingestion
- Memory-efficient streaming (no full file load)
- Retry mechanism (3x) using BullMQ
- Strong data consistency with transactional operations
- Modular processing (Insurance, VAS, Offers, Accessories)
- Real-time progress tracking via WebSocket
- Robust error isolation (row-level fault tolerance)
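
Row-level fault tolerance, as listed above, can be illustrated with a short sketch in which one failing row is recorded and skipped instead of aborting the job. The cap of 1000 stored errors and the `{inserted, updated}` stats come from the row-flow diagram. The names `processRows` and `processRow`, the extra `failed` counter, and the synchronous style are assumptions; real database calls would be asynchronous.

```javascript
// Cap taken from the diagram: at most 1000 errors are kept per job.
const MAX_ERRORS = 1000;

// Process rows one by one. `processRow` is a hypothetical callback that returns
// "inserted" or "updated", or throws when the row cannot be processed.
function processRows(rows, processRow) {
  const stats = { inserted: 0, updated: 0, failed: 0 };
  const errors = [];
  rows.forEach((row, index) => {
    try {
      const outcome = processRow(row);
      if (outcome !== "inserted" && outcome !== "updated") {
        throw new Error(`Unexpected outcome: ${outcome}`);
      }
      stats[outcome] += 1;
    } catch (err) {
      // Isolation: record the failure and continue with the next row.
      stats.failed += 1;
      if (errors.length < MAX_ERRORS) {
        errors.push({ row: index + 1, message: err.message });
      }
    }
  });
  return { stats, errors };
}
```

Because failures are counted even after the error list is full, the final stats stay accurate while memory use for error details stays bounded.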