24 commits
34ccc5c
implement a Kafka loader
incrypto32 Nov 17, 2025
20c0b66
Add kafka streaming loader app
incrypto32 Nov 20, 2025
afa2f4e
Refactor kafka streaming loader and fix Resume watermark
incrypto32 Nov 24, 2025
91b0b4d
use transactions in kafka loader
incrypto32 Nov 24, 2025
aa7dffb
Fix unit tests failing
incrypto32 Nov 25, 2025
104bc54
Add more integration tests for kafka loader
incrypto32 Nov 25, 2025
82299c2
Add a simple kafka consumer script
incrypto32 Dec 1, 2025
83d9763
Dockerize kafka loader
incrypto32 Dec 1, 2025
d7fbdbb
use lmdb stream state for kafka loader
incrypto32 Dec 1, 2025
465ce06
Docs for kafka loader
incrypto32 Dec 2, 2025
e89d88d
Add example query with specific schema
incrypto32 Dec 3, 2025
dca4493
Udpate docs for kafka loader and fix query
incrypto32 Dec 15, 2025
1e4beaa
Quote dataset names in SQL queries to support slashes
incrypto32 Jan 11, 2026
8c3bb2e
Pass data_dir config to LMDB state store in Kafka loader
incrypto32 Jan 11, 2026
769438c
Add --state-dir CLI parameter for LMDB state storage path
incrypto32 Jan 11, 2026
6beb402
Add auth, retry logic, and proper logging to Kafka streaming loader
incrypto32 Jan 12, 2026
5eccfda
Add pass-through Kafka producer config via --kafka-config
incrypto32 Jan 13, 2026
5e0b721
Add last_valid_hash to reorg messages
incrypto32 Jan 13, 2026
aa821ff
Add block range to batch log output
incrypto32 Jan 13, 2026
76f0188
Add --reorg-topic option for separate reorg message topic
incrypto32 Jan 14, 2026
03676ad
Support --start-block=latest to start from current chain tip
incrypto32 Jan 14, 2026
149fbda
Update Kafka loader guide with new options and Docker example
incrypto32 Jan 14, 2026
1ff9187
Fix crash recovery (#29)
incrypto32 Feb 3, 2026
7db6466
Refactor KafkaLoader for robustness
incrypto32 Feb 10, 2026
1 change: 1 addition & 0 deletions .gitignore
@@ -60,3 +60,4 @@ data/
# Build artifacts
*.tar.gz
*.zip
.amp_state
Empty file added .test.env
Empty file.
23 changes: 23 additions & 0 deletions Dockerfile.kafka
@@ -0,0 +1,23 @@
FROM python:3.12-slim

RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*

COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app

COPY pyproject.toml README.md ./
COPY src/ ./src/
COPY apps/ ./apps/

RUN uv pip install --system --no-cache . && \
uv pip install --system --no-cache kafka-python lmdb

ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

ENTRYPOINT ["python", "apps/kafka_streaming_loader.py"]
CMD ["--help"]
75 changes: 75 additions & 0 deletions apps/kafka_consumer.py
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""Simple Kafka consumer script to print messages from a topic in real-time.

Messages are consumed from a consumer group, so subsequent runs will only show
new messages. Press Ctrl+C to exit cleanly.

Usage:
    python kafka_consumer.py [topic] [broker] [group_id]

Examples:
    python kafka_consumer.py
    python kafka_consumer.py anvil_logs
    python kafka_consumer.py anvil_logs localhost:9092
    python kafka_consumer.py anvil_logs localhost:9092 my-group
"""

import json
import sys
from datetime import datetime

from kafka import KafkaConsumer

topic = sys.argv[1] if len(sys.argv) > 1 else 'anvil_logs'
broker = sys.argv[2] if len(sys.argv) > 2 else 'localhost:9092'
group_id = sys.argv[3] if len(sys.argv) > 3 else 'kafka-consumer-cli'

print(f'Consuming from: {broker} -> topic: {topic}')
print(f'Consumer group: {group_id}')
print(f'Started at: {datetime.now().strftime("%H:%M:%S")}')
print('-' * 80)

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=broker,
    group_id=group_id,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

msg_count = 0
data_count = 0
reorg_count = 0

try:
    for msg in consumer:
        msg_count += 1
        msg_type = msg.value.get('_type', 'unknown')

        if msg_type == 'data':
            data_count += 1
            print(f'\nMessage #{msg_count} [DATA] - Key: {msg.key.decode() if msg.key else "None"}')
            print(f'Offset: {msg.offset} | Partition: {msg.partition}')

            for k, v in msg.value.items():
                if k != '_type':
                    print(f'{k}: {v}')

        elif msg_type == 'reorg':
            reorg_count += 1
            print(f'\nMessage #{msg_count} [REORG] - Key: {msg.key.decode() if msg.key else "None"}')
            print(f'Network: {msg.value.get("network")}')
            print(f'Blocks: {msg.value.get("start_block")} -> {msg.value.get("end_block")}')

        else:
            print(f'\nMessage #{msg_count} [UNKNOWN]')
            print(json.dumps(msg.value, indent=2))

        print(f'\nTotal: {msg_count} msgs | Data: {data_count} | Reorgs: {reorg_count}')
        print('-' * 80)

except KeyboardInterrupt:
    print('\n\nStopped')
finally:
    consumer.close()
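
The consumer branches on the _type field that the loader attaches to each message. As a rough guide, the two payload shapes look something like the sketch below; the field sets are inferred from what this script reads and from the "Add last_valid_hash to reorg messages" commit, so treat them as assumptions rather than an authoritative schema.

# Illustrative payload shapes only -- inferred from kafka_consumer.py, not a schema.
data_message = {
    '_type': 'data',
    # remaining keys are the columns produced by the streamed SQL query
    # (query-dependent, e.g. block_num, token_address, ...)
}
reorg_message = {
    '_type': 'reorg',
    'network': 'anvil',
    'start_block': 12340,        # first block affected by the reorg
    'end_block': 12345,          # last block affected by the reorg
    'last_valid_hash': '0x...',  # hash of the last block still considered valid (assumed field name)
}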
218 changes: 218 additions & 0 deletions apps/kafka_streaming_loader.py
@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""Stream data to Kafka with resume watermark support."""

import argparse
import json
import logging
import os
import time
from pathlib import Path

from amp.client import Client
from amp.loaders.types import LabelJoinConfig
from amp.streaming import BlockRange, ResumeWatermark

logger = logging.getLogger('amp.kafka_streaming_loader')

RETRYABLE_ERRORS = (
ConnectionError,
TimeoutError,
OSError,
)


def retry_with_backoff(func, max_retries=5, initial_delay=1.0, max_delay=60.0, backoff_factor=2.0):
    """Execute function with exponential backoff retry on transient errors."""
    delay = initial_delay
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            return func()
        except RETRYABLE_ERRORS as e:
            last_exception = e
            if attempt == max_retries:
                logger.error(f'Max retries ({max_retries}) exceeded: {e}')
                raise
            logger.warning(f'Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...')
            time.sleep(delay)
            delay = min(delay * backoff_factor, max_delay)

    raise last_exception


def get_block_hash(client: Client, raw_dataset: str, block_num: int) -> str:
    """Get block hash from dataset.blocks table."""
    query = f'SELECT hash FROM "{raw_dataset}".blocks WHERE block_num = {block_num} LIMIT 1'
    result = client.get_sql(query, read_all=True)
    hash_val = result.to_pydict()['hash'][0]
    return '0x' + hash_val.hex() if isinstance(hash_val, bytes) else hash_val


def get_latest_block(client: Client, raw_dataset: str) -> int:
    """Get latest block number from dataset.blocks table."""
    query = f'SELECT block_num FROM "{raw_dataset}".blocks ORDER BY block_num DESC LIMIT 1'
    logger.debug(f'Fetching latest block from {raw_dataset}')
    logger.debug(f'Query: {query}')
    result = client.get_sql(query, read_all=True)
    block_num = result.to_pydict()['block_num'][0]
    logger.info(f'Latest block in {raw_dataset}: {block_num}')
    return block_num


def create_watermark(client: Client, raw_dataset: str, network: str, start_block: int) -> ResumeWatermark:
    """Create a resume watermark for the given start block."""
    watermark_block = start_block - 1
    watermark_hash = get_block_hash(client, raw_dataset, watermark_block)
    return ResumeWatermark(
        ranges=[BlockRange(network=network, start=watermark_block, end=watermark_block, hash=watermark_hash)]
    )


def main(
    amp_server: str,
    kafka_brokers: str,
    topic: str,
    query_file: str,
    raw_dataset: str,
    network: str,
    start_block: str = None,
    label_csv: str = None,
    state_dir: str = '.amp_state',
    auth: bool = False,
    auth_token: str = None,
    max_retries: int = 5,
    retry_delay: float = 1.0,
    kafka_config: dict = None,
    reorg_topic: str = None,
):
    def connect():
        return Client(amp_server, auth=auth, auth_token=auth_token)

    client = retry_with_backoff(connect, max_retries=max_retries, initial_delay=retry_delay)
    logger.info(f'Connected to {amp_server}')

    if label_csv and Path(label_csv).exists():
        client.configure_label('tokens', label_csv)
        logger.info(f'Loaded {len(client.label_manager.get_label("tokens"))} labels from {label_csv}')
        label_config = LabelJoinConfig(
            label_name='tokens', label_key_column='token_address', stream_key_column='token_address'
        )
    else:
        label_config = None

    connection_config = {
        'bootstrap_servers': kafka_brokers,
        'client_id': 'amp-kafka-loader',
        'state': {'enabled': True, 'storage': 'lmdb', 'data_dir': state_dir},
    }
    if reorg_topic:
        connection_config['reorg_topic'] = reorg_topic
    if kafka_config:
        connection_config.update(kafka_config)
    client.configure_connection('kafka', 'kafka', connection_config)

    with open(query_file) as f:
        query = f.read()

    if start_block == 'latest':
        block = get_latest_block(client, raw_dataset)
        resume_watermark = create_watermark(client, raw_dataset, network, block)
        logger.info(f'Starting from latest block {block}')
    elif start_block is not None:
        block = int(start_block)
        resume_watermark = create_watermark(client, raw_dataset, network, block) if block > 0 else None
        logger.info(f'Starting from block {block}')
    else:
        resume_watermark = None
        logger.info('Resuming from LMDB state')

    logger.info(f'Streaming to Kafka: {kafka_brokers} -> {topic}')

    batch_count = 0

    def stream_batches():
        nonlocal batch_count
        for result in client.sql(query).load(
            'kafka', topic, stream=True, label_config=label_config, resume_watermark=resume_watermark
        ):
            if result.success:
                batch_count += 1
                block_info = ''
                if result.metadata and result.metadata.get('block_ranges'):
                    ranges = result.metadata['block_ranges']
                    parts = [f'{r["network"]}:{r["start"]}-{r["end"]}' for r in ranges]
                    block_info = f' [{", ".join(parts)}]'
                logger.info(f'Batch {batch_count}: {result.rows_loaded} rows in {result.duration:.2f}s{block_info}')
            else:
                logger.error(f'Batch error: {result.error}')

    retry_with_backoff(stream_batches, max_retries=max_retries, initial_delay=retry_delay)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Stream data to Kafka with resume watermark')
    parser.add_argument('--amp-server', default=os.getenv('AMP_SERVER_URL', 'grpc://127.0.0.1:1602'))
    parser.add_argument('--kafka-brokers', default='localhost:9092')
    parser.add_argument('--topic', required=True)
    parser.add_argument('--reorg-topic', help='Separate topic for reorg messages (default: same as --topic)')
    parser.add_argument('--query-file', required=True)
    parser.add_argument(
        '--raw-dataset', required=True, help='Dataset name for the raw dataset of the chain (e.g., anvil, eth_firehose)'
    )
    parser.add_argument('--network', default='anvil')
    parser.add_argument('--start-block', type=str, help='Start from specific block number or "latest"')
    parser.add_argument('--label-csv', help='Optional CSV for label joining')
    parser.add_argument('--state-dir', default='.amp_state', help='Directory for LMDB state storage')
    parser.add_argument('--auth', action='store_true', help='Enable auth using ~/.amp/cache or AMP_AUTH_TOKEN env var')
    parser.add_argument('--auth-token', help='Explicit auth token (works independently, does not require --auth)')
    parser.add_argument('--max-retries', type=int, default=5, help='Max retries for connection failures (default: 5)')
    parser.add_argument('--retry-delay', type=float, default=1.0, help='Initial retry delay in seconds (default: 1.0)')
    parser.add_argument(
        '--kafka-config',
        type=str,
        help='Extra Kafka producer config as JSON. Uses kafka-python naming (underscores). '
        'Example: \'{"compression_type": "lz4", "linger_ms": 5}\'. '
        'See: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html',
    )
    parser.add_argument(
        '--kafka-config-file',
        type=Path,
        help='Path to JSON file with extra Kafka producer config',
    )
    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'])
    args = parser.parse_args()

    logging.basicConfig(level=logging.WARNING, format='%(asctime)s [%(name)s] %(levelname)s: %(message)s')
    log_level = getattr(logging, args.log_level) if args.log_level else logging.INFO
    logging.getLogger('amp').setLevel(log_level)

    kafka_config = {}
    if args.kafka_config_file:
        kafka_config = json.loads(args.kafka_config_file.read_text())
        logger.info(f'Loaded Kafka config from {args.kafka_config_file}')
    if args.kafka_config:
        kafka_config.update(json.loads(args.kafka_config))

    try:
        main(
            amp_server=args.amp_server,
            kafka_brokers=args.kafka_brokers,
            topic=args.topic,
            query_file=args.query_file,
            raw_dataset=args.raw_dataset,
            network=args.network,
            start_block=args.start_block,
            label_csv=args.label_csv,
            state_dir=args.state_dir,
            auth=args.auth,
            auth_token=args.auth_token,
            max_retries=args.max_retries,
            retry_delay=args.retry_delay,
            kafka_config=kafka_config or None,
            reorg_topic=args.reorg_topic,
        )
    except KeyboardInterrupt:
        logger.info('Stopped by user')
    except Exception as e:
        logger.error(f'Fatal error: {e}')
        raise
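
The script is normally driven through its CLI (the argparse options above), but for quick experiments it can also be imported and called directly. A minimal sketch, assuming apps/ is importable (e.g. PYTHONPATH=/app as set in Dockerfile.kafka), that an amp server and Kafka broker are reachable at the default addresses, and that queries/logs.sql is a hypothetical query file:

# Programmatic invocation -- a sketch, not part of the PR.
from kafka_streaming_loader import main

main(
    amp_server='grpc://127.0.0.1:1602',   # default AMP_SERVER_URL
    kafka_brokers='localhost:9092',
    topic='anvil_logs',
    query_file='queries/logs.sql',        # hypothetical path to the SQL query file
    raw_dataset='anvil',
    network='anvil',
    start_block='latest',                 # or a block number as a string, or None to resume from LMDB state
)

This mirrors running the script with --start-block=latest; connection retries with exponential backoff are handled inside main() via retry_with_backoff.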