Skip to content

Download data by chunks without redownloading existing data #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/app.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
framework:
cache:
prefix_seed: 'stochastix.download'

messenger:
transports:
stochastix: '%env(MESSENGER_TRANSPORT_DSN)%'
Expand Down
9 changes: 9 additions & 0 deletions config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,12 @@ services:
- '../src/**/Exception'
- '../src/**/Event'
- '../src/**/Model'

stochastix.download.cancel.cache:
class: Symfony\Component\Cache\Adapter\FilesystemAdapter
arguments:
- 'stochastix.download.cancel'
- 3600
- '%kernel.project_dir%/data/.cache'
tags:
- { name: 'cache.pool', namespace: 'stochastix.download' }
36 changes: 36 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,27 @@ This document outlines the RESTful API endpoints provided by the `stochastix-cor
---
### Market Data

* **`GET /api/data/exchanges`**
* **Description:** Retrieves a sorted list of all exchange IDs supported by the backend (via the CCXT library).
* **Requires:** None
* **Request Body:** None
* **Success Response:** `200 OK`
* **Example Success Response Body:**
```json
[
"ace",
"alpaca",
"ascendex",
"bequant",
"bigone",
"binance",
"binancecoinm",
"binanceus",
"binanceusdm",
"bingx"
]
```

* **`GET /api/data-availability`**
* **Description:** Scans the server for available market data (`.stchx` files) and returns a manifest detailing available symbols, their timeframes, and the start/end dates for each dataset.
* **Requires:** None
Expand Down Expand Up @@ -114,6 +135,21 @@ This document outlines the RESTful API endpoints provided by the `stochastix-cor
* `400 Bad Request`: Invalid input (e.g., validation errors in the request body, end date before start date).
* `500 Internal Server Error`: If the download message could not be dispatched to the queue.

* **`DELETE /api/data/download/{jobId}`**
* **Description:** Requests the cancellation of a running download job. The cancellation may take a few moments to take effect, as it is checked between data chunk fetches.
* **Requires:** None
* **URL Parameters:**
* `jobId` (string, required): The unique ID of the download job to cancel.
* **Request Body:** None
* **Success Response:** `202 Accepted`
* **Example Success Response Body:**
```json
{
"status": "cancellation_requested",
"jobId": "download_666c1e5a7b8d9"
}
```
* **Error Responses:** None.

* **`GET /api/data/inspect/{exchangeId}/{symbol}/{timeframe}`**
* **Description:** Inspects a specific market data file (`.stchx`). Returns the file's header metadata, a sample of the first and last records, and a full data consistency validation report (checking for gaps, duplicates, and out-of-order records).
Expand Down
14 changes: 13 additions & 1 deletion src/Command/DataDownloadCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ protected function configure(): void
'E',
InputOption::VALUE_OPTIONAL,
'The UTC end date/time (Format: Y-m-d[THH:MM:SS]). Defaults to "now".'
)
->addOption(
'force',
'f',
InputOption::VALUE_NONE,
'Force re-download and overwrite of the entire date range, even if data exists.'
);
}

Expand All @@ -74,6 +80,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$exchange = $input->getArgument('exchange');
$symbol = $input->getArgument('symbol');
$timeframe = $input->getArgument('timeframe');
$forceOverwrite = $input->getOption('force');

$startDateStr = $input->getOption('start-date');
$endDateStr = $input->getOption('end-date');
Expand All @@ -89,6 +96,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return Command::INVALID;
}

if ($forceOverwrite) {
$io->warning('Force mode enabled: Existing data in the specified range will be overwritten.');
}

$io->title('🚀 Stochastix OHLCV Data Downloader 🚀');
$io->newLine();
$io->section('Download Progress');
Expand Down Expand Up @@ -116,7 +127,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$symbol,
$timeframe,
$startDate,
$endDate
$endDate,
$forceOverwrite
);

$this->progressBar->finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ public function __invoke(RunBacktestMessage $message): void

private function publishUpdate(string $topic, array $data): void
{
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
$this->mercureHub->publish($update);
try {
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
$this->mercureHub->publish($update);
} catch (\Throwable $e) {
$this->logger->warning('Failed to publish update to Mercure. The backtest will continue.', [
'topic' => $topic,
'error' => $e->getMessage(),
]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private function generateFilePath(string $exchangeId, string $symbol, string $ti
rtrim($this->baseDataPath, '/'),
strtolower($exchangeId),
strtoupper($sanitizedSymbol),
strtolower($timeframe)
$timeframe
);
}
}
41 changes: 41 additions & 0 deletions src/Domain/Data/Controller/CancelDownloadAction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace Stochastix\Domain\Data\Controller;

use Psr\Cache\CacheItemPoolInterface;
use Psr\Cache\InvalidArgumentException;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Attribute\AsController;
use Symfony\Component\Routing\Attribute\Route;

#[AsController]
#[Route('/api/data/download/{jobId}', name: 'stochastix_api_data_cancel_download', methods: ['DELETE'])]
class CancelDownloadAction extends AbstractController
{
public function __construct(
#[Autowire(service: 'stochastix.download.cancel.cache')]
private readonly CacheItemPoolInterface $cache,
) {
}

/**
* @throws InvalidArgumentException
*/
public function __invoke(string $jobId): JsonResponse
{
$cacheKey = 'download.cancel.' . $jobId;
$item = $this->cache->getItem($cacheKey);

$item->set(true);
$item->expiresAfter(3600);
$this->cache->save($item);

return $this->json(
['status' => 'cancellation_requested', 'jobId' => $jobId],
Response::HTTP_ACCEPTED
);
}
}
23 changes: 23 additions & 0 deletions src/Domain/Data/Controller/GetExchangesAction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace Stochastix\Domain\Data\Controller;

use Stochastix\Domain\Data\Service\MarketDataService;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpKernel\Attribute\AsController;
use Symfony\Component\Routing\Attribute\Route;

#[AsController]
#[Route('/api/data/exchanges', name: 'stochastix_api_data_exchanges', methods: ['GET'])]
class GetExchangesAction extends AbstractController
{
public function __construct(private readonly MarketDataService $marketDataService)
{
}

public function __invoke(): JsonResponse
{
return $this->json($this->marketDataService->getExchanges());
}
}
11 changes: 5 additions & 6 deletions src/Domain/Data/Dto/DownloadRequestDto.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ public function __construct(
public string $startDate,
#[Assert\Date(message: 'End date must be in Y-m-d format.')]
public string $endDate,
public bool $forceOverwrite = false,
) {
}

public function validateDateRange(ExecutionContextInterface $context): void
{
if ($this->startDate !== null && $this->endDate !== null) {
if ($this->endDate < $this->startDate) {
$context->buildViolation('End date must be after or the same as start date.')
->atPath('endDate')
->addViolation();
}
if ($this->startDate !== null && $this->endDate !== null && $this->endDate < $this->startDate) {
$context->buildViolation('End date must be after or the same as start date.')
->atPath('endDate')
->addViolation();
}
}
}
18 changes: 14 additions & 4 deletions src/Domain/Data/EventSubscriber/DownloadProgressSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

namespace Stochastix\Domain\Data\EventSubscriber;

use Psr\Log\LoggerInterface;
use Stochastix\Domain\Data\Event\DownloadProgressEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Mercure\HubInterface;
use Symfony\Component\Mercure\Update;

final readonly class DownloadProgressSubscriber implements EventSubscriberInterface
{
public function __construct(private HubInterface $mercureHub)
{
public function __construct(
private HubInterface $mercureHub,
private readonly LoggerInterface $logger,
) {
}

public static function getSubscribedEvents(): array
Expand Down Expand Up @@ -40,7 +43,14 @@ public function onDownloadProgress(DownloadProgressEvent $event): void
'message' => "Fetched {$event->recordsFetchedInBatch} records up to " . gmdate('Y-m-d H:i:s', $event->lastTimestamp),
];

$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
$this->mercureHub->publish($update);
try {
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
$this->mercureHub->publish($update);
} catch (\Throwable $e) {
$this->logger->warning('Failed to publish progress update to Mercure.', [
'jobId' => $event->jobId,
'error' => $e->getMessage(),
]);
}
}
}
7 changes: 7 additions & 0 deletions src/Domain/Data/Exception/DownloadCancelledException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace Stochastix\Domain\Data\Exception;

class DownloadCancelledException extends DownloaderException
{
}
7 changes: 7 additions & 0 deletions src/Domain/Data/Exception/EmptyHistoryException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace Stochastix\Domain\Data\Exception;

class EmptyHistoryException extends ExchangeException
{
}
16 changes: 14 additions & 2 deletions src/Domain/Data/MessageHandler/DownloadDataMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Stochastix\Domain\Data\MessageHandler;

use Psr\Log\LoggerInterface;
use Stochastix\Domain\Data\Exception\DownloadCancelledException;
use Stochastix\Domain\Data\Message\DownloadDataMessage;
use Stochastix\Domain\Data\Service\OhlcvDownloader;
use Symfony\Component\Mercure\HubInterface;
Expand Down Expand Up @@ -46,6 +47,7 @@ public function __invoke(DownloadDataMessage $message): void
$dto->timeframe,
$startDate,
$endDate,
$dto->forceOverwrite,
$jobId
);

Expand All @@ -54,6 +56,9 @@ public function __invoke(DownloadDataMessage $message): void
'progress' => 100,
'message' => 'Download completed successfully.',
]);
} catch (DownloadCancelledException $e) {
$this->logger->info('Data download job {jobId} was cancelled.', ['jobId' => $jobId, 'reason' => $e->getMessage()]);
$this->publishUpdate($topic, ['status' => 'cancelled', 'message' => 'Download was cancelled by the user.']);
} catch (\Throwable $e) {
$this->logger->error('Data download job {jobId} failed: {message}', [
'jobId' => $jobId,
Expand All @@ -70,7 +75,14 @@ public function __invoke(DownloadDataMessage $message): void

private function publishUpdate(string $topic, array $data): void
{
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
$this->mercureHub->publish($update);
try {
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
$this->mercureHub->publish($update);
} catch (\Throwable $e) {
$this->logger->warning('Failed to publish update to Mercure. The process will continue.', [
'topic' => $topic,
'error' => $e->getMessage(),
]);
}
}
}
Loading