-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_legacy.py
More file actions
23 lines (19 loc) · 826 Bytes
/
test_legacy.py
File metadata and controls
23 lines (19 loc) · 826 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import grpc
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), 'generated'))
from generated import cache_pb2, cache_pb2_grpc
async def run():
# Connect to localhost:50051 (mapped to node1)
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = cache_pb2_grpc.CacheServiceStub(channel)
# Query a key that ONLY exists in the Legacy API
print("Querying for 'user:1001'...")
response = await stub.Get(cache_pb2.GetRequest(key="user:1001"))
if response.found and response.value.decode() == "Dr. Heisenberg":
print("✅ SUCCESS: Fetched data from Legacy API!")
else:
print("❌ FAILED: Could not fetch from Legacy API.")
if __name__ == '__main__':
asyncio.run(run())