coded-streams/codedstreams-risko-library

CodedStreams Risko Library

A comprehensive Avro schema library for the CodedStreams real-time crypto risk detection demonstration systems. It provides standardized data contracts and schema definitions for building scalable, evolvable risk management platforms.

📊 Schema Overview

graph TB
    A[TradeEvent] --> D[RiskAlert]
    B[MarketDataEvent] --> D
    C[RiskRule] --> D
    A --> E[PositionUpdate]
    B --> E
    
    subgraph "Core Events"
        A
        B
    end
    
    subgraph "Risk Domain"
        C
        D
        E
    end

🏗️ Schema Architecture

Core Data Contracts

| Schema | Purpose | Key Features |
|---|---|---|
| TradeEvent | Trading activity | Multi-exchange support, cancellation tracking |
| MarketDataEvent | Price and volume data | Bid/ask spreads, 24h metrics |
| RiskAlert | Risk detection outputs | Severity scoring, pattern metadata |
| RiskRule | Dynamic rules | Hot-reloadable, versioned configurations |
| PositionUpdate | Real-time exposures | PnL tracking, position aggregation |

📦 Installation

Maven

<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>codedstreams-risko-library</artifactId>
    <version>1.0.0</version>
</dependency>

Gradle

implementation("com.codedstreams:codedstreams-risko-library:1.0.0")

🔧 Build from Source

git clone https://github.com/codedstreams/risko-library.git
cd risko-library
mvn clean install

📋 Available Schemas

TradeEvent

{
  "type": "record",
  "name": "TradeEvent",
  "namespace": "com.codedstreams.schemas",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "traderId", "type": "string"},
    {"name": "exchange", "type": {"type": "enum", "name": "Exchange", "symbols": ["BINANCE", "COINBASE", "KRAKEN"]}},
    {"name": "symbol", "type": "string"},
    {"name": "tradeType", "type": {"type": "enum", "name": "TradeType", "symbols": ["MARKET", "LIMIT"]}},
    {"name": "side", "type": {"type": "enum", "name": "OrderSide", "symbols": ["BUY", "SELL"]}},
    {"name": "price", "type": "double"},
    {"name": "quantity", "type": "double"},
    {"name": "timestamp", "type": "long"},
    {"name": "orderId", "type": "string"},
    {"name": "isCancel", "type": "boolean", "default": false},
    {"name": "parentOrderId", "type": ["null", "string"], "default": null}
  ]
}

RiskAlert

{
  "type": "record", 
  "name": "RiskAlert",
  "namespace": "com.codedstreams.schemas",
  "fields": [
    {"name": "alertId", "type": "string"},
    {"name": "traderId", "type": "string"},
    {"name": "alertType", "type": {"type": "enum", "name": "AlertType", "symbols": ["WASH_TRADE", "SPOOFING", "POSITION_LIMIT", "RATE_LIMIT", "MARKET_ABUSE"]}},
    {"name": "pattern", "type": "string"},
    {"name": "severity", "type": "double"},
    {"name": "description", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "metadata", "type": {"type": "map", "values": "string"}},
    {"name": "ruleId", "type": ["null", "string"], "default": null}
  ]
}
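The schema stores severity as an unbounded double, so a consumer may want to validate and bucket it before acting on an alert. The following is a minimal sketch of one way to do that. It assumes scores fall in the range 0 to 1 (the usage example in this README uses 0.85); the class name `SeverityLevels` and the 0.5 and 0.8 thresholds are hypothetical and are not defined by this library.

```java
/** Hypothetical severity bucketing; the range and thresholds are illustrative assumptions. */
final class SeverityLevels {

    enum Level { LOW, MEDIUM, HIGH }

    /** Maps a RiskAlert severity score to a coarse level, rejecting out-of-range values. */
    static Level classify(double severity) {
        if (Double.isNaN(severity) || severity < 0.0 || severity > 1.0) {
            throw new IllegalArgumentException(
                "severity outside the assumed [0, 1] range: " + severity);
        }
        if (severity >= 0.8) {
            return Level.HIGH;      // assumed threshold
        }
        if (severity >= 0.5) {
            return Level.MEDIUM;    // assumed threshold
        }
        return Level.LOW;
    }
}
```

With generated classes on the classpath, this could be called as `SeverityLevels.classify(alert.getSeverity())`.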

🎯 Usage Examples

Java Usage

// Creating a trade event
TradeEvent trade = TradeEvent.newBuilder()
    .setEventId(UUID.randomUUID().toString())
    .setTraderId("trader-123")
    .setExchange(Exchange.BINANCE)
    .setSymbol("BTC-USDT")
    .setTradeType(TradeType.LIMIT)
    .setSide(OrderSide.BUY)
    .setPrice(45000.50)
    .setQuantity(1.5)
    .setTimestamp(System.currentTimeMillis())
    .setOrderId("order-456")
    .setIsCancel(false)
    .setParentOrderId(null)
    .build();

// Creating a risk alert
RiskAlert alert = RiskAlert.newBuilder()
    .setAlertId(UUID.randomUUID().toString())
    .setTraderId("trader-123")
    .setAlertType(AlertType.WASH_TRADE)
    .setPattern("BUY-SELL-CANCEL")
    .setSeverity(0.85)
    .setDescription("Potential wash trading detected")
    .setTimestamp(System.currentTimeMillis())
    .setMetadata(Map.of("buyOrderId", "order-456", "sellOrderId", "order-457"))
    .setRuleId("rule-wash-trade-v1")
    .build();

Kafka Producer Configuration

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

// try-with-resources closes the producer, which flushes any buffered records
try (KafkaProducer<String, TradeEvent> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("trades", trade.getTraderId(), trade));
}

Kafka Consumer Configuration

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "risk-engine");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");

KafkaConsumer<String, RiskAlert> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("risk-alerts"));

🔄 Schema Evolution

The library supports backward and forward compatibility through Avro schema evolution:

Adding Fields

// New field with default value
{
  "name": "newField",
  "type": "string",
  "default": "default_value"
}

Removing Fields

// Field can be safely removed if it had a default
{
  "name": "oldField",
  "type": "string",
  "default": "old_default"  // Required for safe removal
}
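The two rules above follow from how Avro resolves a record written with one schema against a reader using another: a reader field missing from the written data takes its default, a reader field with no default and no written value is an error, and written fields the reader does not know are ignored. The sketch below is a simplified plain-Java model of that rule and does not use the Avro API; `ResolutionSketch`, `ReaderField`, and `resolve` are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of Avro record resolution; not the Avro API. */
final class ResolutionSketch {

    /** One field of the reader's schema. */
    static final class ReaderField {
        final String name;
        final boolean hasDefault;
        final Object defaultValue;

        private ReaderField(String name, boolean hasDefault, Object defaultValue) {
            this.name = name;
            this.hasDefault = hasDefault;
            this.defaultValue = defaultValue;
        }

        static ReaderField required(String name) {
            return new ReaderField(name, false, null);
        }

        static ReaderField withDefault(String name, Object value) {
            return new ReaderField(name, true, value);
        }
    }

    /** Projects the written record onto the reader's fields. */
    static Map<String, Object> resolve(Map<String, Object> written,
                                       List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField field : readerFields) {
            if (written.containsKey(field.name)) {
                // Field present in both schemas: take the written value.
                result.put(field.name, written.get(field.name));
            } else if (field.hasDefault) {
                // Field added after the data was written: fall back to the default.
                result.put(field.name, field.defaultValue);
            } else {
                // No written value and no default: resolution fails.
                throw new IllegalStateException(
                    "No value and no default for field: " + field.name);
            }
        }
        // Written fields unknown to the reader (removed fields) are simply dropped.
        return result;
    }
}
```

In this model, a record written before `newField` existed resolves with `"default_value"` filled in, and a record that still carries `oldField` is read without it by a reader whose schema no longer lists that field.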

📈 Data Flow with Schema Registry

sequenceDiagram
    participant P as Producer
    participant SR as Schema Registry
    participant K as Kafka
    participant C as Consumer
    
    P->>SR: Register/Validate Schema
    SR->>P: Schema ID
    P->>K: Send Avro + Schema ID
    C->>K: Poll Messages
    C->>SR: Get Schema by ID
    SR->>C: Return Schema
    C->>C: Deserialize Avro
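
The "Send Avro + Schema ID" step works because the Confluent Avro serializer prefixes each message with a small header: one magic byte (0), then the schema ID as a 4-byte big-endian integer, then the Avro binary payload. The sketch below shows that framing with the JDK only. `WireFormat` and its methods are hypothetical helpers for illustration; in practice `KafkaAvroSerializer` and `KafkaAvroDeserializer` handle this for you.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative helper for the Confluent wire format; not part of this library. */
final class WireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Prepends the magic byte and the big-endian schema ID to an Avro payload. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)   // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Reads the schema ID a consumer would use to fetch the writer schema. */
    static int schemaId(byte[] message) {
        if (message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Confluent-framed Avro message");
        }
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns the Avro binary body that follows the 5-byte header. */
    static byte[] payload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }
}
```

Reading the ID from the header is what lets a consumer look up the writer schema once and cache it, rather than shipping the full schema with every message.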

🧪 Testing

Unit Testing

public class TradeEventTest {
    
    @Test
    public void testTradeEventCreation() {
        TradeEvent trade = TradeEvent.newBuilder()
            .setEventId("test-123")
            .setTraderId("trader-1")
            .setExchange(Exchange.BINANCE)
            // ... set other fields
            .build();
            
        assertEquals("trader-1", trade.getTraderId());
        assertEquals(Exchange.BINANCE, trade.getExchange());
    }
}

Schema Compatibility Testing

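# Generate Java classes from the .avsc files (avro-maven-plugin)
mvn avro:schema
# Test schema compatibility against the registry. This goal comes from Confluent's
# kafka-schema-registry-maven-plugin, which is assumed to be configured in the POM.
mvn schema-registry:test-compatibility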

🚀 Integration with Flink

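// Flink Avro deserialization via the Confluent Schema Registry
// (ConfluentRegistryAvroDeserializationSchema ships in flink-avro-confluent-registry)
String schemaRegistryUrl = "http://localhost:8081";

KafkaSource<TradeEvent> source = KafkaSource.<TradeEvent>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("trades")
    .setValueOnlyDeserializer(
        ConfluentRegistryAvroDeserializationSchema.forSpecific(TradeEvent.class, schemaRegistryUrl)
    )
    .build();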

📊 Monitoring and Metrics

The library supports integration with monitoring systems through structured logging and metrics:

// Structured logging example (assumes an SLF4J-style logger with {} placeholders)
logger.info("Risk alert generated: alertId={} traderId={} severity={} pattern={}",
    alert.getAlertId(),
    alert.getTraderId(),
    alert.getSeverity(),
    alert.getPattern());

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/new-schema
  3. Update schemas in src/main/avro/
  4. Regenerate Java classes: mvn compile
  5. Submit a pull request

Schema Change Process

flowchart LR
    A[Propose Change] --> B[Update .avsc]
    B --> C[Regenerate Java]
    C --> D[Test Compatibility]
    D --> E[Submit PR]
    E --> F[Review & Merge]

📄 License

Apache License 2.0 - See LICENSE file for details.

🆘 Support

🏷️ Versioning

This library follows Semantic Versioning:

  • MAJOR - Breaking schema changes
  • MINOR - New features, backward compatible
  • PATCH - Bug fixes, backward compatible
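
As a small illustration of the rules above, the sketch below computes the next version string for a given kind of change. `SchemaVersion`, `Change`, and `next` are hypothetical names, not part of this library, and the sketch assumes plain `MAJOR.MINOR.PATCH` versions without pre-release or build suffixes.

```java
/** Hypothetical helper that applies a Semantic Versioning bump to "MAJOR.MINOR.PATCH". */
final class SchemaVersion {

    enum Change { MAJOR, MINOR, PATCH }

    static String next(String current, Change change) {
        String[] parts = current.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected MAJOR.MINOR.PATCH, got: " + current);
        }
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        int patch = Integer.parseInt(parts[2]);

        switch (change) {
            case MAJOR:
                // Breaking schema change: bump major, reset minor and patch.
                return (major + 1) + ".0.0";
            case MINOR:
                // Backward-compatible feature: bump minor, reset patch.
                return major + "." + (minor + 1) + ".0";
            default:
                // Backward-compatible fix: bump patch only.
                return major + "." + minor + "." + (patch + 1);
        }
    }
}
```

For example, removing a required field from `TradeEvent` would be a MAJOR change, while adding an optional field with a default would be MINOR.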

Built with ❤️ by CodedStreams for the passionate data streamers and risk mitigation engineers
