forked from PagerDuty/pdpyras
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pdpyras.py
1328 lines (1154 loc) · 50.3 KB
/
pdpyras.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) PagerDuty.
# See LICENSE for details.
import logging
import sys
import time
import warnings
from copy import deepcopy
from random import random
import requests
from urllib3.exceptions import HTTPError, PoolError
from requests.exceptions import RequestException
if sys.version_info[0] >= 3:
string_types = str
else:
string_types = basestring
warnings.warn('Module pdpyras will no longer support Python 2.7 as of '
'June 21, 2021.')
__version__ = '4.1.4'
# These are API resource endpoints/methods for which multi-update is supported
VALID_MULTI_UPDATE_PATHS = [
('incidents', '{index}'),
('incidents', '{id}', 'alerts', '{index}'),
('priorities', '{index}'),
]
ITERATION_LIMIT = 1e4
TIMEOUT = 5
#########################
### UTILITY FUNCTIONS ###
#########################
def auto_json(method):
"""
Function decorator that makes methods return the full response JSON
"""
def call(self, path, **kw):
response = raise_on_error(method(self, path, **pass_kw))
return try_decoding(response)
return call
def last_4(secret):
"""Returns an abbreviation of the input"""
return '*'+str(secret)[-4:]
def object_type(r_name):
"""
Derives an object type (i.e. ``user``) from a resource name (i.e. ``users``)
:param r_name:
Resource name, i.e. would be ``users`` for the resource index URL
``https://api.pagerduty.com/users``
:returns: The object type name; usually the ``type`` property of an instance
of the given resource.
:rtype: str
"""
if r_name.endswith('ies'):
# Because English
return r_name[:-3]+'y'
else:
return r_name.rstrip('s')
def raise_on_error(r):
"""
Raise an exception if a HTTP error response has error status.
:param r: Response object corresponding to the response received.
:type r: `requests.Response`_
:returns: The response object, if its status was success
:rtype: `requests.Response`_
"""
if r.ok:
return r
else:
raise PDClientError("%s %s: API responded with non-success status "
"(%d)"%(
r.request.method.upper(),
r.request.url.replace('https://api.pagerduty.com', ''),
r.status_code
), response=r
)
def resource_envelope(method):
"""
Convenience and consistency decorator for HTTP verb functions.
This makes the request methods ``GET``, ``POST`` and ``PUT`` always return a
dictionary object representing the resource at the envelope property (i.e.
the ``{...}`` from ``{"escalation_policy":{...}}`` in a get/put request to
an escalation policy) rather than a `requests.Response`_ object.
Methods using this decorator will raise a :class:`PDClientError` with its
``response`` property being being the `requests.Response`_ object in the
case of any error, so that the implementer can access it by catching the
exception, and thus design their own custom logic around different types of
error responses.
It allows creation of methods that can provide more succinct ways of making
API calls. In particular, the methods using this decorator don't require
checking for a success status, JSON-decoding the response body and then
pulling the essential data out of the envelope (i.e. for ``GET
/escalation_policies/{id}`` one would have to access the
``escalation_policy`` property of the object decoded from the response body,
assuming nothing went wrong in the whole process).
These methods are :attr:`APISession.rget`, :attr:`APISession.rpost` and
:attr:`APISession.rput`.
:param method: Method being decorated. Must take one positional argument
after ``self`` that is the URL/path to the resource, and must return an
object of class `requests.Response`_, and be named after the HTTP method
but with "r" prepended.
:returns: A callable object; the reformed method
"""
global VALID_MULTI_UPDATE_PATHS
http_method = method.__name__.lstrip('r')
def call(self, path, **kw):
pass_kw = deepcopy(kw) # Make a copy for modification
nodes = tokenize_url_path(path, baseurl=self.url)
is_index = nodes[-1] == '{index}'
resource = nodes[-2]
multi_put = http_method == 'put' and nodes in VALID_MULTI_UPDATE_PATHS
envelope_name_single = object_type(resource) # Usually the "type"
if is_index and http_method=='get' or multi_put:
# Plural resource name, for index action (GET /<resource>), or for
# multi-update (PUT /<resource>). In both cases, the response
# (former) or request (latter) body is {<resource>:[<objects>]}
envelope_name = resource
else:
# Individual resource create/read/update
# Body = {<singular-resource-type>: {<object>}}
envelope_name = envelope_name_single
# Validate the abbreviated (or full) request payload, and automatically
# fill the gap for the implementer if some assumptions hold true:
if http_method in ('post', 'put') and 'json' in pass_kw and \
envelope_name not in pass_kw['json']:
pass_kw['json'] = {envelope_name: pass_kw['json']}
r = raise_on_error(method(self, path, **pass_kw))
# Now let's try to unpack...
response_obj = try_decoding(r)
# Get the encapsulated object
if envelope_name not in response_obj:
raise PDClientError("Cannot extract object; expected top-level "
"property \"%s\", but could not find it in the response "
"schema. Response body=%s"%(envelope_name, r.text[:99]),
response=r)
return None
return response_obj[envelope_name]
return call
def resource_name(obj_type):
"""
Transforms an object type into a resource name
:param obj_type:
The object type, i.e. ``user`` or ``user_reference``
:returns: The name of the resource, i.e. the last part of the URL for the
resource's index URL
:rtype: str
"""
if obj_type.endswith('_reference'):
# Strip down to basic type if it's a reference
obj_type = obj_type[:obj_type.index('_reference')]
if obj_type.endswith('y'):
# Because English
return obj_type[:-1]+'ies'
else:
return obj_type+'s'
def resource_path(method):
"""
API call decorator that allows passing a resource dict as the path/URL
Most resources returned by the API will contain a ``self`` attribute that is
the URL of the resource itself.
Using this decorator allows the implementer to pass either a URL/path or
such a resource dictionary as the ``path`` argument, thus eliminating the
need to re-construct the resource URL or hold it in a temporary variable.
"""
def call(self, resource, **kw):
url = resource
if type(resource) is dict and 'self' in resource: # passing an object
url = resource['self']
return method(self, url, **kw)
return call
def tokenize_url_path(url, baseurl='https://api.pagerduty.com'):
"""
Classifies a URL according to some global patterns in the API.
If the URL is to access a specific individual resource by ID, the node type
will be identified as ``{id}``, whereas if it is an index, it will be
identified as ``{index}``.
For instance, ``https://api.pagerduty.com/users`` would be classified as
``("users", "{index}")``, and ``https://api.pagerduty.com/users/PABC123``
would be classified as ``("users", "{id}")``
:param url:
The URL (or path) to be classified; the function should accept either
:param baseurl:
API base URL
:type method: str
:type url: str
:type baseurl: str
:rtype: tuple
"""
urlnparams = url.split('#')[0].split('?') # Ignore all #'s / params
url_nodes = urlnparams[0].lstrip('/').split('/')
path_index = 0
invalid_url = ValueError('Invalid API resource URL: '+url[:99])
# Validate URL or path:
if url.startswith(baseurl):
# Full URL: path starts after the third forward slash
path_index = 3
elif url.startswith('http') and url_nodes[0].endswith(':'):
# Full URL but not within the REST API
raise invalid_url
if len(url_nodes) - path_index < 1:
# Incomplete URL (API web root is not a valid resource)
raise invalid_url
# Path nodes generally start after the hostname, at path_index
path_nodes = tuple(url_nodes[path_index:])
if '' in path_nodes:
# Empty node due to two consecutive unescaped forward slashes (or
# trailing slash in the case of it being just the base URL plus slash)
raise invalid_url
# Tokenize / classify the URL now:
tokenized_nodes = [path_nodes[0]]
if len(path_nodes) >= 3:
# It's an endpoint like one of the following
# /{resource}/{id}/{sub-resource}
# We're interested in {resource} and {sub_resource}.
# More deeply-nested endpoints not known to exist.
tokenized_nodes.extend(('{id}', path_nodes[2]))
# If the number of path nodes is even: it's an individual resource URL, and
# the resource name will be the second to last path node. Otherwise, it is
# a resource index, and the resource name will be the last pathnode.
# However, if the request was GET, and made to an index endpoint, the
# envelope property should simply be the resource name.
#
# This is a ubiquitous pattern throughout the PagerDuty REST API: path
# nodes alternate between identifiers and resource names.
final_node_type = '{id}'
if len(path_nodes)%2 == 1:
final_node_type = '{index}'
tokenized_nodes.append(final_node_type)
return tuple(tokenized_nodes)
def try_decoding(r):
"""
JSON-decode a response body and raise :class:`PDClientError` if it fails.
:param r:
`requests.Response`_ object
"""
try:
return r.json()
except ValueError as e:
raise PDClientError("API responded with invalid JSON: "+r.text[:99],
response=r)
###############
### CLASSES ###
###############
class PDSession(requests.Session):
"""
Base class for making HTTP requests to PagerDuty APIs.
Instances of this class are essentially the same as `requests.Session`_
objects, but with a few modifications:
- The client will reattempt the request with configurable, auto-increasing
cooldown/retry intervals if encountering a network error or rate limit
- When making requests, headers specified ad-hoc in calls to HTTP verb
functions will not replace, but will be merged with, default headers.
- The request URL, if it doesn't already start with the REST API base URL,
will be prepended with the default REST API base URL.
- It will only perform requests with methods as given in the
:attr:`permitted_methods` list, and will raise :class:`PDClientError` for
any other HTTP methods.
"""
log = None
"""A ``logging.Logger`` object for printing messages."""
max_http_attempts = 10
"""
The number of times that the client will retry after error statuses, for any
that are defined greater than zero in :attr:`retry`.
"""
max_network_attempts = 3
"""
The number of times that connecting to the API will be attempted before
treating the failure as non-transient; a :class:`PDClientError` exception
will be raised if this happens.
"""
parent = None
"""The ``super`` object (`requests.Session`_)"""
permitted_methods = ()
raise_if_http_error = True
"""
Raise an exception upon receiving an error response from the server.
This affects iteration (in the REST API) as well as the generic request
method itself.
In the general case: if set to True, then upon receiving a non-transient
HTTP error (from too many retries), an exception will be raised. Otherwise,
the response object will be returned.
In iteration: if set to true, an exception will be raised in
:attr:`iter_all` if a HTTP error is encountered. This is the default
behavior in versions >= 2.1.0. If False, the behavior is to halt iteration
upon receiving a HTTP error.
"""
retry = {}
"""
A dict defining the retry behavior for each HTTP response status code.
Note, any value set for this class variable will not be reflected in
instances and so it must be set separately for each instance.
Each key in this dictionary represents a HTTP response code. The behavior is
specified by the value at each key as follows:
* ``-1`` to retry infinitely
* ``0`` to return the `requests.Response`_ object and exit (which is the
default behavior)
* ``n``, where ``n > 0``, to retry ``n`` times (or up
to :attr:`max_http_attempts` total for all statuses, whichever is
encountered first), and raise a :class:`PDClientError` after that many
attempts. For each successive attempt, the wait time will increase by a
factor of :attr:`sleep_timer_base`.
The default behavior is to retry infinitely on a 429, and return the
response in any other case (assuming a HTTP response was received from the
server).
"""
sleep_timer = 1.5
"""
Default initial cooldown time factor for rate limiting and network errors.
Each time that the request makes a followup request, there will be a delay
in seconds equal to this number times :attr:`sleep_timer_base` to the power
of how many attempts have already been made so far.
"""
sleep_timer_base = 2
"""
After each retry, the time to sleep before reattempting the API connection
and request will increase by a factor of this amount.
"""
url = ""
def __init__(self, api_key, name=None):
"""
Basic constructor for API sessions.
:param api_key:
The API credential to use.
:param name:
Identifying label for the session to use in log messages
"""
self.parent = super(PDSession, self)
self.parent.__init__()
self.api_key = api_key
if isinstance(name, string_types) and name:
my_name = name
else:
my_name = self.trunc_key
self.log = logging.getLogger('pdpyras.%s(%s)'%(
self.__class__.__name__, my_name))
self.retry = {}
def after_set_api_key(self):
"""
Setter hook for setting or updating the API key.
Child classes should implement this to perform additional steps.
"""
pass
@property
def api_key(self):
"""
API Key property getter.
Returns the _api_key attribute's value.
"""
return self._api_key
@api_key.setter
def api_key(self, api_key):
if not (isinstance(api_key, string_types) and api_key):
raise ValueError("API credential must be a non-empty string.")
self._api_key = api_key
self.headers.update(self.auth_header)
self.after_set_api_key()
@property
def api_key_access(self):
"""
Memoized API key access type getter.
Will be "user" if the API key is a user-level token (all users should
have permission to create an API key with the same permissions as they
have in the PagerDuty web UI).
If the API key in use is an account-level API token (as only a global
administrator user can create), this property will be "account".
"""
if not hasattr(self, '_api_key_access') or self._api_key_access is None:
response = self.get('/users/me')
if response.status_code == 400:
message = try_decoding(response).get('error', '')
if 'account-level access token' in message:
self._api_key_access = 'account'
else:
self._api_key_access = None
self.log.error("Failed to obtain API key access level; "
"the API did not respond as expected.")
self.log.debug("Body = %s", response.text[:99])
else:
self._api_key_access = 'user'
return self._api_key_access
@property
def auth_header(self):
"""
Generates the header with the API credential used for authentication.
"""
raise NotImplementedError
def cooldown_factor(self):
return self.sleep_timer_base*(1+self.stagger_cooldown*random())
def postprocess(self, response):
"""
Perform supplemental actions immediately after receiving a response.
"""
pass
def prepare_headers(self, method):
"""
Append special additional per-request headers.
:param method:
The HTTP method, in upper case.
"""
return self.headers
def request(self, method, url, **kwargs):
"""
Make a generic PagerDuty API request.
:param method:
The request method to use. Case-insensitive. May be one of get, put,
post or delete.
:param url:
The path/URL to request. If it does not start with the base URL, the
base URL will be prepended.
:param \*\*kwargs:
Additional keyword arguments to pass to `requests.Session.request
<https://2.python-requests.org/en/master/api/#requests.Session.request>`_
:type method: str
:type url: str
:returns: the HTTP response object
:rtype: `requests.Response`_
"""
sleep_timer = self.sleep_timer
network_attempts = 0
http_attempts = {}
method = method.strip().upper()
if method not in self.permitted_methods:
raise PDClientError(
"Method %s not supported by this API. Permitted methods: %s"%(
method, ', '.join(self.permitted_methods)))
req_kw = deepcopy(kwargs)
my_headers = self.prepare_headers(method)
# Merge, but do not replace, any headers specified in keyword arguments:
if 'headers' in kwargs:
my_headers.update(kwargs['headers'])
req_kw.update({'headers': my_headers, 'stream': False, 'timeout': TIMEOUT})
# Compose/normalize URL whether or not path is already a complete URL
if url.startswith(self.url) or not self.url:
my_url = url
else:
my_url = self.url + "/" + url.lstrip('/')
# Make the request (and repeat w/cooldown if the rate limit is reached):
while True:
try:
response = self.parent.request(method, my_url, **req_kw)
self.postprocess(response)
except (HTTPError, PoolError, RequestException) as e:
network_attempts += 1
if network_attempts > self.max_network_attempts:
raise PDClientError("Non-transient network error; exceeded "
"maximum number of attempts (%d) to connect to the "
"API"%self.max_network_attempts)
sleep_timer *= self.cooldown_factor()
self.log.debug("HTTP or network error: %s: %s; retrying in %g "
"seconds.", e.__class__.__name__, e, sleep_timer)
time.sleep(sleep_timer)
continue
status = response.status_code
retry_logic = self.retry.get(status, 0)
if not response.ok and retry_logic != 0:
# Take special action as defined by the retry logic
if retry_logic != -1:
# Retry a specific number of times (-1 implies infinite)
if http_attempts.get(status, 0)>=retry_logic or \
sum(http_attempts.values())>self.max_http_attempts:
self.log.error("Non-transient HTTP error: exceeded " \
"maximum number of attempts (%d) to make a " \
"successful request. Currently encountering "
"status %d.", self.retry[status], status)
return response
http_attempts[status] = 1 + http_attempts.get(status, 0)
sleep_timer *= self.sleep_timer_base
self.log.debug("HTTP error (%d); retrying in %g seconds.",
status, sleep_timer)
time.sleep(sleep_timer)
continue
elif status == 429:
sleep_timer *= self.sleep_timer_base
self.log.debug("Hit API rate limit (response status 429); "
"retrying in %g seconds", sleep_timer)
time.sleep(sleep_timer)
continue
elif status == 401:
# Stop. Authentication failed. We shouldn't try doing any more,
# because we'll run into problems later anyway.
raise PDClientError(
"Received 401 Unauthorized response from the API. The "
"access key (%s) might not be valid."%self.trunc_key,
response=response)
else:
# All went according to plan.
return response
def set_api_key(self, api_key):
"""
(Deprecated) set the API key/token.
:param api_key:
The API key to use
:type api_key: str
"""
raise DeprecationWarning("This method is deprecated. Please use the "
"object setter directly (i.e. session.api_key = <value>) or "
"implement the after_set_api_key method in a child class of "
"PDSession to define a hook that runs when the API credential is "
"changed.")
self.api_key = api_key
@property
def stagger_cooldown(self):
"""
Randomizing factor for wait times between retries during rate limiting.
If set to number greater than 0, the sleep time for rate limiting will
(for each successive sleep) be adjusted by a factor of one plus a
uniformly-distributed random number between 0 and 1 times this number,
on top of the base sleep timer :attr:`sleep_timer_base`.
For example:
* If this is 1, and :attr:`sleep_timer_base` is 2 (default), then after
each status 429 response, the sleep time will change overall by a
random factor between 2 and 4, whereas if it is zero, it will change
by a factor of 2.
* If :attr:`sleep_timer_base` is 1, then the cooldown time will be
adjusted by a random factor between one and one plus this number.
If the number is set to zero, then this behavior is effectively
disabled, and the cooldown factor (by which the sleep time is adjusted)
will just be :attr:`sleep_timer_base`
Setting this to a nonzero number helps avoid the "thundering herd"
effect that can potentially be caused by many API clients making
simultaneous concurrent API requests and consequently waiting for the
same amount of time before retrying. It is currently zero by default
for consistent behavior with previous versions.
"""
if hasattr(self, '_stagger_cooldown'):
return self._stagger_cooldown
else:
return 0
@stagger_cooldown.setter
def stagger_cooldown(self, val):
if type(val) not in [float, int] or val<0:
raise ValueError("Cooldown randomization factor stagger_cooldown "
"must be a positive real number")
self._stagger_cooldown = val
@property
def trunc_key(self):
"""Truncated key for secure display/identification purposes."""
return last_4(self.api_key)
@property
def user_agent(self):
return 'pdpyras/%s python-requests/%s Python/%d.%d'%(
__version__,
requests.__version__,
sys.version_info.major,
sys.version_info.minor
)
class EventsAPISession(PDSession):
"""
Session class for submitting events to the PagerDuty v2 Events API.
Provides methods for submitting events to the Events API.
Inherits from :class:`PDSession`.
"""
permitted_methods = ('POST',)
url = "https://events.pagerduty.com"
@property
def auth_header(self):
return {}
def acknowledge(self, dedup_key):
"""
Acknowledge an alert via Events API.
:param dedup_key:
The deduplication key of the alert to set to the acknowledged state.
"""
return self.send_event('acknowledge', dedup_key=dedup_key)
def prepare_headers(self, method):
"""Add user agent and content type headers for Events API requests."""
headers = deepcopy(self.headers)
headers.update({
'Content-Type': 'application/json',
'User-Agent': self.user_agent,
})
return headers
def resolve(self, dedup_key):
"""
Resolve an alert via Events API.
:param dedup_key:
The deduplication key of the alert to resolve.
"""
return self.send_event('resolve', dedup_key=dedup_key)
def send_event(self, action, dedup_key=None, **properties):
"""
Send an event to the v2 Events API.
See: https://v2.developer.pagerduty.com/docs/send-an-event-events-api-v2
:param action:
The action to perform through the Events API: trigger, acknowledge
or resolve.
:param dedup_key:
The deduplication key; used for determining event uniqueness and
associating actions with existing incidents.
:param \*\*properties:
Additional properties to set, i.e. if ``action`` is ``trigger``
this would include ``payload``
:type action: str
:type dedup_key: str
:returns:
The deduplication key of the incident, if any.
"""
actions = ('trigger', 'acknowledge', 'resolve')
if action not in actions:
raise ValueError("Event action must be one of: "+', '.join(actions))
event = {'event_action':action}
event.update(properties)
if isinstance(dedup_key, string_types):
event['dedup_key'] = dedup_key
elif not action == 'trigger':
raise ValueError("The dedup_key property is required for"
"event_action=%s events, and it must be a string."%action)
response = self.post('/v2/enqueue', json=event)
raise_on_error(response)
response_body = try_decoding(response)
if not 'dedup_key' in response_body:
raise PDClientError("Malformed response body; does not contain "
"deduplication key.", response=response)
return response_body['dedup_key']
def post(self, *args, **kw):
"""
Override of ``requests.Session.post``
Adds the ``routing_key`` parameter to the body before sending.
"""
if 'json' in kw and hasattr(kw['json'], 'update'):
kw['json'].update({'routing_key': self.api_key})
return super(EventsAPISession, self).post(*args, **kw)
def trigger(self, summary, source, dedup_key=None, severity='critical',
payload=None, custom_details=None, images=None, links=None):
"""
Trigger an incident
:param summary:
Summary / brief description of what is wrong.
:param source:
A human-readable name identifying the system that is affected.
:param dedup_key:
The deduplication key; used for determining event uniqueness and
associating actions with existing incidents.
:param severity:
Alert severity. Sets the ``payload.severity`` property.
:param payload:
Set the payload directly. Can be used in conjunction with other
parameters that also set payload properties; these properties will
be merged into the default payload, and any properties in this
parameter will take precedence except with regard to
``custom_details``.
:param custom_details:
The ``payload.custom_details`` property of the payload. Will
override the property set in the ``payload`` parameter if given.
:param images:
Set the ``images`` property of the event.
:param links:
Set the ``links`` property of the event.
:type action: str
:type summary: str
:type dedup_key: str
:type severity: str
:type payload: dict
:type custom_details: dict
:type images: list
:type links: list
:rtype: str
:type summary: str
:type source: str
:type dedup_key: str
:type severity: str
:type payload: dict
:type custom_details: dict
:type images: list
:type links: list
"""
for local in ('payload', 'custom_details'):
local_var = locals()[local]
if not (local_var is None or type(local_var) is dict):
raise ValueError(local+" must be a dict")
event = {'payload': {'summary':summary, 'source':source,
'severity':severity}}
if type(payload) is dict:
event['payload'].update(payload)
if type(custom_details) is dict:
details = event.setdefault('payload', {}).get('custom_details', {})
details.update(custom_details)
event['payload']['custom_details'] = details
if images:
event['images'] = images
if links:
event['links'] = links
return self.send_event('trigger', dedup_key=dedup_key, **event)
class APISession(PDSession):
"""
Reusable PagerDuty REST API session objects for making API requests.
Includes some convenience functions as well, i.e. :attr:`rget`, :attr:`find`
and :attr:`iter_all`, to eliminate some repetitive tasks associated with
making API calls.
Inherits from :class:`PDSession`.
:param api_key:
REST API access token to use for HTTP requests
:param name:
Optional name identifier for logging. If unspecified or ``None``, it
will be the last four characters of the REST API token.
:param default_from:
Email address of a valid PagerDuty user to use in API requests by
default as the ``From`` header (see: `HTTP Request Headers`_)
:type token: str
:type name: str or None
:type default_from: str or None
:members:
"""
api_call_counts = None
"""A dict object recording the number of API calls per endpoint"""
api_time = None
"""A dict object recording the total time of API calls to each endpoint"""
default_from = None
"""The default value to use as the ``From`` request header"""
default_page_size = 100
"""
This will be the default number of results requested in each page when
iterating/querying an index (the ``limit`` parameter). See: `pagination`_.
"""
permitted_methods = ('GET', 'POST', 'PUT', 'DELETE')
url = 'https://api.pagerduty.com'
"""Base URL of the REST API"""
def __init__(self, api_key, name=None, default_from=None,
auth_type='token'):
self.api_call_counts = {}
self.api_time = {}
self.auth_type = auth_type
super(APISession, self).__init__(api_key, name)
self.default_from = default_from
self.headers.update({
'Accept': 'application/vnd.pagerduty+json;version=2',
})
def after_set_api_key(self):
self._subdomain = None
@property
def auth_type(self):
"""
Defines the method of API authentication.
By default this is "token"; if "oauth2", the API key will be used.
"""
return self._auth_type
@auth_type.setter
def auth_type(self, value):
if value not in ('token', 'bearer', 'oauth2'):
raise AttributeError("auth_type value must be \"token\" (default) "
"or \"bearer\" or \"oauth\" to use OAuth2 authentication.")
self._auth_type = value
@property
def auth_header(self):
if self.auth_type in ('bearer', 'oauth2'):
return {"Authorization": "Bearer "+self.api_key}
else:
return {"Authorization": "Token token="+self.api_key}
def dict_all(self, path, **kw):
"""
Returns a dictionary of all objects from a given index endpoint.
With the exception of ``by``, all keyword arguments passed to this
method are also passed to :attr:`iter_all`; see the documentation on
that method for further details.
:param path:
The index endpoint URL to use.
:param by:
The attribute of each object to use for the key values of the
dictionary. This is ``id`` by default. Please note, there is no
uniqueness validation, so if you use an attribute that is not
distinct for the data set, this function will omit some data in the
results.
:param params:
Additional URL parameters to include.
:param paginate:
If True, use `pagination`_ to get through all available results. If
False, ignore / don't page through more than the first 100 results.
Useful for special index endpoints that don't fully support
pagination yet, i.e. "nested" endpoints like
``/users/{id}/contact_methods`` and ``/services/{id}/integrations``
"""
by = kw.pop('by', 'id')
iterator = self.iter_all(path, **kw)
return {obj[by]:obj for obj in iterator}
def find(self, resource, query, attribute='name', params=None):
"""
Finds an object of a given resource exactly matching a query.
Will query a given `resource index`_ endpoint using the ``query``
parameter supported by most indexes.
Returns a dict if a result is found. The structure will be that of an
entry in the index endpoint schema's array of results. Otherwise, it
will return `None` if no result is found or an error is encountered.
:param resource:
The name of the resource endpoint to query, i.e.
``escalation_policies``
:param query:
The string to query for in the the index.
:param attribute:
The property of each result to compare against the query value when
searching for an exact match. By default it is ``name``, but when
searching for user by email (for example) it can be set to ``email``
:param params:
Optional additional parameters to use when querying.
:type resource: str
:type query: str
:type attribute: str
:type params: dict or None
:rtype: dict
"""
query_params = {}
if params is not None:
query_params.update(params)
query_params.update({'query':query})
# When determining uniqueness, web/the API doesn't care about case.
simplify = lambda s: s.lower()
search_term = simplify(query)
equiv = lambda s: simplify(s[attribute]) == search_term
obj_iter = self.iter_all(resource, params=query_params)
return next(iter(filter(equiv, obj_iter)), None)
def iter_all(self, path, params=None, paginate=True, page_size=None,
item_hook=None, total=False):
"""
Iterator for the contents of an index endpoint or query.
Automatically paginates and yields the results in each page, until all
matching results have been yielded or a HTTP error response is received.
Each yielded value is a dict object representing a result returned from
the index. For example, if requesting the ``/users`` endpoint, each
yielded value will be an entry of the ``users`` array property in the
response; see: `List Users
<https://v2.developer.pagerduty.com/v2/page/api-reference#!/Users/get_users>`_
:param path:
The index endpoint URL to use.
:param params:
Additional URL parameters to include.
:param paginate:
If True, use `pagination`_ to get through all available results. If
False, ignore / don't page through more than the first 100 results.
Useful for special index endpoints that don't fully support
pagination yet, i.e. "nested" endpoints like (as of this writing):
``/users/{id}/contact_methods`` and ``/services/{id}/integrations``
:param page_size:
If set, the ``page_size`` argument will override the ``default_page_size``
parameter on the session and set the ``limit`` parameter to a custom
value (default is 100), altering the number of pagination results.
:param item_hook:
Callable object that will be invoked for each iteration, i.e. for
printing progress. It will be called with three parameters: a dict
representing a given result in the iteration, the number of the
item, and the total number of items in the series.
:param total:
If True, the ``total`` parameter will be included in API calls, and
the value for the third parameter to the item hook will be the total
count of records that match the query. Leaving this as False confers
a small performance advantage, as the API in this case does not have
to compute the total count of results in the query.
:type path: str
:type params: dict or None
:type paginate: bool
:type page_size: int or None
:type total: bool
:yields: Results from the index endpoint.
:rtype: dict
"""
# Validate that it's an index URL being requested:
path_nodes = tokenize_url_path(path, baseurl=self.url)
if not path_nodes[-1] == '{index}':
raise ValueError("Invalid index url/path: "+path[:99])
# Determine the resource name:
r_name = path_nodes[-2]
# Parameters to send:
data = {}
if paginate:
# Retrieve 100 at a time unless otherwise specified:
if page_size is None:
data['limit'] = self.default_page_size
else:
data['limit'] = page_size
if total: