Skip to content

Commit

Permalink
Merge upstream develop.
Browse files Browse the repository at this point in the history
  • Loading branch information
rlutes committed Oct 11, 2024
2 parents 51eacea + 87650e1 commit ba5d6b1
Show file tree
Hide file tree
Showing 15 changed files with 136 additions and 94 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ def __getattr__(cls, name):
author = 'The VOLTTRON Community'

# The short X.Y version
version = '9.0'
version = '9.0.1'
# The full version, including alpha/beta/rc tags
release = '9.0'
release = '9.0.1'

# -- General configuration ---------------------------------------------------

Expand Down
5 changes: 1 addition & 4 deletions services/core/ActuatorAgent/actuator/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1382,11 +1382,8 @@ def _request_new_schedule(self, sender, task_id, priority, requests, publish_res
'data': {'agentID': sender,
'taskID': task_id}})

# If we are successful we do something else with the real result data
data = result.data if not result.success else {}

results = {'result': success,
'data': data,
'data': result.data,
'info': result.info_string}

if publish_result:
Expand Down
13 changes: 8 additions & 5 deletions services/core/ActuatorAgent/actuator/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
#
# ===----------------------------------------------------------------------===
# }}}


import bisect
import logging
from pickle import dumps, loads

from base64 import b64encode
from collections import defaultdict, namedtuple
from copy import deepcopy
from datetime import timedelta
from pickle import dumps, loads

from volttron.platform.agent import utils

Expand Down Expand Up @@ -340,7 +340,7 @@ def save_state(self, now):

try:
self._cleanup(now)
self.save_state_callback(dumps(self.tasks))
self.save_state_callback(b64encode(dumps(self.tasks)).decode("utf-8"))
except Exception:
_log.error('Failed to save scheduler state!')

Expand Down Expand Up @@ -411,7 +411,10 @@ def request_slots(self, agent_id, id_, requests, priority, now=None):

self.save_state(now)

return RequestResult(True, preempted_tasks, '')
if preempted_tasks:
return RequestResult(True, list(preempted_tasks), 'TASK_WERE_PREEMPTED')
else:
return RequestResult(True, {}, '')

def cancel_task(self, agent_id, task_id, now):
if task_id not in self.tasks:
Expand Down
5 changes: 4 additions & 1 deletion services/core/PlatformDriverAgent/platform_driver/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,10 @@ def heart_beat(self):
"""
_log.debug("sending heartbeat")
for device in self.instances.values():
device.heart_beat()
try:
device.heart_beat()
except (Exception, gevent.Timeout) as e:
_log.warning(f'Failed to set heart_beat point on device: {device.device_name} -- {e}.')

@RPC.export
def revert_point(self, path, point_name, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# ===----------------------------------------------------------------------===
# }}}


import gevent
import logging
from datetime import datetime, timedelta

Expand Down Expand Up @@ -97,27 +97,36 @@ def ping_target(self):
self.vip.rpc.call(self.proxy_address, 'ping_device', self.target_address, self.device_id).get(timeout=self.timeout)
pinged = True
except errors.Unreachable:
_log.warning("Unable to reach BACnet proxy.")

except errors.VIPError:
_log.warning("Error trying to ping device.")
_log.warning(f"Unable to reach BACnet proxy at: {self.proxy_address}.")
except (Exception, gevent.Timeout) as e:
_log.warning(f"Error trying to ping device with device_id '{self.device_id}' at {self.target_address}"
f"through proxy {self.proxy_address}: {e}")

self.scheduled_ping = None

# Schedule retry.
if not pinged:
self.schedule_ping()

def get_point(self, point_name, get_priority_array=False):
def get_point(self, point_name, on_property=None):
register = self.get_register_by_name(point_name)
property_name = "priorityArray" if get_priority_array else register.property
register_index = None if get_priority_array else register.index
result = self.vip.rpc.call(self.proxy_address, 'read_property',
self.target_address, register.object_type,
register.instance_number, property_name, register_index).get(timeout=self.timeout)
if on_property is None:
result = self.vip.rpc.call(self.proxy_address, 'read_property',
self.target_address, register.object_type,
register.instance_number, register.property, register.index).get(timeout=self.timeout)
else:
point_map = {}
point_map[register.point_name] = [register.object_type,
register.instance_number,
on_property,
register.index]
result = self.vip.rpc.call(self.proxy_address, 'read_properties',
self.target_address, point_map,
self.max_per_request, True).get(timeout=self.timeout)
result = list(result.values())[0]
return result

def set_point(self, point_name, value, priority=None):
def set_point(self, point_name, value, priority=None, on_property=None):
# TODO: support writing from an array.
register = self.get_register_by_name(point_name)
if register.read_only:
Expand All @@ -130,7 +139,7 @@ def set_point(self, point_name, value, priority=None):
args = [self.target_address, value,
register.object_type,
register.instance_number,
register.property,
on_property if on_property is not None else register.property,
priority if priority is not None else register.priority,
register.index]
result = self.vip.rpc.call(self.proxy_address, 'write_property', *args).get(timeout=self.timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ activated environment:

::

pip install suds-jurko
pip install zeep

Alternatively requirements can be installed from requirements.txt using:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
import logging
import abc
import sys

from . import service as cps
from . import async_service as async

from . import async_service as async_service
from .. import BaseInterface, BaseRegister, BasicRevert, DriverInterfaceError
from suds.sudsobject import asdict
#from suds.sudsobject import asdict
from zeep.helpers import serialize_object

_log = logging.getLogger(__name__)

Expand All @@ -54,7 +53,7 @@
point_name_mapping = {"Status.TimeStamp": "TimeStamp"}

service = {}
gevent.spawn(async.web_service)
gevent.spawn(async_service.web_service)


def recursive_asdict(d):
Expand All @@ -64,7 +63,7 @@ def recursive_asdict(d):
http://stackoverflow.com/questions/2412486/serializing-a-suds-object-in-python
"""
out = {}
for k, v in asdict(d).items():
for k, v in serialize_object(d, dict).items():
if hasattr(v, '__keylist__'):
out[k] = recursive_asdict(v)
elif isinstance(v, list):
Expand Down Expand Up @@ -139,6 +138,16 @@ def read_only_check(self):
raise IOError("Trying to write to a point configured read only: {0}".format(self.attribute_name))
return True

def get_last_non_none_value(self,lst):
"""
Depends on port number, the result could be a list with None value
get last non-None value as result
"""
for item in reversed(lst):
if item is not None:
return item
return None

def get_register(self, result, method, port_flag=True):
"""Gets correct register from API response.
Expand All @@ -151,9 +160,10 @@ def get_register(self, result, method, port_flag=True):
:return: Correct register value cast to appropriate python type. Returns None if there is an error.
"""
try:
value = getattr(result, self.attribute_name)(self.port)[0] \
_log.debug(f'In get_register, to get {self.attribute_name}, the port_flag is {port_flag}')
value = self.get_last_non_none_value(getattr(result, self.attribute_name)(self.port)) \
if port_flag \
else getattr(result, self.attribute_name)(None)[0]
else self.get_last_non_none_value(getattr(result, self.attribute_name)(None))
return self.sanitize_output(self.data_type, value)
except cps.CPAPIException as exception:
if exception._responseCode not in ['153']:
Expand Down Expand Up @@ -196,7 +206,7 @@ def __init__(self, read_only, point_name, attribute_name, units, data_type, stat
def value(self):
global service
method = service[self.username].getStations
result = async.CPRequest.request(method, self.timeout, stationID=self.station_id)
result = async_service.CPRequest.request(method, self.timeout, stationID=self.station_id)
result.wait()
return self.get_register(result.value, method)

Expand Down Expand Up @@ -237,7 +247,7 @@ def __init__(self, read_only, point_name, attribute_name, units, data_type, stat
def value(self):
global service
method = service[self.username].getLoad
result = async.CPRequest.request(method, self.timeout, stationID=self.station_id)
result = async_service.CPRequest.request(method, self.timeout, stationID=self.station_id)
result.wait()
return self.get_register(result.value, method)

Expand All @@ -263,7 +273,7 @@ def value(self, x):
kwargs = {'stationID': self.station_id}
if self.attribute_name == 'shedState' and not value:
method = service[self.username].clearShedState
result = async.CPRequest.request(method, 0, stationID=self.station_id)
result = async_service.CPRequest.request(method, 0, stationID=self.station_id)
elif self.attribute_name == 'shedState':
_log.error('shedState may only be written with value 0. If you want to shedLoad, write to '
'allowedLoad or percentShed')
Expand All @@ -273,7 +283,7 @@ def value(self, x):
kwargs[self.attribute_name] = value
if self.port:
kwargs['portNumber'] = self.port
result = async.CPRequest.request(method, 0, **kwargs)
result = async_service.CPRequest.request(method, 0, **kwargs)

result.wait()
if result.value.responseCode != "100":
Expand Down Expand Up @@ -322,7 +332,7 @@ def value(self):
if self.port:
kwargs['portNumber'] = self.port

result = async.CPRequest.request(method, self.timeout, **kwargs)
result = async_service.CPRequest.request(method, self.timeout, **kwargs)
result.wait()
return self.get_register(result.value, method, False)

Expand All @@ -348,7 +358,7 @@ def value(self, x):
if self.attribute_name == 'clearAlarms' and value:
kwargs = {'stationID': self.station_id}
method = service[self.username].clearAlarms
result = async.CPRequest.request(method, 0, **kwargs)
result = async_service.CPRequest.request(method, 0, **kwargs)

result.wait()
if result.value.responseCode not in ['100', '153']:
Expand Down Expand Up @@ -383,11 +393,12 @@ def __init__(self, read_only, point_name, attribute_name, units, data_type, stat
def value(self):
global service
method = service[self.username].getChargingSessionData
result = async.CPRequest.request(method, self.timeout, stationID=self.station_id)
result = async_service.CPRequest.request(method, self.timeout, stationID=self.station_id)
result.wait()

# Of Note, due to API limitations, port number is ignored for these calls
return self.get_register(result.value, method, False)
# NOTE: Change this port number for Chargingsession data.
return self.get_register(result.value, method)

@value.setter
def value(self, x):
Expand Down Expand Up @@ -418,7 +429,7 @@ def __init__(self, read_only, point_name, attribute_name, units, data_type, stat
def value(self):
global service
method = service[self.username].getStationStatus
result = async.CPRequest.request(method, self.timeout, self.station_id)
result = async_service.CPRequest.request(method, self.timeout, self.station_id)
result.wait()
return self.get_register(result.value, method)

Expand Down Expand Up @@ -455,7 +466,7 @@ def __init__(self, read_only, point_name, attribute_name, units, data_type, stat
def value(self):
global service
method = service[self.username].getStationRights
result = async.CPRequest.request(method, self.timeout, stationID=self.station_id)
result = async_service.CPRequest.request(method, self.timeout, stationID=self.station_id)
result.wait()

# Note: this does not go through get_register, as it is of a unique type, 'dictionary.'
Expand Down Expand Up @@ -558,7 +569,7 @@ def parse_config(self, config_dict, registry_config_str):
description=description,
port_number=port_num,
username=config_dict['username'],
timeout=config_dict['cacheExpiration']
timeout=config_dict.get('cacheExpiration',0)
)

self.insert_register(register)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,14 @@
import gevent.event
import gevent.queue
import logging
import suds
import zeep
from gevent import monkey
from .service import CPAPIException
from datetime import datetime, timedelta

monkey.patch_all()
_log = logging.getLogger(__name__)
SERVICE_WSDL_URL = "https://webservices.chargepoint.com/cp_api_5.0.wsdl"

SERVICE_WSDL_URL = "https://webservices.chargepoint.com/cp_api_5.1.wsdl"
# Queue for Web API requests and responses. It is managed by the long running
# web_service() greenlet.
web_service_queue = gevent.queue.Queue()
Expand Down Expand Up @@ -253,7 +252,7 @@ def web_service():
web_cache[item_key] = cache_item

if not client_set:
client_set.add(suds.client.Client(SERVICE_WSDL_URL))
client_set.add(zeep.Client(SERVICE_WSDL_URL))
client = client_set.pop()
gevent.spawn(web_call, item, client)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from . import service as cps
import suds
import zeep
import io

station_csv = {
Expand Down Expand Up @@ -176,5 +176,5 @@
else:
print("Some other error happened")

except suds.WebFault as a:
except zeep.exception.Fault as e:
print("Sorry, your API credentials are invalid. Please contact Chargepoint for assistance.")
Original file line number Diff line number Diff line change
@@ -1 +1 @@
suds-jurko==0.6
zeep==4.2.1
Loading

0 comments on commit ba5d6b1

Please sign in to comment.