Async Python client for ShipStation v1 and v2 APIs with an emphasis on typing.
pip install AsyncShipStation

Alternatively, install from source:

git clone git@github.com:sudoDeVinci/AsyncShipStation.git
cd AsyncShipStation
pip install -r requirements.txt

Create a connection, then pass it to the portal you want to use.
import asyncio
from AsyncShipStation import ShipStationClient, ShipmentPortal
async def main() -> None:
connection = await ShipStationClient.connect(
v2_key="your_v2_api_key",
v1_key="your_v1_api_key",
v1_secret="your_v1_secret",
)
async with ShipStationClient.scoped_client(connection=connection, version="v2"):
status, shipments = await ShipmentPortal.list(
connection,
page_size=10,
page=1,
)
print(status, shipments)
if __name__ == "__main__":
asyncio.run(main())

The library comes pre-configured with sensible default values for connections. To override any of them, pass a ConnectionConfig object with the parameters you would like changed.
config = ConnectionConfig(
version="v2",
timeout=30,
max_connections=20,
max_keepalive_connections=10,
http2=False,
retries=4,
user_agent="your_custom_UA_string",
)
connection = await ShipStationClient.connect(
v2_key="your_v2_api_key",
v1_key="your_v1_api_key",
v1_secret="your_v1_secret",
config=config,
)

NOTE: A connection's config is tied to its identity. Two connections that use the same credentials but different configs will be treated as separate connections.
Use scoped_client() when you want the connection lifecycle handled for you.
import asyncio
import os
from dotenv import load_dotenv
from AsyncShipStation import ShipStationClient, ShipmentPortal
load_dotenv()
V2_API_KEY: str | None = os.getenv("SHIP_STATION_V2")
V1_API_KEY: str | None = os.getenv("SHIP_STATION_V1")
V1_SECRET: str | None = os.getenv("SHIP_STATION_SECRET")
async def main() -> None:
connection = await ShipStationClient.connect(
v2_key=V2_API_KEY or "",
v1_key=V1_API_KEY,
v1_secret=V1_SECRET,
)
async with ShipStationClient.scoped_client(connection=connection, version="v2"):
status, shipments = await ShipmentPortal.list(connection, page_size=10, page=1)
print(status, shipments)
if __name__ == "__main__":
asyncio.run(main())

If you don't need to reuse the connection object, you can skip the explicit configuration step and pass the credentials directly to scoped_client().
import asyncio
import os
from dotenv import load_dotenv
from AsyncShipStation import ShipStationClient, ShipmentPortal
load_dotenv()
V2_API_KEY: str | None = os.getenv("SHIP_STATION_V2")
V1_API_KEY: str | None = os.getenv("SHIP_STATION_V1")
V1_SECRET: str | None = os.getenv("SHIP_STATION_SECRET")
async def main() -> None:
connection = await ShipStationClient.connect(
)
async with ShipStationClient.scoped_client(
v2_key=V2_API_KEY or "",
v1_key=V1_API_KEY,
v1_secret=V1_SECRET, version="v2"
):
status, shipments = await ShipmentPortal.list(connection, page_size=10, page=1)
print(status, shipments)
if __name__ == "__main__":
asyncio.run(main())

Use start() and close() if you want to manage the lifecycle yourself.
import asyncio
import os
from dotenv import load_dotenv
from AsyncShipStation import ShipStationClient, ShipmentPortal
load_dotenv()
V2_API_KEY: str | None = os.getenv("SHIP_STATION_V2")
V1_API_KEY: str | None = os.getenv("SHIP_STATION_V1")
V1_SECRET: str | None = os.getenv("SHIP_STATION_SECRET")
async def main() -> None:
connection = await ShipStationClient.connect(
v2_key=V2_API_KEY or "",
v1_key=V1_API_KEY,
v1_secret=V1_SECRET,
)
await ShipStationClient.start(connection=connection, version="v2")
try:
status, shipments = await ShipmentPortal.list(connection, page_size=10, page=1)
print(status, shipments)
finally:
await ShipStationClient.close(connection=connection, version="v2")
if __name__ == "__main__":
asyncio.run(main())

A single connection is meant to be shared across concurrent requests.
import asyncio
from AsyncShipStation import (
BatchPortal,
LabelPortal,
ShipmentPortal,
ShipStationClient,
)
async def main() -> None:
connection = await ShipStationClient.connect(
v2_key="your_v2_api_key",
v1_key="your_v1_api_key",
v1_secret="your_v1_secret",
)
async with ShipStationClient.scoped_client(connection=connection, version="v2"):
results = await asyncio.gather(
ShipmentPortal.list(connection, page_size=10, page=1),
BatchPortal.list(connection, page_size=10, page=1),
LabelPortal.list(connection, page_size=10, page=1),
)
for status, data in results:
if status in (200, 201, 207):
print(f"Success :: {data}")
else:
print(f"Error :: {data}")
if __name__ == "__main__":
asyncio.run(main())

If you need to retrieve a connection from the pool later, use connection.uid.
import asyncio
from AsyncShipStation import ShipStationClient, ShipmentPortal
async def main() -> None:
connection = await ShipStationClient.connect(
v2_key="your_v2_api_key",
v1_key="your_v1_api_key",
v1_secret="your_v1_secret",
)
async with ShipStationClient.scoped_client(
connection_hash=connection.pool_key,
version="v2",
) as scoped_connection:
status, shipments = await ShipmentPortal.list(
scoped_connection,
page_size=10,
page=1,
)
print(status, shipments)
if __name__ == "__main__":
asyncio.run(main())

Accounts that send too many requests in quick succession will receive a 429 Too Many Requests response with a Retry-After header that tells you how long to wait.
ShipStation bulk operation endpoints count as a single request.