Skip to content

remove config extract magic #362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 29 additions & 31 deletions vcspull/cli/sync.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import os
import sys
from copy import deepcopy

import click
import click.shell_completion

from libvcs.shortcuts import create_project_from_pip_url
from libvcs.projects.base import BaseProject
from libvcs.projects.constants import DEFAULT_VCS_CLASS_MAP

from ..config import filter_repos, find_config_files, load_configs

Expand All @@ -22,16 +23,20 @@ def get_repo_completions(ctx: click.core.Context, args, incomplete):
repo_terms = [incomplete]

for repo_term in repo_terms:
dir, vcs_url, name = None, None, None
repo_dir, name = None, None
if any(repo_term.startswith(n) for n in ["./", "/", "~", "$HOME"]):
dir = repo_term
elif any(repo_term.startswith(n) for n in ["http", "git", "svn", "hg"]):
vcs_url = repo_term
repo_dir = repo_term
else:
name = repo_term

# collect the repos from the config files
found_repos.extend(filter_repos(configs, dir=dir, vcs_url=vcs_url, name=name))
found_repos.extend(
filter_repos(
configs,
filter_repo_dir=repo_dir,
filter_name=name,
)
)
if len(found_repos) == 0:
found_repos = configs

Expand All @@ -46,10 +51,6 @@ def get_config_file_completions(ctx, args, incomplete):
]


def clamp(n, _min, _max):
return max(_min, min(n, _max))


@click.command(name="sync")
@click.argument(
"repo_terms", type=click.STRING, nargs=-1, shell_complete=get_repo_completions
Expand All @@ -67,40 +68,37 @@ def sync(repo_terms, config):
configs = load_configs([config])
else:
configs = load_configs(find_config_files(include_home=True))
found_repos = []

found_repos = {}

if repo_terms:
for repo_term in repo_terms:
dir, vcs_url, name = None, None, None
repo_dir, name = None, None

if any(repo_term.startswith(n) for n in ["./", "/", "~", "$HOME"]):
dir = repo_term
elif any(repo_term.startswith(n) for n in ["http", "git", "svn", "hg"]):
vcs_url = repo_term
repo_dir = repo_term
else:
name = repo_term

# collect the repos from the config files
found_repos.extend(
filter_repos(configs, dir=dir, vcs_url=vcs_url, name=name)
found_repos |= filter_repos(
configs,
filter_repo_dir=repo_dir,
filter_name=name,
)
else:
found_repos = configs

list(map(update_repo, found_repos))
for path, repos in found_repos.items():
for name, repo in repos.items():
r: BaseProject = DEFAULT_VCS_CLASS_MAP[repo["vcs"]](
repo_dir=os.path.join(path, name),
options=repo["options"],
progress_callback=progress_cb,
)
r.update_repo(set_remotes=True)


def progress_cb(output, timestamp):
sys.stdout.write(output)
sys.stdout.flush()


def update_repo(repo_dict):
repo_dict = deepcopy(repo_dict)
if "pip_url" not in repo_dict:
repo_dict["pip_url"] = repo_dict.pop("url")
repo_dict["progress_callback"] = progress_cb

r = create_project_from_pip_url(**repo_dict) # Creates the repo object
r.update_repo(set_remotes=True) # Creates repo if not exists and fetches

return r
201 changes: 67 additions & 134 deletions vcspull/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from libvcs.projects.git import GitRemote

from . import exc
from .util import get_config_dir, update_dict
from .util import get_config_dir

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,75 +45,7 @@ def expand_dir(
return _dir


def extract_repos(config: dict, cwd=pathlib.Path.cwd()) -> list[dict]:
"""Return expanded configuration.

end-user configuration permit inline configuration shortcuts, expand to
identical format for parsing.

Parameters
----------
config : dict
the repo config in :py:class:`dict` format.
cwd : pathlib.Path
current working dir (for deciphering relative paths)

Returns
-------
list : List of normalized repository information
"""
configs = []
for directory, repos in config.items():
for repo, repo_data in repos.items():

conf = {}

"""
repo_name: http://myrepo.com/repo.git

to

repo_name: { url: 'http://myrepo.com/repo.git' }

also assures the repo is a :py:class:`dict`.
"""

if isinstance(repo_data, str):
conf["url"] = repo_data
else:
conf = update_dict(conf, repo_data)

if "repo" in conf:
if "url" not in conf:
conf["url"] = conf.pop("repo")
else:
conf.pop("repo", None)

if "name" not in conf:
conf["name"] = repo
if "parent_dir" not in conf:
conf["parent_dir"] = expand_dir(directory, cwd=cwd)

# repo_dir -> dir in libvcs 0.12.0b25
if "repo_dir" in conf and "dir" not in conf:
conf["dir"] = conf.pop("repo_dir")

if "dir" not in conf:
conf["dir"] = expand_dir(conf["parent_dir"] / conf["name"], cwd)

if "remotes" in conf:
for remote_name, url in conf["remotes"].items():
conf["remotes"][remote_name] = GitRemote(
name=remote_name, fetch_url=url, push_url=url
)
configs.append(conf)

return configs


def find_home_config_files(
filetype: list[str] = ["json", "yaml"]
) -> list[pathlib.Path]:
def find_home_config_files(filetype=["json", "yaml"]):
"""Return configs of ``.vcspull.{yaml,json}`` in user's home directory."""
configs = []

Expand Down Expand Up @@ -195,88 +127,96 @@ def find_config_files(
return configs


def load_configs(files: list[Union[str, pathlib.Path]], cwd=pathlib.Path.cwd()):
def load_configs(files):
"""Return repos from a list of files.

Parameters
----------
files : list
paths to config file
cwd : pathlib.Path
current path (pass down for :func:`extract_repos`

Returns
-------
list of dict :
expanded config dict item
config dict item

Todo
----
Validate scheme, check for duplicate destinations, VCS urls
"""
repos = []
for file in files:
if isinstance(file, str):
file = pathlib.Path(file)
ext = file.suffix.lstrip(".")
conf = kaptan.Kaptan(handler=ext).import_config(str(file))
newrepos = extract_repos(conf.export("dict"), cwd=cwd)

if not repos:
repos.extend(newrepos)
continue
repos = {}
for f in files:
_, ext = os.path.splitext(f)
conf = kaptan.Kaptan(handler=ext.lstrip(".")).import_config(f).export("dict")

newrepos = {}

for path, repo in conf.items():
newrepos[expand_dir(path)] = repo

dupes = detect_duplicate_repos(repos, newrepos)

if dupes:
msg = ("repos with same path + different VCS detected!", dupes)
msg = ("repos for the same parent_dir and repo_name detected!", dupes)
raise exc.VCSPullException(msg)
repos.extend(newrepos)

repos |= newrepos

return repos


def detect_duplicate_repos(repos1: list[dict], repos2: list[dict]):
"""Return duplicate repos dict if repo_dir same and vcs different.
def detect_duplicate_repos(config1, config2):
"""Return duplicate repos dict if repo_dir is the same

Parameters
----------
repos1 : dict
list of repo expanded dicts
config1 : dict
config dict

repos2 : dict
list of repo expanded dicts
config2 : dict
config dict

Returns
-------
list of dict, or None
Duplicate repos
"""
if not config1:
return None

dupes = []
path_dupe_repos = []

curpaths = [r["dir"] for r in repos1]
newpaths = [r["dir"] for r in repos2]
path_duplicates = list(set(curpaths).intersection(newpaths))
for parent_path, repos in config2.items():
if parent_path in config1:
for name, repo in repos.items():
if name in config1[parent_path]:
dupes += (repo, config1[parent_path][name])

if not path_duplicates:
return None
return dupes
Comment on lines +189 to +195
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the area I used in #373 (but changed a lot)


path_dupe_repos.extend(
[r for r in repos2 if any(r["dir"] == p for p in path_duplicates)]
)

if not path_dupe_repos:
return None
def get_repo_dirs(config):
"""return a dict of repo paths with their corresponding repos for each repo
in the config list.

for n in path_dupe_repos:
currepo = next((r for r in repos1 if r["dir"] == n["dir"]), None)
if n["url"] != currepo["url"]:
dupes += (n, currepo)
return dupes
Parameters
----------
config: dict
list of repos

Returns
-------
dict
"""
path_repos = {}
for parent_dir, repos in config.items():
for name, repo in repos.items():
path_repos[os.path.join(parent_dir, name)] = repo

return path_repos

def in_dir(config_dir=None, extensions: list[str] = [".yml", ".yaml", ".json"]):

def in_dir(config_dir, extensions=[".yml", ".yaml", ".json"]):
"""Return a list of configs in ``config_dir``.

Parameters
Expand All @@ -301,46 +241,39 @@ def in_dir(config_dir=None, extensions: list[str] = [".yml", ".yaml", ".json"]):
return configs


def filter_repos(
config: dict,
dir: Union[pathlib.Path, None] = None,
vcs_url: Union[str, None] = None,
name: Union[str, None] = None,
):
"""Return a :py:obj:`list` list of repos from (expanded) config file.
def filter_repos(config, filter_repo_dir=None, filter_name=None):
"""Return a :py:obj:`list` list of repos from config file.

dir, vcs_url and name all support fnmatch.

Parameters
----------
config : dict
the expanded repo config in :py:class:`dict` format.
dir : str, Optional
config : dist
the repo config in :py:class:`dict` format.
filter_repo_dir : str, Optional
directory of checkout location, fnmatch pattern supported
vcs_url : str, Optional
url of vcs remote, fn match pattern supported
name : str, Optional
filter_name : str, Optional
project name, fnmatch pattern supported

Returns
-------
list :
Repos
"""
repo_list = []
matched_repos = {}

if dir:
repo_list.extend([r for r in config if fnmatch.fnmatch(r["parent_dir"], dir)])

if vcs_url:
repo_list.extend(
r for r in config if fnmatch.fnmatch(r.get("url", r.get("repo")), vcs_url)
)
if filter_repo_dir:
for path, repos in config.items():
if fnmatch.fnmatch(path, filter_repo_dir):
matched_repos[filter_repo_dir] = repos

if name:
repo_list.extend([r for r in config if fnmatch.fnmatch(r.get("name"), name)])
if filter_name:
for path, repos in config.items():
for name, repo in repos.items():
if fnmatch.fnmatch(name, filter_name):
matched_repos[path] = {filter_name: repo}

return repo_list
return matched_repos


def is_config_file(filename: str, extensions: list[str] = [".yml", ".yaml", ".json"]):
Expand Down