Skip to content

Commit

Permalink
python: simplify event handler
Browse files Browse the repository at this point in the history
  • Loading branch information
narumiruna committed Apr 13, 2022
1 parent 00c8722 commit 7942558
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 116 deletions.
3 changes: 3 additions & 0 deletions python/bbgo/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .error import ErrorHandler
from .handler import Handler
from .update import UpdateHandler
12 changes: 12 additions & 0 deletions python/bbgo/handlers/error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from ..data import Event
from ..enums import EventType
from .handler import Handler


class ErrorHandler(Handler):

def __call__(self, event: Event) -> None:
if event.event_type != EventType.ERROR:
return

super(ErrorHandler, self).__call__(event)
10 changes: 10 additions & 0 deletions python/bbgo/handlers/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from ..data import Event


class Handler(object):

def __call__(self, event: Event) -> None:
self.handle(event)

def handle(self, event: Event) -> None:
raise NotImplementedError
12 changes: 12 additions & 0 deletions python/bbgo/handlers/update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from ..data import Event
from ..enums import EventType
from .handler import Handler


class UpdateHandler(Handler):

def __call__(self, event: Event) -> None:
if event.event_type != EventType.UPDATE:
return

super(UpdateHandler, self).__call__(event)
141 changes: 37 additions & 104 deletions python/bbgo/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,141 +2,74 @@
from typing import Callable
from typing import List

import grpc
from bbgo.enums import ChannelType
from bbgo.enums.depth_type import DepthType

import bbgo_pb2
import bbgo_pb2_grpc
import grpc

from .data import Event
from .data import Subscription


class Stream(object):
subscriptions: List[Subscription]

def __init__(self, host: str, port: int, subscriptions: List[bbgo_pb2.Subscription], user_data: bool = False):
def __init__(self, host: str, port: int, user_data: bool = False):
self.host = host
self.port = port
self.subscriptions = subscriptions
self.user_data = user_data

# callbacks for public channel
self.book_event_callbacks = []
self.trade_event_callbacks = []
self.ticker_event_callbacks = []
self.subscriptions = []
self.event_handlers = []

def subscribe(self, exchange: str, channel: str, symbol: str, depth: str = None, interval: str = None):
subscription = Subscription(exchange=exchange, channel=ChannelType(channel), symbol=symbol)

if depth is not None:
subscription.depth = DepthType(depth)

if interval is not None:
subscription.interval = interval

self.subscriptions.append(subscription)

# callbacks for private channel
self.order_snapshot_event_callbacks = []
self.order_update_event_callbacks = []
self.trade_snapshot_event_callbacks = []
self.trade_update_event_callbacks = []
self.account_snapshot_event_callbacks = []
self.account_update_event_callbacks = []
def add_event_handler(self, event_handler: Callable) -> None:
self.event_handlers.append(event_handler)

def fire_event_handlers(self, event: Event) -> None:
for event_handler in self.event_handlers:
event_handler(event)

@property
def address(self):
return f'{self.host}:{self.port}'

async def subscribe(self):
async def _subscribe(self):
async with grpc.aio.insecure_channel(self.address) as channel:
stub = bbgo_pb2_grpc.MarketDataServiceStub(channel)

request = bbgo_pb2.SubscribeRequest(subscriptions=self.subscriptions)
request = bbgo_pb2.SubscribeRequest(subscriptions=[s.to_pb() for s in self.subscriptions])
async for response in stub.Subscribe(request):
self.dispatch(response)
event = Event.from_pb(response)
self.fire_event_handlers(event)

async def subscribe_user_data(self):
async def _subscribe_user_data(self):
async with grpc.aio.insecure_channel(self.address) as channel:
stub = bbgo_pb2_grpc.UserDataServiceStub(channel)

request = bbgo_pb2.Empty()
async for response in stub.SubscribeUserData(request):
self.dispatch_user_events(response)
event = Event.from_pb(response)
self.fire_event_handlers(event)

def start(self):
coroutines = [self.subscribe()]
coroutines = [self._subscribe()]
if self.user_data:
coroutines.append(self.subscribe_user_data())
coroutines.append(self._subscribe_user_data())

group = asyncio.gather(*coroutines)
loop = asyncio.get_event_loop()
loop.run_until_complete(group)
loop.close()

def dispatch(self, response: bbgo_pb2.SubscribeResponse):
m = {
bbgo_pb2.Channel.BOOK: self.emit_book_event_callbacks,
bbgo_pb2.Channel.TRADE: self.emit_trade_event_callbacks,
bbgo_pb2.Channel.TICKER: self.emit_ticker_event_callbacks,
bbgo_pb2.Channel.USER: self.dispatch_user_events,
}
m[response.channel](response)

def dispatch_user_events(self, response: bbgo_pb2.SubscribeResponse):
m = {
bbgo_pb2.Event.ORDER_SNAPSHOT: self.emit_order_snapshot_event_callbacks,
bbgo_pb2.Event.ORDER_UPDATE: self.emit_order_update_event_callbacks,
bbgo_pb2.Event.TRADE_SNAPSHOT: self.emit_trade_snapshot_event_callbacks,
bbgo_pb2.Event.TRADE_UPDATE: self.emit_trade_update_event_callbacks,
bbgo_pb2.Event.ACCOUNT_SNAPSHOT: self.emit_account_snapshot_event_callbacks,
bbgo_pb2.Event.ACCOUNT_UPDATE: self.emit_account_update_event_callbacks,
}
m[response.event](response)

def on_book_event(self, callback: Callable) -> None:
self.book_event_callbacks.append(callback)

def emit_book_event_callbacks(self, event) -> None:
for callback in self.book_event_callbacks:
callback(event)

def on_trade_event(self, callback: Callable) -> None:
self.trade_event_callbacks.append(callback)

def emit_trade_event_callbacks(self, event) -> None:
for callback in self.trade_event_callbacks:
callback(event)

def on_ticker_event(self, callback: Callable) -> None:
self.ticker_event_callbacks.append(callback)

def emit_ticker_event_callbacks(self, event) -> None:
for callback in self.ticker_event_callbacks:
callback(event)

def on_order_snapshot_event(self, callback: Callable) -> None:
self.order_snapshot_event_callbacks.append(callback)

def emit_order_snapshot_event_callbacks(self, event) -> None:
for callback in self.order_snapshot_event_callbacks:
callback(event)

def on_order_update_event(self, callback: Callable) -> None:
self.order_update_event_callbacks.append(callback)

def emit_order_update_event_callbacks(self, event) -> None:
for callback in self.order_update_event_callbacks:
callback(event)

def on_trade_snapshot_event(self, callback: Callable) -> None:
self.trade_snapshot_event_callbacks.append(callback)

def emit_trade_snapshot_event_callbacks(self, event) -> None:
for callback in self.trade_snapshot_event_callbacks:
callback(event)

def on_trade_update_event(self, callback: Callable) -> None:
self.trade_update_event_callbacks.append(callback)

def emit_trade_update_event_callbacks(self, event) -> None:
for callback in self.trade_update_event_callbacks:
callback(event)

def on_account_snapshot_event(self, callback: Callable) -> None:
self.account_snapshot_event_callbacks.append(callback)

def emit_account_snapshot_event_callbacks(self, event) -> None:
for callback in self.account_snapshot_event_callbacks:
callback(event)

def on_account_update_event(self, callback: Callable) -> None:
self.account_update_event_callbacks.append(callback)

def emit_account_update_event_callbacks(self, event) -> None:
for callback in self.account_update_event_callbacks:
callback(event)
24 changes: 12 additions & 12 deletions python/stream.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import click
from loguru import logger

import bbgo_pb2
from bbgo import Stream
from bbgo.data import Event
from bbgo.handlers import UpdateHandler


@click.command()
@click.option('--host', default='127.0.0.1')
@click.option('--port', default=50051)
def main(host, port):
subscriptions = [
bbgo_pb2.Subscription(exchange='max', channel=bbgo_pb2.Channel.BOOK, symbol='BTCUSDT', depth='full'),
bbgo_pb2.Subscription(exchange='binance', channel=bbgo_pb2.Channel.BOOK, symbol='BTCUSDT', depth='full'),
]
class LogBook(UpdateHandler):

def book_event_callback(event):
def handle(self, event: Event) -> None:
logger.info(event)

stream = Stream(host, port, subscriptions)
stream.on_book_event(book_event_callback)

@click.command()
@click.option('--host', default='127.0.0.1')
@click.option('--port', default=50051)
def main(host, port):
stream = Stream(host, port)
stream.subscribe('max', 'book', 'BTCUSDT', 'full')
stream.subscribe('max', 'book', 'ETHUSDT', 'full')
stream.add_event_handler(LogBook())
stream.start()


Expand Down

0 comments on commit 7942558

Please sign in to comment.