Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 72 additions & 69 deletions pycomm3/logix_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(
*args,
init_tags: bool = True,
init_program_tags: bool = True,
tag_namespace_filter: str = '',
**kwargs,
):
"""
Expand Down Expand Up @@ -149,6 +150,7 @@ def __init__(
self._init_args = {
"init_tags": init_tags,
"init_program_tags": init_program_tags,
"tag_namespace_filter": tag_namespace_filter
}

def __str__(self):
Expand All @@ -165,7 +167,7 @@ def open(self):
self._initialize_driver(**self._init_args)
return ret

def _initialize_driver(self, init_tags, init_program_tags):
def _initialize_driver(self, init_tags, init_program_tags, tag_namespace_filter=''):
self.__log.info("Initializing driver...")

target_identity = self._list_identity()
Expand All @@ -189,7 +191,7 @@ def _initialize_driver(self, init_tags, init_program_tags):
) # strip off backplane/0 segment, not used for these processors

if init_tags:
self.get_tag_list(program="*" if init_program_tags else None)
self.get_tag_list(tag_namespace_filter=tag_namespace_filter, program="*" if init_program_tags else None)

self.__log.info("Initialization complete.")

Expand Down Expand Up @@ -385,7 +387,7 @@ def set_plc_time(self, microseconds: Optional[int] = None) -> Tag:
)

@with_forward_open
def get_tag_list(self, program: str = None, cache: bool = True) -> List[dict]:
def get_tag_list(self, program: str = None, cache: bool = True, tag_namespace_filter: str = '') -> List[dict]:
"""
Reads the tag list from the controller and the definition for each tag. Definitions include tag name, tag type
(atomic vs struct), data type (including nested definitions for structs), external access, dimensions defined (0-3)
Expand Down Expand Up @@ -416,14 +418,14 @@ def get_tag_list(self, program: str = None, cache: bool = True) -> List[dict]:
self._info["programs"] = {}
self._info["tasks"] = {}
self._info["modules"] = {}

self.__log.info("Starting tag list upload...")
if program == "*":
tags = self._get_tag_list()
tags = self._get_tag_list(tag_namespace_filter=tag_namespace_filter)
for prog in self._info["programs"]:
tags += self._get_tag_list(prog)
tags += self._get_tag_list(prog, tag_namespace_filter=tag_namespace_filter)
else:
tags = self._get_tag_list(program)
tags = self._get_tag_list(program, tag_namespace_filter=tag_namespace_filter)

if cache:
self._tags = {tag["tag_name"]: tag for tag in tags}
Expand All @@ -433,11 +435,11 @@ def get_tag_list(self, program: str = None, cache: bool = True) -> List[dict]:
self.__log.info(f"Completed tag list upload. Uploaded {len(self._tags)} tags.")
return tags

def _get_tag_list(self, program=None):
def _get_tag_list(self, program=None, tag_namespace_filter=''):
self.__log.info(f'Beginning upload of {program or "controller"} tags...')
all_tags = self._get_instance_attribute_list_service(program)
self.__log.info(f'Completed upload of {program or "controller"} tags')
return self._isolate_user_tags(all_tags, program)
return self._isolate_user_tags(all_tags, program, tag_namespace_filter)

def _get_instance_attribute_list_service(self, program=None):
"""Step 1: Finding user-created controller scope tags in a Logix5000 controller
Expand Down Expand Up @@ -559,76 +561,77 @@ def _parse_instance_attribute_list(self, response, tag_list):
self.__log.warning("unknown status during _parse_instance_attribute_list")
return -1

def _isolate_user_tags(self, all_tags, program=None):
def _isolate_user_tags(self, all_tags, program=None, tag_namespace_filter=''):
try:
user_tags = []
self.__log.debug(f'Isolating user tags for {program or "controller"} ...')
for tag in all_tags:
io_tag = False
name = tag["tag_name"]
#breakpoint()
if name.startswith(tag_namespace_filter):
if name.startswith("Program:"):
prog_name = name.replace("Program:", "")
self._info["programs"][prog_name] = {
"instance_id": tag["instance_id"],
"routines": [],
}
continue

if name.startswith("Routine:"):
rtn_name = name.replace("Routine:", "")
_program = self._info["programs"].get(program)
if _program is None:
self.__log.error(f"Program {program} not defined in tag list")
else:
_program["routines"].append(rtn_name)
continue

if name.startswith("Task:"):
self._info["tasks"][name.replace("Task:", "")] = {
"instance_id": tag["instance_id"]
}
continue

# system tags that may interfere w/ finding I/O modules
if "Map:" in name or "Cxn:" in name:
continue

# I/O module tags
# Logix 5000 Controllers I/O and Tag Data, page 17 (1756-pm004_-en-p.pdf)
if any(x in name for x in (":I", ":O", ":C", ":S")):
io_tag = True
mod = name.split(":")
mod_name = mod[0]
if mod_name not in self._info["modules"]:
self._info["modules"][mod_name] = {"slots": {}}
if len(mod) == 3 and mod[1].isdigit():
mod_slot = int(mod[1])
if mod_slot not in self._info["modules"][mod_name]:
self._info["modules"][mod_name]["slots"][mod_slot] = {"types": []}
self._info["modules"][mod_name]["slots"][mod_slot]["types"].append(mod[2])
elif len(mod) == 2:
if "types" not in self._info["modules"][mod_name]:
self._info["modules"][mod_name]["types"] = []
self._info["modules"][mod_name]["types"].append(mod[1])
# Not sure if this branch will ever be hit, but added to see if above branches may need additional work
else:
if "__UNKNOWN__" not in self._info["modules"][mod_name]:
self._info["modules"][mod_name]["__UNKNOWN__"] = []
self._info["modules"][mod_name]["__UNKNOWN__"].append(":".join(mod[1:]))

if name.startswith("Program:"):
prog_name = name.replace("Program:", "")
self._info["programs"][prog_name] = {
"instance_id": tag["instance_id"],
"routines": [],
}
continue

if name.startswith("Routine:"):
rtn_name = name.replace("Routine:", "")
_program = self._info["programs"].get(program)
if _program is None:
self.__log.error(f"Program {program} not defined in tag list")
else:
_program["routines"].append(rtn_name)
continue

if name.startswith("Task:"):
self._info["tasks"][name.replace("Task:", "")] = {
"instance_id": tag["instance_id"]
}
continue

# system tags that may interfere w/ finding I/O modules
if "Map:" in name or "Cxn:" in name:
continue

# I/O module tags
# Logix 5000 Controllers I/O and Tag Data, page 17 (1756-pm004_-en-p.pdf)
if any(x in name for x in (":I", ":O", ":C", ":S")):
io_tag = True
mod = name.split(":")
mod_name = mod[0]
if mod_name not in self._info["modules"]:
self._info["modules"][mod_name] = {"slots": {}}
if len(mod) == 3 and mod[1].isdigit():
mod_slot = int(mod[1])
if mod_slot not in self._info["modules"][mod_name]:
self._info["modules"][mod_name]["slots"][mod_slot] = {"types": []}
self._info["modules"][mod_name]["slots"][mod_slot]["types"].append(mod[2])
elif len(mod) == 2:
if "types" not in self._info["modules"][mod_name]:
self._info["modules"][mod_name]["types"] = []
self._info["modules"][mod_name]["types"].append(mod[1])
# Not sure if this branch will ever be hit, but added to see if above branches may need additional work
else:
if "__UNKNOWN__" not in self._info["modules"][mod_name]:
self._info["modules"][mod_name]["__UNKNOWN__"] = []
self._info["modules"][mod_name]["__UNKNOWN__"].append(":".join(mod[1:]))

# other system or junk tags
if (not io_tag and ":" in name) or name.startswith("__"):
continue
if tag["symbol_type"] & 0b0001_0000_0000_0000:
continue
# other system or junk tags
if (not io_tag and ":" in name) or name.startswith("__"):
continue
if tag["symbol_type"] & 0b0001_0000_0000_0000:
continue

if program is not None:
name = f"Program:{program}.{name}"
if program is not None:
name = f"Program:{program}.{name}"

self._cache["tag_name:id"][name] = tag["instance_id"]
self._cache["tag_name:id"][name] = tag["instance_id"]

user_tags.append(self._create_tag(name, tag))
user_tags.append(self._create_tag(name, tag))

self.__log.debug(f'Finished isolating tags for {program or "controller"}')
return user_tags
Expand Down