Skip to content

Commit

Permalink
polygon no longer supports L2 crypto
Browse files Browse the repository at this point in the history
  • Loading branch information
deeleeramone committed Dec 22, 2024
1 parent 23013e6 commit 9030261
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
)
from pydantic import Field, field_validator, model_validator


ASSET_CHOICES = [
"stock",
"stock_delayed",
Expand All @@ -46,7 +45,6 @@
"aggs_sec": "XAS",
"trade": "XT",
"quote": "XQ",
"l2": "XL2",
"fmv": "FMV",
},
"fx": {
Expand Down Expand Up @@ -125,6 +123,17 @@ class PolygonWebSocketQueryParams(WebSocketQueryParams):
"multiple_items_allowed": False,
"choices": ASSET_CHOICES,
},
"feed": {
"multiple_items_allowed": False,
"choices": [
"aggs_min",
"aggs_sec",
"trade",
"quote",
"fmv",
"value",
],
},
}

symbol: str = Field(
Expand Down Expand Up @@ -164,7 +173,6 @@ class PolygonWebSocketQueryParams(WebSocketQueryParams):
- aggs_sec: XAS.<SYMBOL>
- trade: XT.<SYMBOL>
- quote: XQ.<SYMBOL>
- l2: XL2.<SYMBOL>
- fmv: FMV.<SYMBOL>
FX
Expand All @@ -189,12 +197,10 @@ class PolygonWebSocketQueryParams(WebSocketQueryParams):
description="The asset type associated with the symbol(s)."
+ " Choose from: stock, stock_delayed, fx, crypto.",
)
feed: Literal["aggs_min", "aggs_sec", "trade", "quote", "l2", "fmv", "value"] = (
Field(
default="aggs_sec",
description="The asset type associated with the symbol."
+ "l2 is only available for crypto, and value is only available for index.",
)
feed: Literal["aggs_min", "aggs_sec", "trade", "quote", "fmv", "value"] = Field(
default="aggs_sec",
description="The asset type associated with the symbol."
+ "Value is only available for index.",
)

@model_validator(mode="before")
Expand All @@ -203,26 +209,30 @@ def _validate_feed(cls, values):
"""Validate the feed."""
feed = values.get("feed")
asset_type = values.get("asset_type")
if asset_type == "fx" and feed in ["trade", "l2", "value"]:
if asset_type == "fx" and feed in ["trade", "value"]:
raise ValueError(f"FX does not support the {feed} feed.")
if asset_type in [
"stock",
"stock_delayed",
"options",
"options_delayed",
] and feed in ["l2", "value"]:
if (
asset_type
in [
"stock",
"stock_delayed",
"options",
"options_delayed",
]
and feed == "value"
):
raise ValueError(
f"Asset type, {asset_type}, does not support the {feed} feed."
)
if asset_type in ["index", "index_delayed"] and feed in [
"trade",
"quote",
"l2",
"fmv",
]:
raise ValueError(f"Index does not support the {feed} feed.")
if asset_type == "crypto" and feed == "value":
raise ValueError(f"Crypto does not support the {feed} feed.")

return values


Expand Down Expand Up @@ -291,6 +301,8 @@ def _validate_date(cls, v):
def _validate_model(cls, values):
"""Validate the model."""
_ = values.pop("s", None)
symbol = values.pop("p", "") if "p" in values else values.pop("pair", "")
values["pair"] = symbol.replace("-", "").replace("/", "")
return values


Expand Down Expand Up @@ -366,6 +378,8 @@ def _validate_exchange(cls, v):
def _validate_model(cls, values):
"""Validate the model."""
_ = values.pop("i", None)
symbol = values.pop("pair", "")
values["pair"] = symbol.replace("-", "")
return values


Expand Down Expand Up @@ -433,65 +447,17 @@ def _validate_model(cls, values):
"""Validate the model."""
lp = values.pop("lp", None)
ls = values.pop("ls", None)

if lp:
values["last_price"] = lp

if ls:
values["last_size"] = ls

return values


class PolygonCryptoL2WebSocketData(WebSocketData):
"""Polygon Crypto L2 WebSocket data model."""
symbol = values.pop("pair", "")
values["pair"] = symbol.replace("-", "")

__alias_dict__ = {
"type": "ev",
"symbol": "pair",
"date": "t",
"exchange": "x",
"bid": "b",
"ask": "a",
"received_at": "r",
}

type: str = Field(
description="The type of data.",
)
date: datetime = Field(
description=DATA_DESCRIPTIONS.get("date", ""),
)
received_at: datetime = Field(
description="The time the data was received by Polygon.",
)
symbol: str = Field(
description=DATA_DESCRIPTIONS.get("symbol", ""),
)
exchange: str = Field(
default=None,
description="The exchange of the data.",
)
bid: list[list[float]] = Field(
description="An array of bid prices, where each entry contains two elements:"
+ " the first is the bid price, and the second is the size, with a maximum depth of 100.",
json_schema_extra={"x-unit_measurement": "currency"},
)
ask: list[list[float]] = Field(
description="An array of ask prices, where each entry contains two elements:"
+ " the first is the ask price, and the second is the size, with a maximum depth of 100.",
json_schema_extra={"x-unit_measurement": "currency"},
)

@field_validator("date", "received_at", mode="before", check_fields=False)
@classmethod
def _validate_date(cls, v):
"""Validate the date."""
return validate_date(cls, v)

@field_validator("exchange", mode="before", check_fields=False)
@classmethod
def _validate_exchange(cls, v):
"""Validate the exchange."""
return CRYPTO_EXCHANGE_MAP.get(v, str(v))
return values


class PolygonFXQuoteWebSocketData(WebSocketData):
Expand Down Expand Up @@ -545,6 +511,8 @@ def _validate_exchange(cls, v):
def _validate_model(cls, values):
"""Validate the model."""
_ = values.pop("i", None)
symbol = values.pop("p", "")
values["p"] = symbol.replace("/", "")
return values


Expand Down Expand Up @@ -1103,11 +1071,22 @@ class PolygonFairMarketValueData(WebSocketData):
json_schema_extra={"x-unit_measurement": "currency"},
)

@field_validator("date", mode="before", check_fields=False)
@classmethod
def _validate_date(cls, v):
"""Validate the date."""
return validate_date(cls, v)

@field_validator("symbol", mode="before", check_fields=False)
@classmethod
def _validate_symbol(cls, v):
"""Validate the symbol."""
return v.replace("-", "").replace("/", "")


MODEL_MAP = {
"XT": PolygonCryptoTradeWebSocketData,
"XQ": PolygonCryptoQuoteWebSocketData,
"XL2": PolygonCryptoL2WebSocketData,
"XA": PolygonCryptoAggsWebSocketData,
"XAS": PolygonCryptoAggsWebSocketData,
"FMV": PolygonFairMarketValueData,
Expand Down Expand Up @@ -1160,7 +1139,6 @@ class PolygonWebSocketData(Data):
- Aggs: XAS, XA - PolygonCryptoAggsWebSocketData
- Trade: XT - PolygonCryptoTradeWebSocketData
- Quote: XQ - PolygonCryptoQuoteWebSocketData
- L2: XL2 - PolygonCryptoL2WebSocketData
- Fair Market Value: FMV - PolygonFairMarketValueData
FX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ async def handle_symbol(symbol):
_feed, _ticker = ticker.split(".") if "." in ticker else (feed, ticker)
ticker = f"{_feed}.O:{_ticker}"

new_symbols.append(ticker)
if ticker == "XL2.*":
symbol_error = f"SymbolError -> {symbol}: L2 Crypto does not support the all-symbols wildcard."
logger.error(symbol_error)
else:
new_symbols.append(ticker)

return ",".join(new_symbols)

Expand Down Expand Up @@ -130,7 +134,11 @@ async def login(websocket):
sys.exit(1)
logger.info("PROVIDER INFO: %s", msg.get("message"))
except Exception as e:
logger.error("PROVIDER ERROR: %s -> %s", e.__class__.__name__, e.args[0])
logger.error(
"PROVIDER ERROR: %s -> %s",
e.__class__.__name__ if hasattr(e, "__class__") else e,
e.args[0],
)
sys.exit(1)


Expand All @@ -154,15 +162,9 @@ async def read_stdin():
while True:
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
sys.stdin.flush()

if not line:
continue

if "qsize" in line:
logger.info(f"PROVIDER INFO: Queue size: {queue.queue.qsize()}")
else:
if line:
try:
command = json.loads(line.strip())
command = line.strip() if "qsize" in line else json.loads(line.strip())
await command_queue.enqueue(command)
except json.JSONDecodeError:
logger.error("Invalid JSON received from stdin")
Expand All @@ -172,10 +174,13 @@ async def process_stdin_queue(websocket):
"""Process the command queue."""
while True:
command = await command_queue.dequeue()
symbol = command.get("symbol")
event = command.get("event")
if symbol and event:
await subscribe(websocket, symbol, event)
if command == "qsize":
logger.info(f"PROVIDER INFO: Queue size: {queue.queue.qsize()}")
else:
symbol = command.get("symbol")
event = command.get("event")
if symbol and event:
await subscribe(websocket, symbol, event)


async def process_message(message):
Expand Down Expand Up @@ -219,18 +224,18 @@ async def connect_and_stream():
queue.process_queue(lambda message: process_message(message))
)
tasks.add(handler_task)
for i in range(0, 48):
new_task = asyncio.shield(
asyncio.create_task(
queue.process_queue(lambda message: process_message(message))
)
for i in range(0, 64):
new_task = asyncio.create_task(
queue.process_queue(lambda message: process_message(message))
)
tasks.add(new_task)
stdin_task = asyncio.shield(asyncio.create_task(read_stdin()))
try:
connect_kwargs = CONNECT_KWARGS.copy()
connect_kwargs["max_size"] = None
connect_kwargs["read_limit"] = 2**32
connect_kwargs["close_timeout"] = 10
connect_kwargs["ping_timeout"] = None

try:
async with websockets.connect(URL, **connect_kwargs) as websocket:
Expand All @@ -242,22 +247,16 @@ async def connect_and_stream():
response = await websocket.recv()
messages = json.loads(response)
await process_message(messages)
cmd_task = asyncio.get_running_loop().create_task(
process_stdin_queue(websocket)
)
while True:
cmd_task = asyncio.create_task(process_stdin_queue(websocket))
msg_task = asyncio.create_task(websocket.recv())
done, pending = await asyncio.wait(
[cmd_task, msg_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()

for task in done:
if task == cmd_task:
await cmd_task
elif task == msg_task:
messages = task.result()
await asyncio.shield(queue.enqueue(json.loads(messages)))
messages = await websocket.recv()
await queue.enqueue(json.loads(messages))

cmd_task.cancel()
await cmd_task
asyncio.gather(*cmd_task, return_exceptions=True)

except websockets.InvalidStatusCode as e:
if e.status_code == 404:
Expand Down

0 comments on commit 9030261

Please sign in to comment.