-
Notifications
You must be signed in to change notification settings - Fork 2
/
content.py
88 lines (81 loc) · 3.56 KB
/
content.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""Content related REST endpoints"""
import logging
from dataclasses import fields
from services.rest import RestService
from objects.content_object import ContentObject
from objects.policy import Policy
class ContentService:
    """Content related REST endpoints.

    Wraps a REST client and issues requests below ``/api/v1/content`` to
    read content objects, list the items of a content object, and update a
    content object.
    """

    def __init__(self, rest: RestService, logger: logging.Logger = None):
        """Initiate the service.

        Args:
            rest: REST client used for every request issued by this service.
            logger: Logger that receives the status messages of this
                service. Falls back to this module's logger when omitted.
        """
        self._ca_rest = rest
        self._base_endpoint = '/api/v1/content'
        self._logger = logger or logging.getLogger(__name__)

    def get_content(self,
                    content_id: str = '',
                    content_fields_list: [str] = None) -> ContentObject:
        """Get a single content object together with its policies.

        Args:
            content_id: Identifier appended to the content endpoint.
            content_fields_list: Optional field names. They are joined with
                commas and sent as the ``fields`` query parameter. ``None``
                sends no query parameters at all.

        Returns:
            A ``ContentObject`` built from the ``id``, ``type``,
            ``defaultName`` and ``modificationTime`` entries of the response
            data. Its ``policies`` list is empty when the response data has
            no ``policies`` entry.
        """
        params = None
        if content_fields_list is not None:
            params = {'fields': ','.join(content_fields_list)}
        response = self._ca_rest.get(
            endpoint=f'{self._base_endpoint}/{content_id}',
            params=params)

        # Policies are optional in the response; translate the ones present.
        permissions_list = []
        if 'policies' in response.data:
            permissions_list = [
                Policy(permissions=element['permissions'],
                       securityObject=element['securityObject'])
                for element in response.data['policies']
            ]

        return ContentObject(id=response.data['id'],
                             type=response.data['type'],
                             defaultName=response.data['defaultName'],
                             modificationTime=response.data['modificationTime'],
                             policies=permissions_list)

    def get_content_items(self,
                          content_id: str = '') -> [ContentObject]:
        """Get the items of a content object, e.g. the objects in a folder.

        Args:
            content_id: Identifier of the content object whose items are
                requested.

        Returns:
            One ``ContentObject`` per entry of ``content`` in the response
            data, or an empty list when the response data has no
            ``content`` entry. The returned objects are created without
            policies.
        """
        response = self._ca_rest.get(
            endpoint=f'{self._base_endpoint}/{content_id}/items')

        if 'content' not in response.data:
            return []

        return [
            ContentObject(id=obj['id'],
                          type=obj['type'],
                          modificationTime=obj['modificationTime'],
                          defaultName=obj['defaultName'])
            for obj in response.data['content']
        ]

    def update_content(self,
                       content_object: ContentObject):
        """Update a content object.

        Sends every dataclass field of ``content_object`` in a PUT request to
        the object's endpoint. ``policies`` is serialized separately into
        plain dictionaries holding ``securityObject`` and ``permissions``.

        The outcome is only logged: status code 204 is reported at debug
        level, anything else as a warning including the response message.

        Args:
            content_object: Object to send; its ``id`` selects the endpoint
                and its ``policies`` attribute must be iterable.
        """
        # Compare names for equality: ``attr not in ('policies')`` is a
        # substring test on a plain string and would silently drop every
        # field whose name happens to be contained in 'policies'.
        data = {
            spec.name: getattr(content_object, spec.name)
            for spec in fields(content_object)
            if spec.name != 'policies'
        }

        # Nested policy objects are flattened by hand so that the payload
        # consists of plain dictionaries and lists only.
        data['policies'] = [
            {
                'securityObject': policy.securityObject,
                'permissions': list(policy.permissions),
            }
            for policy in content_object.policies
        ]

        response = self._ca_rest.put(
            endpoint=f'{self._base_endpoint}/{content_object.id}',
            data=data)

        # Report through the injected logger instead of the root logger so
        # that callers control where these messages end up.
        if response.status_code == 204:
            self._logger.debug('Object %s updated sucessfully',
                               content_object.defaultName)
        else:
            self._logger.warning('Updating object %s failed with %s',
                                 content_object.defaultName, response.message)