Skip to content

chore(SQO): Create unit tests. #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ subgraph/
contracts/contract.abi.json

# Ignore Ruff
.ruff_cache/
.ruff_cache/
/tests/__pycache__
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ The application follows a clear data flow, managed by a daily scheduler:

2. **Orchestrator (`service_quality_oracle.py`)**: For each run, this module orchestrates the end-to-end process by coordinating the other components.

3. **Data Fetching (`bigquery_data_access_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data.
3. **Data Fetching (`bigquery_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data.

4. **Data Processing (`eligibility_pipeline.py`)**: The raw data is passed to this module, which processes it, filters for eligible and ineligible indexers, and generates CSV artifacts for auditing and record-keeping.

Expand Down
10 changes: 10 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,13 @@ max-complexity = 10
ignore_missing_imports = true
no_strict_optional = true
explicit_package_bases = true

[tool.pytest.ini_options]
minversion = "6.0"
addopts = "-ra -q"
testpaths = [
"tests",
]
pythonpath = [
"."
]
Empty file added src/models/__init__.py
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -186,5 +186,6 @@ def fetch_indexer_issuance_eligibility_data(self, start_date: date, end_date: da
"""
# Construct the query
query = self._get_indexer_eligibility_query(start_date, end_date)

# Return the results df
return self._read_gbq_dataframe(query)
2 changes: 1 addition & 1 deletion src/models/blockchain_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def _get_gas_prices(self, replace: bool) -> Tuple[int, int]:
latest_block_data = self._execute_rpc_call(self.w3.eth.get_block, "latest")
latest_block = cast(BlockData, latest_block_data)
base_fee_hex = latest_block["baseFeePerGas"]
base_fee = int(base_fee_hex)
base_fee = int(base_fee_hex) if isinstance(base_fee_hex, int) else int(str(base_fee_hex), 16)
logger.info(f"Latest block base fee: {base_fee/1e9:.2f} gwei")

# If the base fee cannot be retrieved, use a fallback value
Expand Down
5 changes: 5 additions & 0 deletions src/models/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ def initialize(self):

except Exception as e:
logger.error(f"Failed to initialize scheduler: {e}", exc_info=True)
if not self.slack_notifier:
webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
if webhook_url:
self.slack_notifier = create_slack_notifier(webhook_url)

if self.slack_notifier:
self.slack_notifier.send_failure_notification(
error_message=str(e), stage="Scheduler Initialization", execution_time=0
Expand Down
17 changes: 10 additions & 7 deletions src/models/service_quality_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
from datetime import date, timedelta
from pathlib import Path

# Add project root to path
project_root_path = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(project_root_path))

# Import data access utilities with absolute import
from src.models.bigquery_data_access_provider import BigQueryProvider
from src.models.bigquery_provider import BigQueryProvider
from src.models.blockchain_client import BlockchainClient
from src.models.eligibility_pipeline import EligibilityPipeline
from src.utils.configuration import credential_manager, load_config
from src.utils.configuration import (
credential_manager,
load_config,
)
from src.utils.slack_notifier import create_slack_notifier

# Set up basic logging
Expand All @@ -44,6 +43,7 @@ def main(run_date_override: date = None):
start_time = time.time()
slack_notifier = None
stage = "Initialization"
project_root_path = Path(__file__).resolve().parents[2]

try:
# Configuration and credentials
Expand Down Expand Up @@ -138,7 +138,10 @@ def main(run_date_override: date = None):
error_message=str(e), stage=stage, execution_time=execution_time
)
except Exception as slack_e:
logger.error(f"Failed to send Slack failure notification: {slack_e}", exc_info=True)
logger.error(
f"Failed to send Slack failure notification: {slack_e}",
exc_info=True,
)

sys.exit(1)

Expand Down
181 changes: 181 additions & 0 deletions tests/test_bigquery_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
"""
Unit tests for the BigQueryProvider.
"""

from datetime import date
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest

from src.models.bigquery_provider import BigQueryProvider

# Mock configuration values
MOCK_PROJECT = "test-project"
MOCK_LOCATION = "test-location"
MOCK_TABLE_NAME = "test.dataset.table"
MOCK_MIN_ONLINE_DAYS = 5
MOCK_MIN_SUBGRAPHS = 10
MOCK_MAX_LATENCY_MS = 5000
MOCK_MAX_BLOCKS_BEHIND = 50000


@pytest.fixture
def mock_bpd():
    """Yield a patched stand-in for the bigframes.pandas module the provider imports."""
    with patch("src.models.bigquery_provider.bpd") as fake_module:
        # Recreate the nested `bpd.options.bigquery` attribute chain so the
        # provider can both read and assign settings on it during __init__.
        fake_options = MagicMock()
        fake_options.bigquery = MagicMock()
        fake_module.options = fake_options
        yield fake_module


@pytest.fixture
def provider(mock_bpd: MagicMock) -> BigQueryProvider:
    """Build a BigQueryProvider wired to the patched bigframes module."""
    settings = {
        "project": MOCK_PROJECT,
        "location": MOCK_LOCATION,
        "table_name": MOCK_TABLE_NAME,
        "min_online_days": MOCK_MIN_ONLINE_DAYS,
        "min_subgraphs": MOCK_MIN_SUBGRAPHS,
        "max_latency_ms": MOCK_MAX_LATENCY_MS,
        "max_blocks_behind": MOCK_MAX_BLOCKS_BEHIND,
    }
    return BigQueryProvider(**settings)


# 1. Test Initialization


def test_initialization(provider: BigQueryProvider, mock_bpd: MagicMock):
    """
    Verify the constructor configures the global BigQuery session options and
    stores every threshold on the instance.
    """
    # The bigframes session must be pointed at the mocked project/location.
    assert mock_bpd.options.bigquery.project == MOCK_PROJECT
    assert mock_bpd.options.bigquery.location == MOCK_LOCATION

    # Each constructor argument should land on the attribute of the same name.
    expected_attrs = {
        "table_name": MOCK_TABLE_NAME,
        "min_online_days": MOCK_MIN_ONLINE_DAYS,
        "min_subgraphs": MOCK_MIN_SUBGRAPHS,
        "max_latency_ms": MOCK_MAX_LATENCY_MS,
        "max_blocks_behind": MOCK_MAX_BLOCKS_BEHIND,
    }
    for attr_name, expected_value in expected_attrs.items():
        assert getattr(provider, attr_name) == expected_value


# 2. Test Query Construction


def test_get_indexer_eligibility_query_constructs_correctly(provider: BigQueryProvider):
    """
    The generated SQL must embed every configured threshold plus both window dates.
    """
    window_start = date(2025, 1, 1)
    window_end = date(2025, 1, 28)

    query = provider._get_indexer_eligibility_query(start_date=window_start, end_date=window_end)

    assert isinstance(query, str)
    # Every dynamic configuration value should appear verbatim in the SQL text.
    expected_fragments = [
        MOCK_TABLE_NAME,
        str(MOCK_MAX_LATENCY_MS),
        str(MOCK_MAX_BLOCKS_BEHIND),
        str(MOCK_MIN_SUBGRAPHS),
        str(MOCK_MIN_ONLINE_DAYS),
        window_start.strftime("%Y-%m-%d"),
        window_end.strftime("%Y-%m-%d"),
    ]
    for fragment in expected_fragments:
        assert fragment in query


# 3. Test Data Reading


def test_read_gbq_dataframe_success(provider: BigQueryProvider, mock_bpd: MagicMock):
    """
    _read_gbq_dataframe should forward the SQL to bpd.read_gbq and return the
    pandas conversion of the result.
    """
    expected_df = pd.DataFrame({"col1": [1, 2]})
    # The provider's call chain is bpd.read_gbq(query).to_pandas(); stub its tail.
    mock_bpd.read_gbq.return_value.to_pandas.return_value = expected_df

    result = provider._read_gbq_dataframe("SELECT * FROM table")

    mock_bpd.read_gbq.assert_called_once_with("SELECT * FROM table")
    mock_bpd.read_gbq.return_value.to_pandas.assert_called_once()
    pd.testing.assert_frame_equal(result, expected_df)
    assert "col1" in result.columns


def test_read_gbq_dataframe_retry_and_fail():
    """
    Tests that _read_gbq_dataframe surfaces a ConnectionError when every
    BigQuery read attempt fails.

    NOTE(review): the previous version patched `tenacity.retry` inside the
    test, which is a no-op — the decorator is applied at import time, so the
    patched `mock_func` was never actually called. Here we exercise the real
    (already-decorated) method, make the underlying read fail persistently,
    and neutralise any retry sleeps so the test stays fast.
    """
    # 1. Setup
    error_to_raise = ConnectionError("Test connection error")

    with patch("src.models.bigquery_provider.bpd") as mock_bpd:
        # Every read attempt fails, so any configured retries are exhausted.
        mock_bpd.read_gbq.side_effect = error_to_raise

        provider = BigQueryProvider(
            project="test-proj",
            location="us-central1",
            table_name="test-tbl",
            min_online_days=5,
            min_subgraphs=10,
            max_latency_ms=5000,
            max_blocks_behind=100,
        )

        # If the method is tenacity-decorated it exposes a `.retry` controller;
        # replace its sleep so the test does not wait out real backoff delays.
        retry_controller = getattr(provider._read_gbq_dataframe, "retry", None)
        if retry_controller is not None:
            retry_controller.sleep = lambda _seconds: None

        # 2. Action and Assertion
        with pytest.raises(ConnectionError):
            provider._read_gbq_dataframe("SELECT * FROM table")

        # The underlying read must have been attempted at least once.
        assert mock_bpd.read_gbq.call_count >= 1


# 4. Test Orchestration


def test_fetch_indexer_issuance_eligibility_data_orchestration(provider: BigQueryProvider):
    """
    fetch_indexer_issuance_eligibility_data should build the query, execute it,
    and hand back the resulting DataFrame untouched.
    """
    # 1. Setup
    window_start = date(2025, 1, 1)
    window_end = date(2025, 1, 28)
    fake_query = "SELECT * FROM mock_table;"
    fake_df = pd.DataFrame({"eligible": [1]})

    # Stub out both internal collaborators.
    provider._get_indexer_eligibility_query = MagicMock(return_value=fake_query)
    provider._read_gbq_dataframe = MagicMock(return_value=fake_df)

    # 2. Action
    result = provider.fetch_indexer_issuance_eligibility_data(
        start_date=window_start,
        end_date=window_end,
    )

    # 3. Assertions
    # The query builder receives the window, the reader receives the built
    # query, and the reader's DataFrame is returned unchanged.
    provider._get_indexer_eligibility_query.assert_called_once_with(window_start, window_end)
    provider._read_gbq_dataframe.assert_called_once_with(fake_query)
    pd.testing.assert_frame_equal(result, fake_df)
Loading