
Commit 5341a7a

Refactor KafkaLoader for robustness
1 parent 1ff9187

File tree: 5 files changed (+47 −40 lines)

apps/kafka_streaming_loader_guide.md

Lines changed: 39 additions & 4 deletions
````diff
@@ -188,11 +188,42 @@ uv run python apps/kafka_consumer.py anvil_logs localhost:9092 my-group
 
 ## Docker Usage
 
+### Build the loader image
+
 ```bash
-# Build image
 docker build -f Dockerfile.kafka -t amp-kafka .
+```
+
+### Quick demo: local Kafka with Docker
+
+This section runs a single-broker Kafka in Docker for quick testing. For production, point `--kafka-brokers` at your real Kafka cluster and skip this section.
+
+Start a single-broker Kafka using `confluentinc/cp-kafka`:
+
+```bash
+docker network create kafka-net
+
+docker run -d --name kafka --network kafka-net -p 9092:9092 \
+  -e KAFKA_NODE_ID=1 \
+  -e KAFKA_PROCESS_ROLES=broker,controller \
+  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 \
+  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092 \
+  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092 \
+  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT \
+  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
+  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
+  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
+  -e CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk \
+  confluentinc/cp-kafka:latest
+```
+
+This configures two listeners:
+- `kafka:29092` — for containers on the `kafka-net` network (use this from the loader)
+- `localhost:9092` — for host access (use this from `uv run` or the consumer script)
 
-# Run loader (with auth via env var)
+### Run the loader
+
+```bash
 docker run -d \
   --name amp-kafka-loader \
   --network kafka-net \
@@ -201,7 +232,7 @@ docker run -d \
   -v $(pwd)/.amp_state:/data/state \
   amp-kafka \
   --amp-server 'grpc+tls://gateway.amp.staging.thegraph.com:443' \
-  --kafka-brokers kafka:9092 \
+  --kafka-brokers kafka:29092 \
   --topic erc20_transfers \
   --query-file /data/queries/erc20_transfers_activity.sql \
   --raw-dataset 'edgeandnode/ethereum_mainnet' \
@@ -214,7 +245,11 @@ docker run -d \
 docker logs -f amp-kafka-loader
 ```
 
-Note: Ensure your Kafka container is on the same Docker network (`kafka-net`) and advertises the correct listener (`kafka:9092`).
+### Consume messages from host
+
+```bash
+uv run python apps/kafka_consumer.py erc20_transfers localhost:9092
+```
 
 ## Getting Help
 
````
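Before starting the loader against the demo broker, it is worth sanity-checking both advertised listeners, since mixing them up is the most common failure in this setup. A minimal sketch, assuming the project uses kafka-python (the underscore-style producer keys in `kafka_loader.py` suggest that rather than confluent-kafka):

```python
# Run on the host: 'localhost:9092' is the EXTERNAL listener.
# From a container on the kafka-net network, use 'kafka:29092' instead.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())  # should include 'erc20_transfers' once the loader is producing
consumer.close()
```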

src/amp/loaders/implementations/kafka_loader.py

Lines changed: 7 additions & 4 deletions
```diff
@@ -1,5 +1,5 @@
 import json
-from dataclasses import dataclass
+from dataclasses import dataclass, fields
 from typing import Any, Dict, List, Optional
 
 import pyarrow as pa
```
```diff
@@ -18,7 +18,7 @@ class KafkaConfig:
     reorg_topic: Optional[str] = None
 
 
-KAFKA_CONFIG_FIELDS = {'bootstrap_servers', 'client_id', 'key_field', 'reorg_topic'}
+KAFKA_CONFIG_FIELDS = {f.name for f in fields(KafkaConfig)}
 RESERVED_CONFIG_FIELDS = {'resilience', 'state', 'checkpoint', 'idempotency'}
 
 
```
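A note on the `KAFKA_CONFIG_FIELDS` change above: deriving the set from the dataclass removes a maintenance hazard, since a field added to `KafkaConfig` is now recognized automatically instead of depending on someone updating a hand-written set. A minimal sketch of the pattern (the defaults here are illustrative, not the repo's):

```python
from dataclasses import dataclass, fields
from typing import Optional

@dataclass
class KafkaConfig:
    bootstrap_servers: str = 'localhost:9092'  # illustrative default
    client_id: str = 'amp'                     # illustrative default
    key_field: Optional[str] = None
    reorg_topic: Optional[str] = None

# Stays in sync with the dataclass by construction.
KAFKA_CONFIG_FIELDS = {f.name for f in fields(KafkaConfig)}
assert KAFKA_CONFIG_FIELDS == {'bootstrap_servers', 'client_id', 'key_field', 'reorg_topic'}
```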

```diff
@@ -40,11 +40,11 @@ def _get_required_config_fields(self) -> list[str]:
     def connect(self) -> None:
         try:
             producer_config = {
+                **self._extra_producer_config,
                 'bootstrap_servers': self.config.bootstrap_servers,
                 'client_id': self.config.client_id,
                 'value_serializer': lambda x: json.dumps(x, default=str).encode('utf-8'),
                 'transactional_id': f'{self.config.client_id}-txn',
-                **self._extra_producer_config,
             }
             if self._extra_producer_config:
                 self.logger.info(f'Extra Kafka config: {list(self._extra_producer_config.keys())}')
```
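Moving `**self._extra_producer_config` is behavioral, not cosmetic: in a dict literal, later keys overwrite earlier ones, so spreading the extras first guarantees the loader's explicit keys (notably `transactional_id` and `value_serializer`) can no longer be clobbered by user-supplied config. A two-print demonstration:

```python
extra = {'client_id': 'user-override', 'linger_ms': 5}

# New order: extras first, so the explicit key wins.
print({**extra, 'client_id': 'amp-loader'}['client_id'])   # -> amp-loader

# Old order: extras last, so user config silently won.
print({'client_id': 'amp-loader', **extra}['client_id'])   # -> user-override
```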
```diff
@@ -67,6 +67,9 @@ def connect(self) -> None:
             self._is_connected = True
 
         except Exception as e:
+            if self._producer:
+                self._producer.close()
+                self._producer = None
             self.logger.error(f'Failed to connect to Kafka: {e}')
             raise
 
```

```diff
@@ -135,7 +138,7 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
         Args:
             invalidation_ranges: List of block ranges to invalidate
             table_name: The Kafka topic name (used if reorg_topic not configured)
-            connection_name: Connection identifier (unused for Kafka, but required by base class)
+            connection_name: Connection identifier (required by base class interface)
         """
         if not invalidation_ranges:
             return
```

src/amp/streaming/types.py

Lines changed: 0 additions & 8 deletions
```diff
@@ -180,11 +180,3 @@ def to_json(self) -> str:
 
             data[r.network] = {'number': r.end, 'hash': r.hash}
         return json.dumps(data)
-
-    # TODO: ResumeWatermark.from_json appears to be unused. Remove?
-    @classmethod
-    def from_json(cls, json_str: str) -> 'ResumeWatermark':
-        """Deserialize from JSON string"""
-        data = json.loads(json_str)
-        ranges = [BlockRange.from_dict(r) for r in data['ranges']]
-        return cls(ranges=ranges, timestamp=data.get('timestamp'), sequence=data.get('sequence'))
```
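Context for the deletion above: judging from the surviving `to_json` body and the assertions in the renamed test below, the serialized watermark is keyed by network and keeps only the range end and hash, so the original `ranges` list (including starts) cannot be reconstructed, and `from_json`, which expected a `{'ranges': [...]}` payload, had nothing it could faithfully parse. A small sketch of the assumed output shape:

```python
import json

# Shape inferred from: data[r.network] = {'number': r.end, 'hash': r.hash}
serialized = json.dumps({'ethereum': {'number': 200, 'hash': '0xabc123'}})

data = json.loads(serialized)
assert data['ethereum']['number'] == 200  # mirrors the test assertions
assert 'start' not in data['ethereum']    # one-way: range starts are dropped
```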

tests/unit/test_crash_recovery.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -8,7 +8,6 @@
 
 import pytest
 
-from src.amp.loaders.base import LoadResult
 from src.amp.streaming.types import BlockRange, ResumeWatermark
 from tests.fixtures.mock_clients import MockDataLoader
 
```

tests/unit/test_streaming_types.py

Lines changed: 1 addition & 23 deletions
```diff
@@ -369,29 +369,7 @@ def test_to_json_minimal_data(self):
         assert data['ethereum']['number'] == 200
         assert data['ethereum']['hash'] == '0xabc123'
 
-    # TODO: ResumeWatermark.from_json appears to be unused. Remove?
-    def test_from_json_full_data(self):
-        """Test deserializing watermark with all fields"""
-        json_str = json.dumps(
-            {
-                'ranges': [
-                    {'network': 'ethereum', 'start': 100, 'end': 200},
-                    {'network': 'polygon', 'start': 50, 'end': 150},
-                ],
-                'timestamp': '2024-01-01T00:00:00Z',
-                'sequence': 42,
-            }
-        )
-
-        watermark = ResumeWatermark.from_json(json_str)
-
-        assert len(watermark.ranges) == 2
-        assert watermark.ranges[0].network == 'ethereum'
-        assert watermark.timestamp == '2024-01-01T00:00:00Z'
-        assert watermark.sequence == 42
-
-    def test_round_trip_serialization(self):
-        """Test that to_json produces server format (not reversible via from_json)"""
+    def test_to_json_server_format(self):
         watermark = ResumeWatermark(
             ranges=[
                 BlockRange(network='ethereum', start=100, end=200, hash='0xabc123'),
```
