Skip to content

Commit cc1367d

Browse files
committed
Version 1.0.27: Example retry logic for real-time connections
1 parent 484e77c commit cc1367d

File tree

7 files changed

+71
-19
lines changed

7 files changed

+71
-19
lines changed

README.md

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,17 @@ with open('output.csv') as fp:
9494

9595
### Streaming real-time data
9696

97-
It is possible to subscribe to a real-time stream for a dataset:
97+
It is possible to subscribe to a real-time stream for a dataset.
9898

99-
```python
100-
ds = api.get_dataset(dataset_id='us500')
101-
for record in ds.request_realtime():
102-
print(record)
103-
print(record.timestamp_utc, record.entity_name,
104-
record['event_relevance'])
105-
```
99+
Once you create a streaming connection to the real-time feed with your dataset,
100+
you will receive analytics records as soon as they are published.
101+
102+
103+
It is suggested to handle possible disconnection with a retry policy.
104+
You can find a [real-time streaming example here](ravenpackapi/examples/get_realtime_news.py).
106105

107-
The Result object takes care of converting the various fields to the appropriate type, so `record.timestamp_utc` will be a `datetime`
106+
The Result object handles the conversion of various fields into the appropriate type,
107+
i.e. `record.timestamp_utc` will be converted to `datetime`
108108

109109
### Entity mapping
110110

@@ -157,3 +157,24 @@ for ticker in references.tickers:
157157
if ticker.is_valid():
158158
print(ticker)
159159
```
160+
161+
### Accessing the low-level requests
162+
163+
RavenPack API wrapper is using the [requests library](https://2.python-requests.org) to do HTTPS requests,
164+
you can set common requests parameters to all the outbound calls by setting the `common_request_params` attribute.
165+
166+
For example, to disable HTTPS certificate verification and to setup your internal proxy:
167+
168+
```python
169+
api = RPApi()
170+
api.common_request_params.update(
171+
dict(
172+
proxies={'https': 'http://your_internal_proxy:9999'},
173+
verify=False,
174+
)
175+
)
176+
177+
# use the api to do requests
178+
```
179+
180+
PS. For setting your internal proxies, requests will honor the HTTPS_PROXY environment variable.

ravenpackapi/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from .models.dataset import Dataset
22
from .core import RPApi
3+
from .exceptions import ApiConnectionError

ravenpackapi/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from ravenpackapi.utils.constants import JSON_AVAILABLE_FIELDS, ENTITY_TYPES
1515

1616
_VALID_METHODS = ('get', 'post', 'put', 'delete')
17-
VERSION = '1.0.26'
17+
VERSION = '1.0.27'
1818

1919
logger = logging.getLogger("ravenpack.core")
2020

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,38 @@
11
import logging
2+
import random
3+
import time
24

3-
from ravenpackapi import RPApi
5+
from ravenpackapi import RPApi, ApiConnectionError
46

57
logging.basicConfig(level=logging.DEBUG)
8+
logger = logging.getLogger(__name__)
9+
610
# initialize the API (here we use the RP_API_KEY in os.environ)
711
api = RPApi()
12+
api.common_request_params['timeout'] = (10, None) # set a timeout on connection
813

914
# query the realtime feed
1015
ds = api.get_dataset(dataset_id='us500')
1116

12-
for record in ds.request_realtime():
13-
print(record)
14-
print(record.timestamp_utc, record.entity_name,
15-
record['event_relevance'])
17+
18+
def wait_between_attempts():
19+
""" Incremental backoff between connection attempts """
20+
wait_time = 0.3 # time is in seconds
21+
while True:
22+
yield wait_time
23+
wait_time = min(wait_time * 1.5, 30)
24+
wait_time *= (100 + random.randint(0, 50)) / 100
25+
26+
27+
wait_time = wait_between_attempts()
28+
while True:
29+
try:
30+
for record in ds.request_realtime():
31+
print(record)
32+
print(record.timestamp_utc, record.entity_name,
33+
record['event_relevance'])
34+
except (KeyboardInterrupt, SystemExit):
35+
break
36+
except ApiConnectionError as e:
37+
logger.error("Connection error %s: reconnecting..." % e)
38+
time.sleep(next(wait_time))

ravenpackapi/exceptions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from functools import wraps
22

3+
import requests
4+
35

46
class APIException(Exception):
57
def __init__(self, *args, **kwargs):
@@ -37,3 +39,7 @@ def decorated_func(instance, *args, **kwargs):
3739

3840
class ValidationError(Exception):
3941
pass
42+
43+
44+
class ApiConnectionError(requests.ConnectionError):
45+
pass

ravenpackapi/models/dataset.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ def request_datafile(self, start_date, end_date,
224224
@api_method
225225
def request_realtime(self):
226226
api = self.api
227-
endpoint = "{base}/{dataset_id}".format(base=api._FEED_BASE_URL,
228-
dataset_id=self.id)
227+
endpoint = "{base}/{dataset_id}?keep_alive".format(base=api._FEED_BASE_URL,
228+
dataset_id=self.id)
229229
logger.debug("Connecting with RT feed: %s" % endpoint)
230230
response = requests.get(endpoint,
231231
headers=api.headers,
@@ -241,4 +241,5 @@ def request_realtime(self):
241241
response.encoding = 'utf-8'
242242

243243
for line in response.iter_lines(decode_unicode=True, chunk_size=1):
244-
yield Result(line)
244+
if line: # skip empty lines to support keep-alive
245+
yield Result(line)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from setuptools import setup, find_packages
22

3-
VERSION = '1.0.26'
3+
VERSION = '1.0.27'
44

55
with open('README.rst') as readme_file:
66
readme = readme_file.read()

0 commit comments

Comments
 (0)