Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added package and version to plugin description #134

Merged
Merged 1 commit into the base branch on
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions streamflow/ext/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,49 @@ def _get_type_repr(obj: MutableMapping[str, Any]) -> str | None:
def list_extensions():
    """Print a table of all installed StreamFlow plugins.

    Plugins are discovered through the ``PLUGIN_ENTRY_POINT`` entry-point
    group.  For each plugin that is a ``StreamFlowPlugin`` instance, the
    entry-point name, distribution package, package version, and
    fully-qualified class name are printed in left-aligned columns.  If no
    plugin is installed, a message is written to ``stderr`` instead.
    """
    if plugins := entry_points(group=PLUGIN_ENTRY_POINT):
        plugin_objs = []
        # Widest value seen so far for each column, used to align the table.
        max_sizes = {
            "name": 0,
            "package": 0,
            "version": 0,
            "class": 0,
        }
        for plugin in plugins:
            # Entry points resolve to a plugin class: load it, then instantiate.
            plugin_class = (plugin.load())()
            if isinstance(plugin_class, StreamFlowPlugin):
                plugin_objs.append(
                    {
                        "name": plugin.name,
                        "package": plugin.dist.name,
                        "version": plugin.dist.version,
                        "class": get_class_fullname(type(plugin_class)),
                    }
                )
                for k in max_sizes:
                    max_sizes[k] = max(max_sizes[k], len(plugin_objs[-1][k]))
        # Pad each column to its longest value plus a two-space margin, but
        # never narrower than the column header itself.
        format_string = (
            "{:<"
            + str(max(max_sizes["name"] + 2, 6))
            + "}"
            + "{:<"
            + str(max(max_sizes["package"] + 2, 9))
            + "}"
            + "{:<"
            + str(max(max_sizes["version"] + 2, 9))
            + "}"
            + "{:<"
            + str(max(max_sizes["class"], 10))
            + "}"
        )
        print(format_string.format("NAME", "PACKAGE", "VERSION", "CLASS_NAME"))
        for plugin_obj in plugin_objs:
            print(
                format_string.format(
                    plugin_obj["name"],
                    plugin_obj["package"],
                    plugin_obj["version"],
                    plugin_obj["class"],
                )
            )
    else:
        print("No StreamFlow plugins detected.", file=sys.stderr)

Expand Down Expand Up @@ -104,6 +124,8 @@ def show_extension(
if isinstance(plugin_class, StreamFlowPlugin):
plugin_class.register()
print(f"NAME: {plugin.name}")
print(f"PACKAGE: {plugin.dist.name}")
print(f"VERSION: {plugin.dist.version}")
print(f"CLASS_NAME: {get_class_fullname(type(plugin_class))}\n")
plugin_classes = plugin_class.classes_
if type_ is not None:
Expand All @@ -129,7 +151,7 @@ def show_extension(
print("It does not provide any StreamFlow extension")
if len(plugin_classes) > 0:
ext_objs = {}
max_sizes = [0, 0]
max_sizes = {"name": 0, "class": 0}
for extension_point, items in plugin_classes.items():
for item in items:
ext_objs.setdefault(extension_point, []).append(
Expand All @@ -138,12 +160,10 @@ def show_extension(
"class": get_class_fullname(item["class"]),
}
)
max_sizes[0] = max(
max_sizes[0], len(ext_objs[extension_point][-1]["name"])
)
max_sizes[1] = max(
max_sizes[1], len(ext_objs[extension_point][-1]["class"])
)
for k in max_sizes:
max_sizes[k] = max(
max_sizes[k], len(ext_objs[extension_point][-1][k])
)
if show_schema:
entity_schema = item["class"].get_schema()
with open(entity_schema) as f:
Expand All @@ -154,10 +174,10 @@ def show_extension(
)
format_string = (
"{:<"
+ str(max(max_sizes[0] + 2, 6))
+ str(max(max_sizes["name"] + 2, 6))
+ "}"
+ "{:<"
+ str(max(max_sizes[1] + 2, 10))
+ str(max(max_sizes["class"] + 2, 10))
+ "}"
)
for extension_point, ext_obj in ext_objs.items():
Expand Down
6 changes: 1 addition & 5 deletions streamflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from streamflow.parser import parser
from streamflow.persistence import database_classes
from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext
from streamflow.persistence.sqlite import DEFAULT_SQLITE_CONNECTION
from streamflow.provenance import prov_classes
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
from streamflow.scheduling import scheduler_classes
Expand Down Expand Up @@ -204,10 +203,7 @@ def build_context(config: MutableMapping[str, Any]) -> StreamFlowContext:
config,
database_classes,
"database",
{
"context": context,
"connection": DEFAULT_SQLITE_CONNECTION,
},
{"context": context},
)
context.data_manager = _get_instance_from_config(
config, data_manager_classes, "dataManager", {"context": context}
Expand Down
18 changes: 18 additions & 0 deletions streamflow/persistence/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import sys
from abc import ABC

from cachetools import Cache, LRUCache

from streamflow.core.context import StreamFlowContext
from streamflow.core.persistence import Database


class CachedDatabase(Database, ABC):
    """Abstract ``Database`` that keeps per-entity LRU caches.

    One cache attribute (``<entity>_cache``) is created for each persisted
    entity kind so that concrete subclasses can memoize repeated reads
    instead of hitting the underlying storage every time.  ``maxsize`` is
    ``sys.maxsize``, i.e. the caches are effectively unbounded.
    """

    def __init__(self, context: StreamFlowContext):
        super().__init__(context)
        # Create one LRU cache per persisted entity type.
        for entity in ("deployment", "port", "step", "target", "token", "workflow"):
            setattr(self, f"{entity}_cache", LRUCache(maxsize=sys.maxsize))
24 changes: 8 additions & 16 deletions streamflow/persistence/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,26 @@

import asyncio
import os
import sys
from abc import ABC
from typing import Any, MutableMapping, MutableSequence

import aiosqlite
import pkg_resources
from cachetools import Cache, LRUCache

from streamflow.core import utils
from streamflow.core.asyncache import cachedmethod
from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import Target
from streamflow.core.persistence import Database, DependencyType
from streamflow.core.persistence import DependencyType
from streamflow.core.utils import get_date_from_ns
from streamflow.core.workflow import Port, Status, Step, Token
from streamflow.persistence.base import CachedDatabase
from streamflow.version import VERSION

DEFAULT_SQLITE_CONNECTION = os.path.join(
os.path.expanduser("~"), ".streamflow", VERSION, "sqlite.db"
)


class CachedDatabase(Database, ABC):
def __init__(self, context: StreamFlowContext):
super().__init__(context)
self.deployment_cache: Cache = LRUCache(maxsize=sys.maxsize)
self.port_cache: Cache = LRUCache(maxsize=sys.maxsize)
self.step_cache: Cache = LRUCache(maxsize=sys.maxsize)
self.target_cache: Cache = LRUCache(maxsize=sys.maxsize)
self.token_cache: Cache = LRUCache(maxsize=sys.maxsize)
self.workflow_cache: Cache = LRUCache(maxsize=sys.maxsize)


class SqliteConnection:
def __init__(self, connection: str, timeout: int, init_db: bool):
self.connection: str = connection
Expand Down Expand Up @@ -69,7 +56,12 @@ async def close(self):


class SqliteDatabase(CachedDatabase):
def __init__(self, context: StreamFlowContext, connection: str, timeout: int = 20):
def __init__(
self,
context: StreamFlowContext,
connection: str = DEFAULT_SQLITE_CONNECTION,
timeout: int = 20,
):
super().__init__(context)
# Open connection to database
if connection != ":memory:":
Expand Down
16 changes: 10 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@
import platform
import tempfile
from asyncio.locks import Lock
from collections.abc import Iterable
from typing import Collection

import pkg_resources
import pytest
import pytest_asyncio
from jinja2 import Template

from streamflow.core import utils
from streamflow.main import build_context
from streamflow.core.config import Config
from streamflow.core.deployment import Target
from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import (
DeploymentConfig,
LOCAL_LOCATION,
Location,
Target,
)
from streamflow.core.persistence import PersistableEntity
from streamflow.core.workflow import Step, Port, Token, Workflow
from streamflow.core.deployment import DeploymentConfig, LOCAL_LOCATION, Location
from streamflow.core.workflow import Port, Step, Token, Workflow
from streamflow.main import build_context
from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext


Expand Down Expand Up @@ -151,7 +155,7 @@ def are_equals(elem1, elem2, obj_compared=None):
if is_primitive_type(elem1):
return elem1 == elem2

if isinstance(elem1, Iterable) and not isinstance(elem1, dict):
if isinstance(elem1, Collection) and not isinstance(elem1, dict):
if len(elem1) != len(elem2):
return False
for e1, e2 in zip(elem1, elem2):
Expand Down