Skip to content

Commit 4442a84

Browse files
feat: Support event with long polling (box/box-codegen#757) (#936)
1 parent 0ff2c48 commit 4442a84

File tree

7 files changed

+538
-3
lines changed

7 files changed

+538
-3
lines changed

.codegen.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{ "engineHash": "1d94996", "specHash": "24fbeb9", "version": "0.1.0" }
1+
{ "engineHash": "1c54d3c", "specHash": "24fbeb9", "version": "0.1.0" }

box_sdk_gen/box/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from box_sdk_gen.box.event_stream import *
2+
13
from box_sdk_gen.box.errors import *
24

35
from box_sdk_gen.box.token_storage import *

box_sdk_gen/box/event_stream.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
import threading
2+
from enum import Enum
3+
from typing import Optional, Generator
4+
5+
from ..box.errors import BoxSDKError
6+
from ..schemas.events import Events
7+
from ..schemas.event import Event
8+
from ..schemas.realtime_server import RealtimeServer
9+
from ..networking.fetch_options import FetchOptions, ResponseFormat
10+
from ..networking.fetch_response import FetchResponse
11+
12+
13+
class RealtimeServerEvent(str, Enum):
14+
NEW_CHANGE = 'new_change'
15+
RECONNECT = 'reconnect'
16+
17+
18+
class EventStreamAction(str, Enum):
19+
FETCH_EVENTS = 'fetch_events'
20+
RECONNECT = 'reconnect'
21+
RETRY = 'retry'
22+
STOP = 'stop'
23+
24+
25+
class EventStream:
26+
"""
27+
EventStream is an iterator that fetches events from the Box API.
28+
It uses long polling to receive real-time updates.
29+
This class is designed to be used as a Python iterator.
30+
31+
Example usage:
32+
events_stream = client.events.get_event_stream()
33+
for event in events_stream:
34+
print(event)
35+
"""
36+
37+
def __init__(self, *, events_manager, query_params, headers_input):
38+
"""
39+
Initialize the EventStream.
40+
41+
:param events_manager: The EventsManager instance which provides relevant methods to fetch events.
42+
:param query_params: The query parameters to use for fetching events.
43+
:param headers_input: The headers to include in the request.
44+
"""
45+
self._events_manager = events_manager
46+
self._query_params = query_params
47+
self._headers_input = headers_input
48+
self._stream_position = query_params.stream_position or 'now'
49+
self._long_polling_info: Optional[RealtimeServer] = None
50+
self._long_polling_retries: int = 0
51+
self._started: bool = False
52+
self._stopped: bool = False
53+
self._stop_event = threading.Event()
54+
self._deduplication_size = 1000
55+
self._dedupHash = dict()
56+
57+
def __iter__(self) -> Generator[Event, None, None]:
58+
"""Make EventStream iterable. Yields Event objects."""
59+
return self._event_generator()
60+
61+
def _event_generator(self) -> Generator[Event, None, None]:
62+
"""Generator that yields Event objects from the stream."""
63+
if not self._started:
64+
self._started = True
65+
66+
try:
67+
# Start with fetching events to get initial events and stream position
68+
yield from self._fetch_events()
69+
70+
# Then start long polling loop
71+
while not self._stopped and not self._stop_event.is_set():
72+
try:
73+
action = self._get_long_poll_info_and_poll()
74+
75+
if action == EventStreamAction.FETCH_EVENTS:
76+
# Fetch new events when notified
77+
yield from self._fetch_events()
78+
elif action == EventStreamAction.RECONNECT:
79+
# Continue the loop to get new long polling info
80+
continue
81+
elif action == EventStreamAction.RETRY:
82+
# Wait a bit before retrying
83+
if not self._stop_event.wait(5):
84+
continue
85+
else:
86+
break
87+
elif action == EventStreamAction.STOP:
88+
break
89+
else:
90+
# Continue long polling
91+
continue
92+
93+
except Exception as e:
94+
if not self._stopped and not self._stop_event.is_set():
95+
# Wait a bit before retrying
96+
if not self._stop_event.wait(5):
97+
continue
98+
break
99+
100+
except Exception as e:
101+
if not self._stopped and not self._stop_event.is_set():
102+
pass
103+
return
104+
105+
def stop(self):
106+
"""Stop the event stream."""
107+
self._stopped = True
108+
self._stop_event.set()
109+
110+
def _get_long_poll_info(self):
111+
"""Fetch long polling info from the server."""
112+
if self._stopped or self._stop_event.is_set():
113+
return
114+
115+
try:
116+
info = self._events_manager.get_events_with_long_polling()
117+
118+
server = next(
119+
(e for e in (info.entries or []) if e.type == 'realtime_server'), None
120+
)
121+
if not server:
122+
raise BoxSDKError(message='No realtime server found in the response.')
123+
124+
self._long_polling_info = server
125+
self._long_polling_retries = 0
126+
127+
except Exception as error:
128+
if not self._stopped and not self._stop_event.is_set():
129+
raise error
130+
131+
def _get_long_poll_info_and_poll(self) -> str:
132+
"""Get long polling info and perform a long poll, returning the action to take."""
133+
if self._stopped or self._stop_event.is_set():
134+
return 'stop'
135+
136+
# Get long polling info if needed
137+
if not self._long_polling_info or self._long_polling_retries > int(
138+
self._long_polling_info.max_retries or '10'
139+
):
140+
self._get_long_poll_info()
141+
142+
return self._do_long_poll()
143+
144+
def _do_long_poll(self) -> str:
145+
"""Perform the long polling request and return action to take."""
146+
if self._stopped or self._stop_event.is_set():
147+
return EventStreamAction.STOP
148+
149+
try:
150+
self._long_polling_retries += 1
151+
152+
long_poll_url = self._long_polling_info.url
153+
separator = '&' if '?' in long_poll_url else '?'
154+
long_poll_with_stream_position = (
155+
f"{long_poll_url}{separator}stream_position={self._stream_position}"
156+
)
157+
158+
response: FetchResponse = (
159+
self._events_manager.network_session.network_client.fetch(
160+
FetchOptions(
161+
url=long_poll_with_stream_position,
162+
method='GET',
163+
headers={
164+
'Content-Type': 'application/json',
165+
},
166+
response_format=ResponseFormat.JSON,
167+
auth=self._events_manager.auth,
168+
network_session=self._events_manager.network_session,
169+
)
170+
)
171+
)
172+
173+
if self._stopped or self._stop_event.is_set():
174+
return EventStreamAction.STOP
175+
176+
if response.data:
177+
message = response.data
178+
179+
if isinstance(message, dict):
180+
message_text = message.get('message', '')
181+
182+
if message_text == RealtimeServerEvent.NEW_CHANGE:
183+
return EventStreamAction.FETCH_EVENTS
184+
elif message_text == RealtimeServerEvent.RECONNECT:
185+
return EventStreamAction.RECONNECT
186+
187+
# Continue long polling
188+
return self._do_long_poll()
189+
190+
except Exception as error:
191+
if not self._stopped and not self._stop_event.is_set():
192+
return 'retry'
193+
194+
return 'stop'
195+
196+
def _fetch_events(self) -> Generator[Event, None, None]:
197+
"""Fetch events from the API and yield Event objects."""
198+
if self._stopped or self._stop_event.is_set():
199+
return
200+
201+
try:
202+
# Prepare query parameters for the get_events call
203+
fetch_params = self._query_params.__dict__
204+
fetch_params['stream_position'] = self._stream_position
205+
206+
# Add extra headers if provided
207+
if self._headers_input and self._headers_input.extra_headers:
208+
fetch_params['extra_headers'] = self._headers_input.extra_headers
209+
210+
events: Events = self._events_manager.get_events(**fetch_params)
211+
212+
# Update stream position for next request
213+
if events.next_stream_position is not None:
214+
self._stream_position = str(events.next_stream_position)
215+
else:
216+
self._stream_position = 'now'
217+
218+
# Yield Event objects if any
219+
if events.entries:
220+
for event in events.entries:
221+
event_id = event.event_id
222+
if event_id not in self._dedupHash:
223+
self._dedupHash[event_id] = True
224+
if self._stopped or self._stop_event.is_set():
225+
return
226+
yield event
227+
228+
if len(self._dedupHash) > self._deduplication_size:
229+
self._dedupHash.clear()
230+
event_ids = list(events.entries.map(lambda e: e.event_id))
231+
for event_id in event_ids:
232+
self._dedupHash[event_id] = True
233+
234+
except Exception as error:
235+
if not self._stopped and not self._stop_event.is_set():
236+
raise error

0 commit comments

Comments
 (0)