Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion anms-core/anms/routes/ARIs/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@router.get("/incoming", status_code=status.HTTP_200_OK)
def alerts_get():
MANAGER_CECKER.check_list()
alerts = MANAGER_CECKER.alerts
alerts = MANAGER_CECKER.get_alerts()
return list(alerts.values())


Expand Down
185 changes: 96 additions & 89 deletions anms-core/anms/routes/ARIs/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ async def report_def_by_id(agent_id: int):
# select all reports belonging to the agent
final_res = []
agent_id_str = ""
dec = ace.ari_cbor.Decoder()
enc = ace.ari_text.Encoder()
adms = ace.AdmSet()
adms.load_default_dirs()
nn_func = ace.nickname.Converter(ace.nickname.Mode.FROM_NN , adms.db_session(), False)
stmt = select(Report).where(Report.agent_id == agent_id)
agent_id_stmt = select(RegisteredAgent).where(RegisteredAgent.registered_agents_id == agent_id)
async with get_async_session() as session:
Expand All @@ -86,104 +91,83 @@ async def report_def_by_id(agent_id: int):
# select from exec_set
try:
nonce_cbor = res.nonce_cbor
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id_str, ExecutionSet.nonce_cbor == nonce_cbor) )
result: Result = await session.scalars(stmt)
exc_set = result.all()
for res in exc_set:
ari_val = ""
if(res):
hex_str = res.entries.hex()
hex_str = "0x"+hex_str.upper()
ari_val = await transcoder.transcoder_put_cbor_await(hex_str)
ari_val = ari_val['data']
logger.info(str(nonce_cbor))
addition = {'exec_set': ari_val,'nonce_cbor':str(nonce_cbor)}
if addition not in final_res:
final_res.append(addition)
if(nonce_cbor != b'\xf6'): # not a null nonce
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id_str, ExecutionSet.nonce_cbor == nonce_cbor) )
result: Result = await session.scalars(stmt)
exc_set = result.all()
for res_exec in exc_set:
ari_val = ""
if(res_exec):
hex_str = res_exec.entries.hex()
hex_str = "0x"+hex_str.upper()
ari_val = await transcoder.transcoder_put_cbor_await(hex_str)
ari_val = ari_val['data']
addition = {'exec_set': ari_val,'nonce_cbor':str(nonce_cbor)}
if addition not in final_res:
final_res.append(addition)
else: #null nonce use report source
rpt_set = res.report_list_cbor.hex()
# Using Ace to translate CBOR into ARI object to process individual parts
in_text = '0x'+rpt_set
ari_rpt = None
try:
in_bytes = ace.cborutil.from_hexstr(in_text)
ari_rpt = dec.decode(io.BytesIO(in_bytes))
except Exception as err:
logger.error(err)

# running through and translating all parts of rptset
for rpt in ari_rpt.value.reports:
try:
enc = ace.ari_text.Encoder()
buf = io.StringIO()
enc.encode(rpt.source, buf)
out_text = buf.getvalue()
ari_val = out_text
# TODO look at better way to handle storing nonce with null
addition = {'exec_set': ari_val,'nonce_cbor':str(nonce_cbor)}
if addition not in final_res:
final_res.append(addition)
except Exception as err:
logger.error(err)

except Exception as e:
logger.error(f"Error {e}, while processing nonce:{nonce_cbor} for agent: {agent_id_str}")

return final_res


# entries tabulated returns header and values in correct order
@router.get("/entries/table/{agent_id}/{nonce_cbor}", status_code=status.HTTP_200_OK,
response_model=list)
async def report_ac(agent_id: int, nonce_cbor: str):

# handling if nonce_cbor is null
@router.get("/entries/table/{agent_id}/{nonce_cbor}", status_code=status.HTTP_200_OK)
async def report_ac(agent_id: int, nonce_cbor: str) -> dict:
ari = None
dec = ace.ari_cbor.Decoder()
enc = ace.ari_text.Encoder()
exec_set_dir = {}
logger.info(nonce_cbor)
logger.info(type(nonce_cbor))
try:
store_nonce = nonce_cbor
nonce_cbor = ast.literal_eval(nonce_cbor)
except Exception as e:
logger.error(f"{e} while processing nonce")
return []

agent_id_str =""
agent_id_stmt = select(RegisteredAgent).where(RegisteredAgent.registered_agents_id == agent_id)
async with get_async_session() as session:
result_agent: Result = await session.scalars(agent_id_stmt)
agent_id_str = result_agent.one_or_none()
agent_id_str = agent_id_str.agent_endpoint_uri

# Load in adms
# get command that made the report as first entry
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id_str, ExecutionSet.nonce_cbor == nonce_cbor) )
async with get_async_session() as session:
result: Result = await session.scalars(stmt)
try:
nonce_cbor = ast.literal_eval(str(bytes.fromhex(nonce_cbor)))
except Exception as e:
logger.error(f"{e} while processing nonce")
return []

# there should only be one execution per agent per nonce_cbor
# in the event that two occur pull the latest one
result = result.all()
exec_set_dir = {}

if result:
result = result[-1]
exec_set = result.entries.hex()
# use ACE to handle report set decoding
in_text = '0x'+exec_set
try:
in_bytes = ace.cborutil.from_hexstr(in_text)
ari = dec.decode(io.BytesIO(in_bytes))

except Exception as err:
logger.error(err)

# current ARI should be an exection set
if ari:
if type(ari.value) == ace.ari.ExecutionSet:
try:

# run through targets and their parameters to get all things parts translated
for targ in ari.value.targets:
buf = io.StringIO()
exec_set_entry=["time"]
enc.encode(targ, buf)
out_text_targ = buf.getvalue()
if targ is ace.LiteralARI and targ.type_id is ace.StructType.AC:
for part in targ.value:
buf = io.StringIO()
enc.encode(part, buf)
out_text = buf.getvalue()
exec_set_entry.append(out_text)
else:
exec_set_entry.append(out_text_targ)

exec_set_dir[out_text_targ] = [exec_set_entry]

except Exception as err:
logger.error(err)


# final_res.append(exec_set_entry)

# process each report in the rpt set and place inside appropiate nonce case or if null use source as key
# TODO use td off set in report set to update actual time
#
ari = None
stmt = select(Report).where(and_(Report.agent_id == agent_id, Report.nonce_cbor == nonce_cbor) )
async with get_async_session() as session:
result: Result = await session.scalars(stmt)
for res in result.all():
# used to hold final report set
addition = [res.reference_time]
curr_time = res.reference_time
# addition = {time:}
rpt_set = res.report_list_cbor.hex()
# Using Ace to translate CBOR into ARI object to process individual parts
in_text = '0x'+rpt_set
Expand All @@ -197,22 +181,45 @@ async def report_ac(agent_id: int, nonce_cbor: str):
# current ARI should be an report set
if ari:
if type(ari.value) == ace.ari.ReportSet:
# for each report in a rptset
# add to the top level nonce dict or to source dict if nonce is null null
for rpt in ari.value.reports:
try:
# structure for the reports
# time: source_name:{[values of reprots ]}
buf = io.StringIO()
enc.encode(rpt.source, buf)
rpt_src = buf.getvalue()
addition = {"time":curr_time, rpt_src:[]}
rpt_entries = []
enc = ace.ari_text.Encoder()
# running through and translating all parts of rptset
for item in rpt.items:
buf = io.StringIO()
enc.encode(item, buf)
out_text = buf.getvalue()
addition.append(out_text)
buf = io.StringIO()
enc.encode(rpt.source, buf)
out_text = buf.getvalue()
# using ace to decode the components
# item = dec.decode(item)
if type(item.value) == ace.ari.Table:
table_vals = []
for tab_val in item.value:
table_vals.append([t.value for t in tab_val])
rpt_entries.append(table_vals)
else:#handle values as normal
buf = io.StringIO()
enc.encode(item, buf)
out_text = buf.getvalue()
rpt_entries.append(out_text)

# placing all the values in the sources section
addition[rpt_src] = rpt_entries

exec_set_dir[out_text].append(addition)
if(nonce_cbor == b'\xf6' ):
curr_dic = exec_set_dir.get(rpt_src,[])
curr_dic.append(addition)
exec_set_dir[rpt_src] = curr_dic
else:
curr_dic = exec_set_dir.get(store_nonce,[])
curr_dic.append(addition)
exec_set_dir[store_nonce] = curr_dic
except Exception as err:
logger.error(err)

return list(exec_set_dir.values())
return exec_set_dir

4 changes: 2 additions & 2 deletions anms-core/anms/routes/system_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
{"name": "anms-ui", "url": "http://anms-ui:9030"},
{"name": "aricodec"},
{"name": "authnz", "url": "http://authnz/authn/login.html"},
{"name": "grafana", "url": "http://grafana:3000"},
{"name": "grafana", "tcp_port": 3000},
{"name": "grafana-image-renderer", "url": "http://grafana-image-renderer:8081"},
{"name": "amp-manager", "url": "http://amp-manager:8089/nm/api/version"},
{"name": "mqtt-broker"},
Expand Down Expand Up @@ -78,7 +78,7 @@ def get_containers_status():
if "url" in container:
url = container['url']
try:
response = requests.get(url, timeout=timeout)
response = requests.get(url, timeout=timeout, allow_redirects=False)
if response.status_code == 200:
statuses[name] = "healthy"
else:
Expand Down
Loading
Loading