1 | """Helper functions for working with version control systems.""" |
||
2 | import logging |
||
3 | import os |
||
4 | import subprocess # nosec |
||
5 | from pathlib import Path |
||
6 | from shutil import which |
||
7 | from typing import Optional |
||
8 | |||
9 | from cookiecutter.exceptions import ( |
||
10 | RepositoryCloneFailed, |
||
11 | RepositoryNotFound, |
||
12 | UnknownRepoType, |
||
13 | VCSNotInstalled, |
||
14 | ) |
||
15 | from cookiecutter.utils import make_sure_path_exists, prompt_and_delete |
||
16 | |||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Substrings looked for in the captured VCS output to recognise a failure
# caused by a branch/revision that could not be resolved.
BRANCH_ERRORS = ['error: pathspec', 'unknown revision']
||
24 | |||
25 | |||
def identify_repo(repo_url):
    """Determine if `repo_url` should be treated as a URL to a git or hg repo.

    Repos can be identified by prepending "hg+" or "git+" to the repo URL.
    Without such a prefix the type is guessed from the URL text: a URL
    containing "git" is treated as git, otherwise one containing "bitbucket"
    is treated as hg.

    :param repo_url: Repo URL of unknown type.
    :returns: ('git', repo_url) or ('hg', repo_url). When an explicit
        "git+"/"hg+" prefix was given, it is removed from the returned URL.
    :raises UnknownRepoType: If the type can neither be read from a prefix
        nor guessed from the URL.
    """
    prefix, plus, remainder = repo_url.partition('+')
    if plus and prefix in ('git', 'hg'):
        # Only the first '+' separates the type prefix from the URL, so the
        # URL itself may legitimately contain further '+' characters.
        return prefix, remainder

    if repo_url.count('+') == 1:
        # A single '+' preceded by an unrecognised prefix is rejected rather
        # than guessed at.
        raise UnknownRepoType

    # No usable prefix: fall back to guessing from the URL text.
    if 'git' in repo_url:
        return 'git', repo_url
    if 'bitbucket' in repo_url:
        return 'hg', repo_url
    raise UnknownRepoType
||
48 | |||
49 | |||
def is_vcs_installed(repo_type):
    """Report whether the executable for a repo type can be located.

    :param repo_type: Name of the VCS executable to look up (the repo type,
        such as 'git' or 'hg').
    :returns: True if `shutil.which` finds the executable, False otherwise.
    """
    executable_path = which(repo_type)
    return True if executable_path else False
||
57 | |||
58 | |||
def clone(
    repo_url: str,
    checkout: Optional[str] = None,
    clone_to_dir: "os.PathLike[str]" = ".",
    no_input: bool = False,
) -> str:
    """Clone a repo into `clone_to_dir` and optionally check out a revision.

    :param repo_url: Repo URL of unknown type.
    :param checkout: The branch, tag or commit ID to checkout after clone.
    :param clone_to_dir: The directory to clone to.
                         Defaults to the current directory.
    :param no_input: Do not prompt for user input and eventually force a refresh of
        cached resources.
    :returns: str with path to the new directory of the repository.
    :raises UnknownRepoType: Propagated from `identify_repo` when the repo
        type cannot be determined.
    :raises VCSNotInstalled: If the executable for the repo type is missing.
    :raises RepositoryNotFound: If the VCS output contains "not found".
    :raises RepositoryCloneFailed: If the VCS output matches one of
        `BRANCH_ERRORS` (the requested checkout could not be resolved).
    :raises subprocess.CalledProcessError: For any other VCS failure.
    """
    # Ensure that clone_to_dir exists
    clone_to_dir = Path(clone_to_dir).expanduser()
    make_sure_path_exists(clone_to_dir)

    # identify the repo_type
    repo_type, repo_url = identify_repo(repo_url)

    # check that the appropriate VCS for the repo_type is installed
    if not is_vcs_installed(repo_type):
        msg = f"'{repo_type}' is not installed."
        raise VCSNotInstalled(msg)

    repo_url = repo_url.rstrip('/')
    repo_name = os.path.split(repo_url)[1]
    if repo_type == 'git':
        # Drop anything before the last ':' (scp-style "host:repo" URLs).
        repo_name = repo_name.split(':')[-1]
        # Strip only a *trailing* ".git"; a ".git" elsewhere in the name
        # (e.g. "my.github.io.git") is part of the directory git creates.
        if repo_name.endswith('.git'):
            repo_name = repo_name[: -len('.git')]
    repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name))
    logger.debug('repo_dir is %s', repo_dir)

    if os.path.isdir(repo_dir):
        # The target already exists: the helper's return value decides
        # whether a fresh clone is made.
        should_clone = prompt_and_delete(repo_dir, no_input=no_input)
    else:
        should_clone = True

    if should_clone:
        try:
            subprocess.check_output(  # nosec
                [repo_type, 'clone', repo_url],
                cwd=clone_to_dir,
                stderr=subprocess.STDOUT,
            )
            if checkout is not None:
                checkout_params = [checkout]
                # Avoid Mercurial "--config" and "--debugger" injection vulnerability
                if repo_type == "hg":
                    checkout_params.insert(0, "--")
                # NOTE(review): for git the checkout value is passed as-is,
                # so a value starting with '-' would be read as an option.
                subprocess.check_output(  # nosec
                    [repo_type, 'checkout', *checkout_params],
                    cwd=repo_dir,
                    stderr=subprocess.STDOUT,
                )
        except subprocess.CalledProcessError as clone_error:
            # Decode leniently so undecodable bytes in the VCS output cannot
            # mask the real failure with a UnicodeDecodeError.
            output = clone_error.output.decode('utf-8', errors='replace')
            if 'not found' in output.lower():
                raise RepositoryNotFound(
                    f'The repository {repo_url} could not be found, '
                    'have you made a typo?'
                ) from clone_error
            if any(error in output for error in BRANCH_ERRORS):
                raise RepositoryCloneFailed(
                    f'The {checkout} branch of repository '
                    f'{repo_url} could not found, have you made a typo?'
                ) from clone_error
            logger.error('%s clone failed with error: %s', repo_type, output)
            raise

    return repo_dir
||
134 |