init commit
This commit is contained in:
13
venv/lib64/python3.12/site-packages/pip/__init__.py
Normal file
13
venv/lib64/python3.12/site-packages/pip/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from typing import List, Optional
|
||||
|
||||
__version__ = "24.0"
|
||||
|
||||
|
||||
def main(args: Optional[List[str]] = None) -> int:
|
||||
"""This is an internal API only meant for use by pip's own console scripts.
|
||||
|
||||
For additional details, see https://github.com/pypa/pip/issues/7498.
|
||||
"""
|
||||
from pip._internal.utils.entrypoints import _wrapper
|
||||
|
||||
return _wrapper(args)
|
||||
24
venv/lib64/python3.12/site-packages/pip/__main__.py
Normal file
24
venv/lib64/python3.12/site-packages/pip/__main__.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Remove '' and current working directory from the first entry
|
||||
# of sys.path, if present to avoid using current directory
|
||||
# in pip commands check, freeze, install, list and show,
|
||||
# when invoked as python -m pip <command>
|
||||
if sys.path[0] in ("", os.getcwd()):
|
||||
sys.path.pop(0)
|
||||
|
||||
# If we are running from a wheel, add the wheel to sys.path
|
||||
# This allows the usage python pip-*.whl/pip install pip-*.whl
|
||||
if __package__ == "":
|
||||
# __file__ is pip-*.whl/pip/__main__.py
|
||||
# first dirname call strips of '/__main__.py', second strips off '/pip'
|
||||
# Resulting path is the name of the wheel itself
|
||||
# Add that to sys.path so we can import pip
|
||||
path = os.path.dirname(os.path.dirname(__file__))
|
||||
sys.path.insert(0, path)
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pip._internal.cli.main import main as _main
|
||||
|
||||
sys.exit(_main())
|
||||
50
venv/lib64/python3.12/site-packages/pip/__pip-runner__.py
Normal file
50
venv/lib64/python3.12/site-packages/pip/__pip-runner__.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Execute exactly this copy of pip, within a different environment.
|
||||
|
||||
This file is named as it is, to ensure that this module can't be imported via
|
||||
an import statement.
|
||||
"""
|
||||
|
||||
# /!\ This version compatibility check section must be Python 2 compatible. /!\
|
||||
|
||||
import sys
|
||||
|
||||
# Copied from setup.py
|
||||
PYTHON_REQUIRES = (3, 7)
|
||||
|
||||
|
||||
def version_str(version): # type: ignore
|
||||
return ".".join(str(v) for v in version)
|
||||
|
||||
|
||||
if sys.version_info[:2] < PYTHON_REQUIRES:
|
||||
raise SystemExit(
|
||||
"This version of pip does not support python {} (requires >={}).".format(
|
||||
version_str(sys.version_info[:2]), version_str(PYTHON_REQUIRES)
|
||||
)
|
||||
)
|
||||
|
||||
# From here on, we can use Python 3 features, but the syntax must remain
|
||||
# Python 2 compatible.
|
||||
|
||||
import runpy # noqa: E402
|
||||
from importlib.machinery import PathFinder # noqa: E402
|
||||
from os.path import dirname # noqa: E402
|
||||
|
||||
PIP_SOURCES_ROOT = dirname(dirname(__file__))
|
||||
|
||||
|
||||
class PipImportRedirectingFinder:
|
||||
@classmethod
|
||||
def find_spec(self, fullname, path=None, target=None): # type: ignore
|
||||
if fullname != "pip":
|
||||
return None
|
||||
|
||||
spec = PathFinder.find_spec(fullname, [PIP_SOURCES_ROOT], target)
|
||||
assert spec, (PIP_SOURCES_ROOT, fullname)
|
||||
return spec
|
||||
|
||||
|
||||
sys.meta_path.insert(0, PipImportRedirectingFinder())
|
||||
|
||||
assert __name__ == "__main__", "Cannot run __pip-runner__.py as a non-main module"
|
||||
runpy.run_module("pip", run_name="__main__", alter_sys=True)
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,18 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from pip._internal.utils import _log
|
||||
|
||||
# init_logging() must be called before any call to logging.getLogger()
|
||||
# which happens at import of most modules.
|
||||
_log.init_logging()
|
||||
|
||||
|
||||
def main(args: (Optional[List[str]]) = None) -> int:
|
||||
"""This is preserved for old console scripts that may still be referencing
|
||||
it.
|
||||
|
||||
For additional details, see https://github.com/pypa/pip/issues/7498.
|
||||
"""
|
||||
from pip._internal.utils.entrypoints import _wrapper
|
||||
|
||||
return _wrapper(args)
|
||||
311
venv/lib64/python3.12/site-packages/pip/_internal/build_env.py
Normal file
311
venv/lib64/python3.12/site-packages/pip/_internal/build_env.py
Normal file
@@ -0,0 +1,311 @@
|
||||
"""Build Environment used for isolation during sdist building
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import site
|
||||
import sys
|
||||
import textwrap
|
||||
from collections import OrderedDict
|
||||
from types import TracebackType
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple, Type, Union
|
||||
|
||||
from pip._vendor.certifi import where
|
||||
from pip._vendor.packaging.requirements import Requirement
|
||||
from pip._vendor.packaging.version import Version
|
||||
|
||||
from pip import __file__ as pip_location
|
||||
from pip._internal.cli.spinners import open_spinner
|
||||
from pip._internal.locations import get_platlib, get_purelib, get_scheme
|
||||
from pip._internal.metadata import get_default_environment, get_environment
|
||||
from pip._internal.utils.subprocess import call_subprocess
|
||||
from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pip._internal.index.package_finder import PackageFinder
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _dedup(a: str, b: str) -> Union[Tuple[str], Tuple[str, str]]:
|
||||
return (a, b) if a != b else (a,)
|
||||
|
||||
|
||||
class _Prefix:
|
||||
def __init__(self, path: str) -> None:
|
||||
self.path = path
|
||||
self.setup = False
|
||||
scheme = get_scheme("", prefix=path)
|
||||
self.bin_dir = scheme.scripts
|
||||
self.lib_dirs = _dedup(scheme.purelib, scheme.platlib)
|
||||
|
||||
|
||||
def get_runnable_pip() -> str:
|
||||
"""Get a file to pass to a Python executable, to run the currently-running pip.
|
||||
|
||||
This is used to run a pip subprocess, for installing requirements into the build
|
||||
environment.
|
||||
"""
|
||||
source = pathlib.Path(pip_location).resolve().parent
|
||||
|
||||
if not source.is_dir():
|
||||
# This would happen if someone is using pip from inside a zip file. In that
|
||||
# case, we can use that directly.
|
||||
return str(source)
|
||||
|
||||
return os.fsdecode(source / "__pip-runner__.py")
|
||||
|
||||
|
||||
def _get_system_sitepackages() -> Set[str]:
|
||||
"""Get system site packages
|
||||
|
||||
Usually from site.getsitepackages,
|
||||
but fallback on `get_purelib()/get_platlib()` if unavailable
|
||||
(e.g. in a virtualenv created by virtualenv<20)
|
||||
|
||||
Returns normalized set of strings.
|
||||
"""
|
||||
if hasattr(site, "getsitepackages"):
|
||||
system_sites = site.getsitepackages()
|
||||
else:
|
||||
# virtualenv < 20 overwrites site.py without getsitepackages
|
||||
# fallback on get_purelib/get_platlib.
|
||||
# this is known to miss things, but shouldn't in the cases
|
||||
# where getsitepackages() has been removed (inside a virtualenv)
|
||||
system_sites = [get_purelib(), get_platlib()]
|
||||
return {os.path.normcase(path) for path in system_sites}
|
||||
|
||||
|
||||
class BuildEnvironment:
|
||||
"""Creates and manages an isolated environment to install build deps"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
temp_dir = TempDirectory(kind=tempdir_kinds.BUILD_ENV, globally_managed=True)
|
||||
|
||||
self._prefixes = OrderedDict(
|
||||
(name, _Prefix(os.path.join(temp_dir.path, name)))
|
||||
for name in ("normal", "overlay")
|
||||
)
|
||||
|
||||
self._bin_dirs: List[str] = []
|
||||
self._lib_dirs: List[str] = []
|
||||
for prefix in reversed(list(self._prefixes.values())):
|
||||
self._bin_dirs.append(prefix.bin_dir)
|
||||
self._lib_dirs.extend(prefix.lib_dirs)
|
||||
|
||||
# Customize site to:
|
||||
# - ensure .pth files are honored
|
||||
# - prevent access to system site packages
|
||||
system_sites = _get_system_sitepackages()
|
||||
|
||||
self._site_dir = os.path.join(temp_dir.path, "site")
|
||||
if not os.path.exists(self._site_dir):
|
||||
os.mkdir(self._site_dir)
|
||||
with open(
|
||||
os.path.join(self._site_dir, "sitecustomize.py"), "w", encoding="utf-8"
|
||||
) as fp:
|
||||
fp.write(
|
||||
textwrap.dedent(
|
||||
"""
|
||||
import os, site, sys
|
||||
|
||||
# First, drop system-sites related paths.
|
||||
original_sys_path = sys.path[:]
|
||||
known_paths = set()
|
||||
for path in {system_sites!r}:
|
||||
site.addsitedir(path, known_paths=known_paths)
|
||||
system_paths = set(
|
||||
os.path.normcase(path)
|
||||
for path in sys.path[len(original_sys_path):]
|
||||
)
|
||||
original_sys_path = [
|
||||
path for path in original_sys_path
|
||||
if os.path.normcase(path) not in system_paths
|
||||
]
|
||||
sys.path = original_sys_path
|
||||
|
||||
# Second, add lib directories.
|
||||
# ensuring .pth file are processed.
|
||||
for path in {lib_dirs!r}:
|
||||
assert not path in sys.path
|
||||
site.addsitedir(path)
|
||||
"""
|
||||
).format(system_sites=system_sites, lib_dirs=self._lib_dirs)
|
||||
)
|
||||
|
||||
def __enter__(self) -> None:
|
||||
self._save_env = {
|
||||
name: os.environ.get(name, None)
|
||||
for name in ("PATH", "PYTHONNOUSERSITE", "PYTHONPATH")
|
||||
}
|
||||
|
||||
path = self._bin_dirs[:]
|
||||
old_path = self._save_env["PATH"]
|
||||
if old_path:
|
||||
path.extend(old_path.split(os.pathsep))
|
||||
|
||||
pythonpath = [self._site_dir]
|
||||
|
||||
os.environ.update(
|
||||
{
|
||||
"PATH": os.pathsep.join(path),
|
||||
"PYTHONNOUSERSITE": "1",
|
||||
"PYTHONPATH": os.pathsep.join(pythonpath),
|
||||
}
|
||||
)
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc_val: Optional[BaseException],
|
||||
exc_tb: Optional[TracebackType],
|
||||
) -> None:
|
||||
for varname, old_value in self._save_env.items():
|
||||
if old_value is None:
|
||||
os.environ.pop(varname, None)
|
||||
else:
|
||||
os.environ[varname] = old_value
|
||||
|
||||
def check_requirements(
|
||||
self, reqs: Iterable[str]
|
||||
) -> Tuple[Set[Tuple[str, str]], Set[str]]:
|
||||
"""Return 2 sets:
|
||||
- conflicting requirements: set of (installed, wanted) reqs tuples
|
||||
- missing requirements: set of reqs
|
||||
"""
|
||||
missing = set()
|
||||
conflicting = set()
|
||||
if reqs:
|
||||
env = (
|
||||
get_environment(self._lib_dirs)
|
||||
if hasattr(self, "_lib_dirs")
|
||||
else get_default_environment()
|
||||
)
|
||||
for req_str in reqs:
|
||||
req = Requirement(req_str)
|
||||
# We're explicitly evaluating with an empty extra value, since build
|
||||
# environments are not provided any mechanism to select specific extras.
|
||||
if req.marker is not None and not req.marker.evaluate({"extra": ""}):
|
||||
continue
|
||||
dist = env.get_distribution(req.name)
|
||||
if not dist:
|
||||
missing.add(req_str)
|
||||
continue
|
||||
if isinstance(dist.version, Version):
|
||||
installed_req_str = f"{req.name}=={dist.version}"
|
||||
else:
|
||||
installed_req_str = f"{req.name}==={dist.version}"
|
||||
if not req.specifier.contains(dist.version, prereleases=True):
|
||||
conflicting.add((installed_req_str, req_str))
|
||||
# FIXME: Consider direct URL?
|
||||
return conflicting, missing
|
||||
|
||||
def install_requirements(
|
||||
self,
|
||||
finder: "PackageFinder",
|
||||
requirements: Iterable[str],
|
||||
prefix_as_string: str,
|
||||
*,
|
||||
kind: str,
|
||||
) -> None:
|
||||
prefix = self._prefixes[prefix_as_string]
|
||||
assert not prefix.setup
|
||||
prefix.setup = True
|
||||
if not requirements:
|
||||
return
|
||||
self._install_requirements(
|
||||
get_runnable_pip(),
|
||||
finder,
|
||||
requirements,
|
||||
prefix,
|
||||
kind=kind,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _install_requirements(
|
||||
pip_runnable: str,
|
||||
finder: "PackageFinder",
|
||||
requirements: Iterable[str],
|
||||
prefix: _Prefix,
|
||||
*,
|
||||
kind: str,
|
||||
) -> None:
|
||||
args: List[str] = [
|
||||
sys.executable,
|
||||
pip_runnable,
|
||||
"install",
|
||||
"--ignore-installed",
|
||||
"--no-user",
|
||||
"--prefix",
|
||||
prefix.path,
|
||||
"--no-warn-script-location",
|
||||
]
|
||||
if logger.getEffectiveLevel() <= logging.DEBUG:
|
||||
args.append("-v")
|
||||
for format_control in ("no_binary", "only_binary"):
|
||||
formats = getattr(finder.format_control, format_control)
|
||||
args.extend(
|
||||
(
|
||||
"--" + format_control.replace("_", "-"),
|
||||
",".join(sorted(formats or {":none:"})),
|
||||
)
|
||||
)
|
||||
|
||||
index_urls = finder.index_urls
|
||||
if index_urls:
|
||||
args.extend(["-i", index_urls[0]])
|
||||
for extra_index in index_urls[1:]:
|
||||
args.extend(["--extra-index-url", extra_index])
|
||||
else:
|
||||
args.append("--no-index")
|
||||
for link in finder.find_links:
|
||||
args.extend(["--find-links", link])
|
||||
|
||||
for host in finder.trusted_hosts:
|
||||
args.extend(["--trusted-host", host])
|
||||
if finder.allow_all_prereleases:
|
||||
args.append("--pre")
|
||||
if finder.prefer_binary:
|
||||
args.append("--prefer-binary")
|
||||
args.append("--")
|
||||
args.extend(requirements)
|
||||
extra_environ = {"_PIP_STANDALONE_CERT": where()}
|
||||
with open_spinner(f"Installing {kind}") as spinner:
|
||||
call_subprocess(
|
||||
args,
|
||||
command_desc=f"pip subprocess to install {kind}",
|
||||
spinner=spinner,
|
||||
extra_environ=extra_environ,
|
||||
)
|
||||
|
||||
|
||||
class NoOpBuildEnvironment(BuildEnvironment):
|
||||
"""A no-op drop-in replacement for BuildEnvironment"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def __enter__(self) -> None:
|
||||
pass
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc_val: Optional[BaseException],
|
||||
exc_tb: Optional[TracebackType],
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
def cleanup(self) -> None:
|
||||
pass
|
||||
|
||||
def install_requirements(
|
||||
self,
|
||||
finder: "PackageFinder",
|
||||
requirements: Iterable[str],
|
||||
prefix_as_string: str,
|
||||
*,
|
||||
kind: str,
|
||||
) -> None:
|
||||
raise NotImplementedError()
|
||||
290
venv/lib64/python3.12/site-packages/pip/_internal/cache.py
Normal file
290
venv/lib64/python3.12/site-packages/pip/_internal/cache.py
Normal file
@@ -0,0 +1,290 @@
|
||||
"""Cache Management
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from pip._vendor.packaging.tags import Tag, interpreter_name, interpreter_version
|
||||
from pip._vendor.packaging.utils import canonicalize_name
|
||||
|
||||
from pip._internal.exceptions import InvalidWheelFilename
|
||||
from pip._internal.models.direct_url import DirectUrl
|
||||
from pip._internal.models.link import Link
|
||||
from pip._internal.models.wheel import Wheel
|
||||
from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds
|
||||
from pip._internal.utils.urls import path_to_url
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ORIGIN_JSON_NAME = "origin.json"
|
||||
|
||||
|
||||
def _hash_dict(d: Dict[str, str]) -> str:
|
||||
"""Return a stable sha224 of a dictionary."""
|
||||
s = json.dumps(d, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
|
||||
return hashlib.sha224(s.encode("ascii")).hexdigest()
|
||||
|
||||
|
||||
class Cache:
|
||||
"""An abstract class - provides cache directories for data from links
|
||||
|
||||
:param cache_dir: The root of the cache.
|
||||
"""
|
||||
|
||||
def __init__(self, cache_dir: str) -> None:
|
||||
super().__init__()
|
||||
assert not cache_dir or os.path.isabs(cache_dir)
|
||||
self.cache_dir = cache_dir or None
|
||||
|
||||
def _get_cache_path_parts(self, link: Link) -> List[str]:
|
||||
"""Get parts of part that must be os.path.joined with cache_dir"""
|
||||
|
||||
# We want to generate an url to use as our cache key, we don't want to
|
||||
# just re-use the URL because it might have other items in the fragment
|
||||
# and we don't care about those.
|
||||
key_parts = {"url": link.url_without_fragment}
|
||||
if link.hash_name is not None and link.hash is not None:
|
||||
key_parts[link.hash_name] = link.hash
|
||||
if link.subdirectory_fragment:
|
||||
key_parts["subdirectory"] = link.subdirectory_fragment
|
||||
|
||||
# Include interpreter name, major and minor version in cache key
|
||||
# to cope with ill-behaved sdists that build a different wheel
|
||||
# depending on the python version their setup.py is being run on,
|
||||
# and don't encode the difference in compatibility tags.
|
||||
# https://github.com/pypa/pip/issues/7296
|
||||
key_parts["interpreter_name"] = interpreter_name()
|
||||
key_parts["interpreter_version"] = interpreter_version()
|
||||
|
||||
# Encode our key url with sha224, we'll use this because it has similar
|
||||
# security properties to sha256, but with a shorter total output (and
|
||||
# thus less secure). However the differences don't make a lot of
|
||||
# difference for our use case here.
|
||||
hashed = _hash_dict(key_parts)
|
||||
|
||||
# We want to nest the directories some to prevent having a ton of top
|
||||
# level directories where we might run out of sub directories on some
|
||||
# FS.
|
||||
parts = [hashed[:2], hashed[2:4], hashed[4:6], hashed[6:]]
|
||||
|
||||
return parts
|
||||
|
||||
def _get_candidates(self, link: Link, canonical_package_name: str) -> List[Any]:
|
||||
can_not_cache = not self.cache_dir or not canonical_package_name or not link
|
||||
if can_not_cache:
|
||||
return []
|
||||
|
||||
path = self.get_path_for_link(link)
|
||||
if os.path.isdir(path):
|
||||
return [(candidate, path) for candidate in os.listdir(path)]
|
||||
return []
|
||||
|
||||
def get_path_for_link(self, link: Link) -> str:
|
||||
"""Return a directory to store cached items in for link."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def get(
|
||||
self,
|
||||
link: Link,
|
||||
package_name: Optional[str],
|
||||
supported_tags: List[Tag],
|
||||
) -> Link:
|
||||
"""Returns a link to a cached item if it exists, otherwise returns the
|
||||
passed link.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class SimpleWheelCache(Cache):
|
||||
"""A cache of wheels for future installs."""
|
||||
|
||||
def __init__(self, cache_dir: str) -> None:
|
||||
super().__init__(cache_dir)
|
||||
|
||||
def get_path_for_link(self, link: Link) -> str:
|
||||
"""Return a directory to store cached wheels for link
|
||||
|
||||
Because there are M wheels for any one sdist, we provide a directory
|
||||
to cache them in, and then consult that directory when looking up
|
||||
cache hits.
|
||||
|
||||
We only insert things into the cache if they have plausible version
|
||||
numbers, so that we don't contaminate the cache with things that were
|
||||
not unique. E.g. ./package might have dozens of installs done for it
|
||||
and build a version of 0.0...and if we built and cached a wheel, we'd
|
||||
end up using the same wheel even if the source has been edited.
|
||||
|
||||
:param link: The link of the sdist for which this will cache wheels.
|
||||
"""
|
||||
parts = self._get_cache_path_parts(link)
|
||||
assert self.cache_dir
|
||||
# Store wheels within the root cache_dir
|
||||
return os.path.join(self.cache_dir, "wheels", *parts)
|
||||
|
||||
def get(
|
||||
self,
|
||||
link: Link,
|
||||
package_name: Optional[str],
|
||||
supported_tags: List[Tag],
|
||||
) -> Link:
|
||||
candidates = []
|
||||
|
||||
if not package_name:
|
||||
return link
|
||||
|
||||
canonical_package_name = canonicalize_name(package_name)
|
||||
for wheel_name, wheel_dir in self._get_candidates(link, canonical_package_name):
|
||||
try:
|
||||
wheel = Wheel(wheel_name)
|
||||
except InvalidWheelFilename:
|
||||
continue
|
||||
if canonicalize_name(wheel.name) != canonical_package_name:
|
||||
logger.debug(
|
||||
"Ignoring cached wheel %s for %s as it "
|
||||
"does not match the expected distribution name %s.",
|
||||
wheel_name,
|
||||
link,
|
||||
package_name,
|
||||
)
|
||||
continue
|
||||
if not wheel.supported(supported_tags):
|
||||
# Built for a different python/arch/etc
|
||||
continue
|
||||
candidates.append(
|
||||
(
|
||||
wheel.support_index_min(supported_tags),
|
||||
wheel_name,
|
||||
wheel_dir,
|
||||
)
|
||||
)
|
||||
|
||||
if not candidates:
|
||||
return link
|
||||
|
||||
_, wheel_name, wheel_dir = min(candidates)
|
||||
return Link(path_to_url(os.path.join(wheel_dir, wheel_name)))
|
||||
|
||||
|
||||
class EphemWheelCache(SimpleWheelCache):
|
||||
"""A SimpleWheelCache that creates it's own temporary cache directory"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._temp_dir = TempDirectory(
|
||||
kind=tempdir_kinds.EPHEM_WHEEL_CACHE,
|
||||
globally_managed=True,
|
||||
)
|
||||
|
||||
super().__init__(self._temp_dir.path)
|
||||
|
||||
|
||||
class CacheEntry:
|
||||
def __init__(
|
||||
self,
|
||||
link: Link,
|
||||
persistent: bool,
|
||||
):
|
||||
self.link = link
|
||||
self.persistent = persistent
|
||||
self.origin: Optional[DirectUrl] = None
|
||||
origin_direct_url_path = Path(self.link.file_path).parent / ORIGIN_JSON_NAME
|
||||
if origin_direct_url_path.exists():
|
||||
try:
|
||||
self.origin = DirectUrl.from_json(
|
||||
origin_direct_url_path.read_text(encoding="utf-8")
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Ignoring invalid cache entry origin file %s for %s (%s)",
|
||||
origin_direct_url_path,
|
||||
link.filename,
|
||||
e,
|
||||
)
|
||||
|
||||
|
||||
class WheelCache(Cache):
|
||||
"""Wraps EphemWheelCache and SimpleWheelCache into a single Cache
|
||||
|
||||
This Cache allows for gracefully degradation, using the ephem wheel cache
|
||||
when a certain link is not found in the simple wheel cache first.
|
||||
"""
|
||||
|
||||
def __init__(self, cache_dir: str) -> None:
|
||||
super().__init__(cache_dir)
|
||||
self._wheel_cache = SimpleWheelCache(cache_dir)
|
||||
self._ephem_cache = EphemWheelCache()
|
||||
|
||||
def get_path_for_link(self, link: Link) -> str:
|
||||
return self._wheel_cache.get_path_for_link(link)
|
||||
|
||||
def get_ephem_path_for_link(self, link: Link) -> str:
|
||||
return self._ephem_cache.get_path_for_link(link)
|
||||
|
||||
def get(
|
||||
self,
|
||||
link: Link,
|
||||
package_name: Optional[str],
|
||||
supported_tags: List[Tag],
|
||||
) -> Link:
|
||||
cache_entry = self.get_cache_entry(link, package_name, supported_tags)
|
||||
if cache_entry is None:
|
||||
return link
|
||||
return cache_entry.link
|
||||
|
||||
def get_cache_entry(
|
||||
self,
|
||||
link: Link,
|
||||
package_name: Optional[str],
|
||||
supported_tags: List[Tag],
|
||||
) -> Optional[CacheEntry]:
|
||||
"""Returns a CacheEntry with a link to a cached item if it exists or
|
||||
None. The cache entry indicates if the item was found in the persistent
|
||||
or ephemeral cache.
|
||||
"""
|
||||
retval = self._wheel_cache.get(
|
||||
link=link,
|
||||
package_name=package_name,
|
||||
supported_tags=supported_tags,
|
||||
)
|
||||
if retval is not link:
|
||||
return CacheEntry(retval, persistent=True)
|
||||
|
||||
retval = self._ephem_cache.get(
|
||||
link=link,
|
||||
package_name=package_name,
|
||||
supported_tags=supported_tags,
|
||||
)
|
||||
if retval is not link:
|
||||
return CacheEntry(retval, persistent=False)
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def record_download_origin(cache_dir: str, download_info: DirectUrl) -> None:
|
||||
origin_path = Path(cache_dir) / ORIGIN_JSON_NAME
|
||||
if origin_path.exists():
|
||||
try:
|
||||
origin = DirectUrl.from_json(origin_path.read_text(encoding="utf-8"))
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Could not read origin file %s in cache entry (%s). "
|
||||
"Will attempt to overwrite it.",
|
||||
origin_path,
|
||||
e,
|
||||
)
|
||||
else:
|
||||
# TODO: use DirectUrl.equivalent when
|
||||
# https://github.com/pypa/pip/pull/10564 is merged.
|
||||
if origin.url != download_info.url:
|
||||
logger.warning(
|
||||
"Origin URL %s in cache entry %s does not match download URL "
|
||||
"%s. This is likely a pip bug or a cache corruption issue. "
|
||||
"Will overwrite it with the new value.",
|
||||
origin.url,
|
||||
cache_dir,
|
||||
download_info.url,
|
||||
)
|
||||
origin_path.write_text(download_info.to_json(), encoding="utf-8")
|
||||
@@ -0,0 +1,383 @@
|
||||
"""Configuration management setup
|
||||
|
||||
Some terminology:
|
||||
- name
|
||||
As written in config files.
|
||||
- value
|
||||
Value associated with a name
|
||||
- key
|
||||
Name combined with it's section (section.name)
|
||||
- variant
|
||||
A single word describing where the configuration key-value pair came from
|
||||
"""
|
||||
|
||||
import configparser
|
||||
import locale
|
||||
import os
|
||||
import sys
|
||||
from typing import Any, Dict, Iterable, List, NewType, Optional, Tuple
|
||||
|
||||
from pip._internal.exceptions import (
|
||||
ConfigurationError,
|
||||
ConfigurationFileCouldNotBeLoaded,
|
||||
)
|
||||
from pip._internal.utils import appdirs
|
||||
from pip._internal.utils.compat import WINDOWS
|
||||
from pip._internal.utils.logging import getLogger
|
||||
from pip._internal.utils.misc import ensure_dir, enum
|
||||
|
||||
RawConfigParser = configparser.RawConfigParser # Shorthand
|
||||
Kind = NewType("Kind", str)
|
||||
|
||||
CONFIG_BASENAME = "pip.ini" if WINDOWS else "pip.conf"
|
||||
ENV_NAMES_IGNORED = "version", "help"
|
||||
|
||||
# The kinds of configurations there are.
|
||||
kinds = enum(
|
||||
USER="user", # User Specific
|
||||
GLOBAL="global", # System Wide
|
||||
SITE="site", # [Virtual] Environment Specific
|
||||
ENV="env", # from PIP_CONFIG_FILE
|
||||
ENV_VAR="env-var", # from Environment Variables
|
||||
)
|
||||
OVERRIDE_ORDER = kinds.GLOBAL, kinds.USER, kinds.SITE, kinds.ENV, kinds.ENV_VAR
|
||||
VALID_LOAD_ONLY = kinds.USER, kinds.GLOBAL, kinds.SITE
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
# NOTE: Maybe use the optionx attribute to normalize keynames.
|
||||
def _normalize_name(name: str) -> str:
|
||||
"""Make a name consistent regardless of source (environment or file)"""
|
||||
name = name.lower().replace("_", "-")
|
||||
if name.startswith("--"):
|
||||
name = name[2:] # only prefer long opts
|
||||
return name
|
||||
|
||||
|
||||
def _disassemble_key(name: str) -> List[str]:
|
||||
if "." not in name:
|
||||
error_message = (
|
||||
"Key does not contain dot separated section and key. "
|
||||
f"Perhaps you wanted to use 'global.{name}' instead?"
|
||||
)
|
||||
raise ConfigurationError(error_message)
|
||||
return name.split(".", 1)
|
||||
|
||||
|
||||
def get_configuration_files() -> Dict[Kind, List[str]]:
|
||||
global_config_files = [
|
||||
os.path.join(path, CONFIG_BASENAME) for path in appdirs.site_config_dirs("pip")
|
||||
]
|
||||
|
||||
site_config_file = os.path.join(sys.prefix, CONFIG_BASENAME)
|
||||
legacy_config_file = os.path.join(
|
||||
os.path.expanduser("~"),
|
||||
"pip" if WINDOWS else ".pip",
|
||||
CONFIG_BASENAME,
|
||||
)
|
||||
new_config_file = os.path.join(appdirs.user_config_dir("pip"), CONFIG_BASENAME)
|
||||
return {
|
||||
kinds.GLOBAL: global_config_files,
|
||||
kinds.SITE: [site_config_file],
|
||||
kinds.USER: [legacy_config_file, new_config_file],
|
||||
}
|
||||
|
||||
|
||||
class Configuration:
|
||||
"""Handles management of configuration.
|
||||
|
||||
Provides an interface to accessing and managing configuration files.
|
||||
|
||||
This class converts provides an API that takes "section.key-name" style
|
||||
keys and stores the value associated with it as "key-name" under the
|
||||
section "section".
|
||||
|
||||
This allows for a clean interface wherein the both the section and the
|
||||
key-name are preserved in an easy to manage form in the configuration files
|
||||
and the data stored is also nice.
|
||||
"""
|
||||
|
||||
def __init__(self, isolated: bool, load_only: Optional[Kind] = None) -> None:
|
||||
super().__init__()
|
||||
|
||||
if load_only is not None and load_only not in VALID_LOAD_ONLY:
|
||||
raise ConfigurationError(
|
||||
"Got invalid value for load_only - should be one of {}".format(
|
||||
", ".join(map(repr, VALID_LOAD_ONLY))
|
||||
)
|
||||
)
|
||||
self.isolated = isolated
|
||||
self.load_only = load_only
|
||||
|
||||
# Because we keep track of where we got the data from
|
||||
self._parsers: Dict[Kind, List[Tuple[str, RawConfigParser]]] = {
|
||||
variant: [] for variant in OVERRIDE_ORDER
|
||||
}
|
||||
self._config: Dict[Kind, Dict[str, Any]] = {
|
||||
variant: {} for variant in OVERRIDE_ORDER
|
||||
}
|
||||
self._modified_parsers: List[Tuple[str, RawConfigParser]] = []
|
||||
|
||||
def load(self) -> None:
|
||||
"""Loads configuration from configuration files and environment"""
|
||||
self._load_config_files()
|
||||
if not self.isolated:
|
||||
self._load_environment_vars()
|
||||
|
||||
def get_file_to_edit(self) -> Optional[str]:
|
||||
"""Returns the file with highest priority in configuration"""
|
||||
assert self.load_only is not None, "Need to be specified a file to be editing"
|
||||
|
||||
try:
|
||||
return self._get_parser_to_modify()[0]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
def items(self) -> Iterable[Tuple[str, Any]]:
|
||||
"""Returns key-value pairs like dict.items() representing the loaded
|
||||
configuration
|
||||
"""
|
||||
return self._dictionary.items()
|
||||
|
||||
def get_value(self, key: str) -> Any:
|
||||
"""Get a value from the configuration."""
|
||||
orig_key = key
|
||||
key = _normalize_name(key)
|
||||
try:
|
||||
return self._dictionary[key]
|
||||
except KeyError:
|
||||
# disassembling triggers a more useful error message than simply
|
||||
# "No such key" in the case that the key isn't in the form command.option
|
||||
_disassemble_key(key)
|
||||
raise ConfigurationError(f"No such key - {orig_key}")
|
||||
|
||||
def set_value(self, key: str, value: Any) -> None:
|
||||
"""Modify a value in the configuration."""
|
||||
key = _normalize_name(key)
|
||||
self._ensure_have_load_only()
|
||||
|
||||
assert self.load_only
|
||||
fname, parser = self._get_parser_to_modify()
|
||||
|
||||
if parser is not None:
|
||||
section, name = _disassemble_key(key)
|
||||
|
||||
# Modify the parser and the configuration
|
||||
if not parser.has_section(section):
|
||||
parser.add_section(section)
|
||||
parser.set(section, name, value)
|
||||
|
||||
self._config[self.load_only][key] = value
|
||||
self._mark_as_modified(fname, parser)
|
||||
|
||||
def unset_value(self, key: str) -> None:
|
||||
"""Unset a value in the configuration."""
|
||||
orig_key = key
|
||||
key = _normalize_name(key)
|
||||
self._ensure_have_load_only()
|
||||
|
||||
assert self.load_only
|
||||
if key not in self._config[self.load_only]:
|
||||
raise ConfigurationError(f"No such key - {orig_key}")
|
||||
|
||||
fname, parser = self._get_parser_to_modify()
|
||||
|
||||
if parser is not None:
|
||||
section, name = _disassemble_key(key)
|
||||
if not (
|
||||
parser.has_section(section) and parser.remove_option(section, name)
|
||||
):
|
||||
# The option was not removed.
|
||||
raise ConfigurationError(
|
||||
"Fatal Internal error [id=1]. Please report as a bug."
|
||||
)
|
||||
|
||||
# The section may be empty after the option was removed.
|
||||
if not parser.items(section):
|
||||
parser.remove_section(section)
|
||||
self._mark_as_modified(fname, parser)
|
||||
|
||||
del self._config[self.load_only][key]
|
||||
|
||||
def save(self) -> None:
|
||||
"""Save the current in-memory state."""
|
||||
self._ensure_have_load_only()
|
||||
|
||||
for fname, parser in self._modified_parsers:
|
||||
logger.info("Writing to %s", fname)
|
||||
|
||||
# Ensure directory exists.
|
||||
ensure_dir(os.path.dirname(fname))
|
||||
|
||||
# Ensure directory's permission(need to be writeable)
|
||||
try:
|
||||
with open(fname, "w") as f:
|
||||
parser.write(f)
|
||||
except OSError as error:
|
||||
raise ConfigurationError(
|
||||
f"An error occurred while writing to the configuration file "
|
||||
f"{fname}: {error}"
|
||||
)
|
||||
|
||||
#
|
||||
# Private routines
|
||||
#
|
||||
|
||||
def _ensure_have_load_only(self) -> None:
|
||||
if self.load_only is None:
|
||||
raise ConfigurationError("Needed a specific file to be modifying.")
|
||||
logger.debug("Will be working with %s variant only", self.load_only)
|
||||
|
||||
@property
|
||||
def _dictionary(self) -> Dict[str, Any]:
|
||||
"""A dictionary representing the loaded configuration."""
|
||||
# NOTE: Dictionaries are not populated if not loaded. So, conditionals
|
||||
# are not needed here.
|
||||
retval = {}
|
||||
|
||||
for variant in OVERRIDE_ORDER:
|
||||
retval.update(self._config[variant])
|
||||
|
||||
return retval
|
||||
|
||||
def _load_config_files(self) -> None:
|
||||
"""Loads configuration from configuration files"""
|
||||
config_files = dict(self.iter_config_files())
|
||||
if config_files[kinds.ENV][0:1] == [os.devnull]:
|
||||
logger.debug(
|
||||
"Skipping loading configuration files due to "
|
||||
"environment's PIP_CONFIG_FILE being os.devnull"
|
||||
)
|
||||
return
|
||||
|
||||
for variant, files in config_files.items():
|
||||
for fname in files:
|
||||
# If there's specific variant set in `load_only`, load only
|
||||
# that variant, not the others.
|
||||
if self.load_only is not None and variant != self.load_only:
|
||||
logger.debug("Skipping file '%s' (variant: %s)", fname, variant)
|
||||
continue
|
||||
|
||||
parser = self._load_file(variant, fname)
|
||||
|
||||
# Keeping track of the parsers used
|
||||
self._parsers[variant].append((fname, parser))
|
||||
|
||||
def _load_file(self, variant: Kind, fname: str) -> RawConfigParser:
|
||||
logger.verbose("For variant '%s', will try loading '%s'", variant, fname)
|
||||
parser = self._construct_parser(fname)
|
||||
|
||||
for section in parser.sections():
|
||||
items = parser.items(section)
|
||||
self._config[variant].update(self._normalized_keys(section, items))
|
||||
|
||||
return parser
|
||||
|
||||
def _construct_parser(self, fname: str) -> RawConfigParser:
|
||||
parser = configparser.RawConfigParser()
|
||||
# If there is no such file, don't bother reading it but create the
|
||||
# parser anyway, to hold the data.
|
||||
# Doing this is useful when modifying and saving files, where we don't
|
||||
# need to construct a parser.
|
||||
if os.path.exists(fname):
|
||||
locale_encoding = locale.getpreferredencoding(False)
|
||||
try:
|
||||
parser.read(fname, encoding=locale_encoding)
|
||||
except UnicodeDecodeError:
|
||||
# See https://github.com/pypa/pip/issues/4963
|
||||
raise ConfigurationFileCouldNotBeLoaded(
|
||||
reason=f"contains invalid {locale_encoding} characters",
|
||||
fname=fname,
|
||||
)
|
||||
except configparser.Error as error:
|
||||
# See https://github.com/pypa/pip/issues/4893
|
||||
raise ConfigurationFileCouldNotBeLoaded(error=error)
|
||||
return parser
|
||||
|
||||
def _load_environment_vars(self) -> None:
|
||||
"""Loads configuration from environment variables"""
|
||||
self._config[kinds.ENV_VAR].update(
|
||||
self._normalized_keys(":env:", self.get_environ_vars())
|
||||
)
|
||||
|
||||
def _normalized_keys(
|
||||
self, section: str, items: Iterable[Tuple[str, Any]]
|
||||
) -> Dict[str, Any]:
|
||||
"""Normalizes items to construct a dictionary with normalized keys.
|
||||
|
||||
This routine is where the names become keys and are made the same
|
||||
regardless of source - configuration files or environment.
|
||||
"""
|
||||
normalized = {}
|
||||
for name, val in items:
|
||||
key = section + "." + _normalize_name(name)
|
||||
normalized[key] = val
|
||||
return normalized
|
||||
|
||||
def get_environ_vars(self) -> Iterable[Tuple[str, str]]:
|
||||
"""Returns a generator with all environmental vars with prefix PIP_"""
|
||||
for key, val in os.environ.items():
|
||||
if key.startswith("PIP_"):
|
||||
name = key[4:].lower()
|
||||
if name not in ENV_NAMES_IGNORED:
|
||||
yield name, val
|
||||
|
||||
# XXX: This is patched in the tests.
|
||||
def iter_config_files(self) -> Iterable[Tuple[Kind, List[str]]]:
|
||||
"""Yields variant and configuration files associated with it.
|
||||
|
||||
This should be treated like items of a dictionary. The order
|
||||
here doesn't affect what gets overridden. That is controlled
|
||||
by OVERRIDE_ORDER. However this does control the order they are
|
||||
displayed to the user. It's probably most ergononmic to display
|
||||
things in the same order as OVERRIDE_ORDER
|
||||
"""
|
||||
# SMELL: Move the conditions out of this function
|
||||
|
||||
env_config_file = os.environ.get("PIP_CONFIG_FILE", None)
|
||||
config_files = get_configuration_files()
|
||||
|
||||
yield kinds.GLOBAL, config_files[kinds.GLOBAL]
|
||||
|
||||
# per-user config is not loaded when env_config_file exists
|
||||
should_load_user_config = not self.isolated and not (
|
||||
env_config_file and os.path.exists(env_config_file)
|
||||
)
|
||||
if should_load_user_config:
|
||||
# The legacy config file is overridden by the new config file
|
||||
yield kinds.USER, config_files[kinds.USER]
|
||||
|
||||
# virtualenv config
|
||||
yield kinds.SITE, config_files[kinds.SITE]
|
||||
|
||||
if env_config_file is not None:
|
||||
yield kinds.ENV, [env_config_file]
|
||||
else:
|
||||
yield kinds.ENV, []
|
||||
|
||||
def get_values_in_config(self, variant: Kind) -> Dict[str, Any]:
|
||||
"""Get values present in a config file"""
|
||||
return self._config[variant]
|
||||
|
||||
def _get_parser_to_modify(self) -> Tuple[str, RawConfigParser]:
|
||||
# Determine which parser to modify
|
||||
assert self.load_only
|
||||
parsers = self._parsers[self.load_only]
|
||||
if not parsers:
|
||||
# This should not happen if everything works correctly.
|
||||
raise ConfigurationError(
|
||||
"Fatal Internal error [id=2]. Please report as a bug."
|
||||
)
|
||||
|
||||
# Use the highest priority parser.
|
||||
return parsers[-1]
|
||||
|
||||
# XXX: This is patched in the tests.
|
||||
def _mark_as_modified(self, fname: str, parser: RawConfigParser) -> None:
|
||||
file_parser_tuple = (fname, parser)
|
||||
if file_parser_tuple not in self._modified_parsers:
|
||||
self._modified_parsers.append(file_parser_tuple)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__.__name__}({self._dictionary!r})"
|
||||
728
venv/lib64/python3.12/site-packages/pip/_internal/exceptions.py
Normal file
728
venv/lib64/python3.12/site-packages/pip/_internal/exceptions.py
Normal file
@@ -0,0 +1,728 @@
|
||||
"""Exceptions used throughout package.
|
||||
|
||||
This module MUST NOT try to import from anything within `pip._internal` to
|
||||
operate. This is expected to be importable from any/all files within the
|
||||
subpackage and, thus, should not depend on them.
|
||||
"""
|
||||
|
||||
import configparser
|
||||
import contextlib
|
||||
import locale
|
||||
import logging
|
||||
import pathlib
|
||||
import re
|
||||
import sys
|
||||
from itertools import chain, groupby, repeat
|
||||
from typing import TYPE_CHECKING, Dict, Iterator, List, Optional, Union
|
||||
|
||||
from pip._vendor.requests.models import Request, Response
|
||||
from pip._vendor.rich.console import Console, ConsoleOptions, RenderResult
|
||||
from pip._vendor.rich.markup import escape
|
||||
from pip._vendor.rich.text import Text
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from hashlib import _Hash
|
||||
from typing import Literal
|
||||
|
||||
from pip._internal.metadata import BaseDistribution
|
||||
from pip._internal.req.req_install import InstallRequirement
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
#
|
||||
# Scaffolding
|
||||
#
|
||||
def _is_kebab_case(s: str) -> bool:
|
||||
return re.match(r"^[a-z]+(-[a-z]+)*$", s) is not None
|
||||
|
||||
|
||||
def _prefix_with_indent(
|
||||
s: Union[Text, str],
|
||||
console: Console,
|
||||
*,
|
||||
prefix: str,
|
||||
indent: str,
|
||||
) -> Text:
|
||||
if isinstance(s, Text):
|
||||
text = s
|
||||
else:
|
||||
text = console.render_str(s)
|
||||
|
||||
return console.render_str(prefix, overflow="ignore") + console.render_str(
|
||||
f"\n{indent}", overflow="ignore"
|
||||
).join(text.split(allow_blank=True))
|
||||
|
||||
|
||||
class PipError(Exception):
|
||||
"""The base pip error."""
|
||||
|
||||
|
||||
class DiagnosticPipError(PipError):
|
||||
"""An error, that presents diagnostic information to the user.
|
||||
|
||||
This contains a bunch of logic, to enable pretty presentation of our error
|
||||
messages. Each error gets a unique reference. Each error can also include
|
||||
additional context, a hint and/or a note -- which are presented with the
|
||||
main error message in a consistent style.
|
||||
|
||||
This is adapted from the error output styling in `sphinx-theme-builder`.
|
||||
"""
|
||||
|
||||
reference: str
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
kind: 'Literal["error", "warning"]' = "error",
|
||||
reference: Optional[str] = None,
|
||||
message: Union[str, Text],
|
||||
context: Optional[Union[str, Text]],
|
||||
hint_stmt: Optional[Union[str, Text]],
|
||||
note_stmt: Optional[Union[str, Text]] = None,
|
||||
link: Optional[str] = None,
|
||||
) -> None:
|
||||
# Ensure a proper reference is provided.
|
||||
if reference is None:
|
||||
assert hasattr(self, "reference"), "error reference not provided!"
|
||||
reference = self.reference
|
||||
assert _is_kebab_case(reference), "error reference must be kebab-case!"
|
||||
|
||||
self.kind = kind
|
||||
self.reference = reference
|
||||
|
||||
self.message = message
|
||||
self.context = context
|
||||
|
||||
self.note_stmt = note_stmt
|
||||
self.hint_stmt = hint_stmt
|
||||
|
||||
self.link = link
|
||||
|
||||
super().__init__(f"<{self.__class__.__name__}: {self.reference}>")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"<{self.__class__.__name__}("
|
||||
f"reference={self.reference!r}, "
|
||||
f"message={self.message!r}, "
|
||||
f"context={self.context!r}, "
|
||||
f"note_stmt={self.note_stmt!r}, "
|
||||
f"hint_stmt={self.hint_stmt!r}"
|
||||
")>"
|
||||
)
|
||||
|
||||
def __rich_console__(
|
||||
self,
|
||||
console: Console,
|
||||
options: ConsoleOptions,
|
||||
) -> RenderResult:
|
||||
colour = "red" if self.kind == "error" else "yellow"
|
||||
|
||||
yield f"[{colour} bold]{self.kind}[/]: [bold]{self.reference}[/]"
|
||||
yield ""
|
||||
|
||||
if not options.ascii_only:
|
||||
# Present the main message, with relevant context indented.
|
||||
if self.context is not None:
|
||||
yield _prefix_with_indent(
|
||||
self.message,
|
||||
console,
|
||||
prefix=f"[{colour}]×[/] ",
|
||||
indent=f"[{colour}]│[/] ",
|
||||
)
|
||||
yield _prefix_with_indent(
|
||||
self.context,
|
||||
console,
|
||||
prefix=f"[{colour}]╰─>[/] ",
|
||||
indent=f"[{colour}] [/] ",
|
||||
)
|
||||
else:
|
||||
yield _prefix_with_indent(
|
||||
self.message,
|
||||
console,
|
||||
prefix="[red]×[/] ",
|
||||
indent=" ",
|
||||
)
|
||||
else:
|
||||
yield self.message
|
||||
if self.context is not None:
|
||||
yield ""
|
||||
yield self.context
|
||||
|
||||
if self.note_stmt is not None or self.hint_stmt is not None:
|
||||
yield ""
|
||||
|
||||
if self.note_stmt is not None:
|
||||
yield _prefix_with_indent(
|
||||
self.note_stmt,
|
||||
console,
|
||||
prefix="[magenta bold]note[/]: ",
|
||||
indent=" ",
|
||||
)
|
||||
if self.hint_stmt is not None:
|
||||
yield _prefix_with_indent(
|
||||
self.hint_stmt,
|
||||
console,
|
||||
prefix="[cyan bold]hint[/]: ",
|
||||
indent=" ",
|
||||
)
|
||||
|
||||
if self.link is not None:
|
||||
yield ""
|
||||
yield f"Link: {self.link}"
|
||||
|
||||
|
||||
#
|
||||
# Actual Errors
|
||||
#
|
||||
class ConfigurationError(PipError):
|
||||
"""General exception in configuration"""
|
||||
|
||||
|
||||
class InstallationError(PipError):
|
||||
"""General exception during installation"""
|
||||
|
||||
|
||||
class UninstallationError(PipError):
|
||||
"""General exception during uninstallation"""
|
||||
|
||||
|
||||
class MissingPyProjectBuildRequires(DiagnosticPipError):
|
||||
"""Raised when pyproject.toml has `build-system`, but no `build-system.requires`."""
|
||||
|
||||
reference = "missing-pyproject-build-system-requires"
|
||||
|
||||
def __init__(self, *, package: str) -> None:
|
||||
super().__init__(
|
||||
message=f"Can not process {escape(package)}",
|
||||
context=Text(
|
||||
"This package has an invalid pyproject.toml file.\n"
|
||||
"The [build-system] table is missing the mandatory `requires` key."
|
||||
),
|
||||
note_stmt="This is an issue with the package mentioned above, not pip.",
|
||||
hint_stmt=Text("See PEP 518 for the detailed specification."),
|
||||
)
|
||||
|
||||
|
||||
class InvalidPyProjectBuildRequires(DiagnosticPipError):
|
||||
"""Raised when pyproject.toml an invalid `build-system.requires`."""
|
||||
|
||||
reference = "invalid-pyproject-build-system-requires"
|
||||
|
||||
def __init__(self, *, package: str, reason: str) -> None:
|
||||
super().__init__(
|
||||
message=f"Can not process {escape(package)}",
|
||||
context=Text(
|
||||
"This package has an invalid `build-system.requires` key in "
|
||||
f"pyproject.toml.\n{reason}"
|
||||
),
|
||||
note_stmt="This is an issue with the package mentioned above, not pip.",
|
||||
hint_stmt=Text("See PEP 518 for the detailed specification."),
|
||||
)
|
||||
|
||||
|
||||
class NoneMetadataError(PipError):
|
||||
"""Raised when accessing a Distribution's "METADATA" or "PKG-INFO".
|
||||
|
||||
This signifies an inconsistency, when the Distribution claims to have
|
||||
the metadata file (if not, raise ``FileNotFoundError`` instead), but is
|
||||
not actually able to produce its content. This may be due to permission
|
||||
errors.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
dist: "BaseDistribution",
|
||||
metadata_name: str,
|
||||
) -> None:
|
||||
"""
|
||||
:param dist: A Distribution object.
|
||||
:param metadata_name: The name of the metadata being accessed
|
||||
(can be "METADATA" or "PKG-INFO").
|
||||
"""
|
||||
self.dist = dist
|
||||
self.metadata_name = metadata_name
|
||||
|
||||
def __str__(self) -> str:
|
||||
# Use `dist` in the error message because its stringification
|
||||
# includes more information, like the version and location.
|
||||
return f"None {self.metadata_name} metadata found for distribution: {self.dist}"
|
||||
|
||||
|
||||
class UserInstallationInvalid(InstallationError):
|
||||
"""A --user install is requested on an environment without user site."""
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "User base directory is not specified"
|
||||
|
||||
|
||||
class InvalidSchemeCombination(InstallationError):
|
||||
def __str__(self) -> str:
|
||||
before = ", ".join(str(a) for a in self.args[:-1])
|
||||
return f"Cannot set {before} and {self.args[-1]} together"
|
||||
|
||||
|
||||
class DistributionNotFound(InstallationError):
|
||||
"""Raised when a distribution cannot be found to satisfy a requirement"""
|
||||
|
||||
|
||||
class RequirementsFileParseError(InstallationError):
|
||||
"""Raised when a general error occurs parsing a requirements file line."""
|
||||
|
||||
|
||||
class BestVersionAlreadyInstalled(PipError):
|
||||
"""Raised when the most up-to-date version of a package is already
|
||||
installed."""
|
||||
|
||||
|
||||
class BadCommand(PipError):
|
||||
"""Raised when virtualenv or a command is not found"""
|
||||
|
||||
|
||||
class CommandError(PipError):
|
||||
"""Raised when there is an error in command-line arguments"""
|
||||
|
||||
|
||||
class PreviousBuildDirError(PipError):
|
||||
"""Raised when there's a previous conflicting build directory"""
|
||||
|
||||
|
||||
class NetworkConnectionError(PipError):
|
||||
"""HTTP connection error"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
error_msg: str,
|
||||
response: Optional[Response] = None,
|
||||
request: Optional[Request] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize NetworkConnectionError with `request` and `response`
|
||||
objects.
|
||||
"""
|
||||
self.response = response
|
||||
self.request = request
|
||||
self.error_msg = error_msg
|
||||
if (
|
||||
self.response is not None
|
||||
and not self.request
|
||||
and hasattr(response, "request")
|
||||
):
|
||||
self.request = self.response.request
|
||||
super().__init__(error_msg, response, request)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return str(self.error_msg)
|
||||
|
||||
|
||||
class InvalidWheelFilename(InstallationError):
|
||||
"""Invalid wheel filename."""
|
||||
|
||||
|
||||
class UnsupportedWheel(InstallationError):
|
||||
"""Unsupported wheel."""
|
||||
|
||||
|
||||
class InvalidWheel(InstallationError):
|
||||
"""Invalid (e.g. corrupt) wheel."""
|
||||
|
||||
def __init__(self, location: str, name: str):
|
||||
self.location = location
|
||||
self.name = name
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Wheel '{self.name}' located at {self.location} is invalid."
|
||||
|
||||
|
||||
class MetadataInconsistent(InstallationError):
|
||||
"""Built metadata contains inconsistent information.
|
||||
|
||||
This is raised when the metadata contains values (e.g. name and version)
|
||||
that do not match the information previously obtained from sdist filename,
|
||||
user-supplied ``#egg=`` value, or an install requirement name.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, ireq: "InstallRequirement", field: str, f_val: str, m_val: str
|
||||
) -> None:
|
||||
self.ireq = ireq
|
||||
self.field = field
|
||||
self.f_val = f_val
|
||||
self.m_val = m_val
|
||||
|
||||
def __str__(self) -> str:
|
||||
return (
|
||||
f"Requested {self.ireq} has inconsistent {self.field}: "
|
||||
f"expected {self.f_val!r}, but metadata has {self.m_val!r}"
|
||||
)
|
||||
|
||||
|
||||
class InstallationSubprocessError(DiagnosticPipError, InstallationError):
|
||||
"""A subprocess call failed."""
|
||||
|
||||
reference = "subprocess-exited-with-error"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
command_description: str,
|
||||
exit_code: int,
|
||||
output_lines: Optional[List[str]],
|
||||
) -> None:
|
||||
if output_lines is None:
|
||||
output_prompt = Text("See above for output.")
|
||||
else:
|
||||
output_prompt = (
|
||||
Text.from_markup(f"[red][{len(output_lines)} lines of output][/]\n")
|
||||
+ Text("".join(output_lines))
|
||||
+ Text.from_markup(R"[red]\[end of output][/]")
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
message=(
|
||||
f"[green]{escape(command_description)}[/] did not run successfully.\n"
|
||||
f"exit code: {exit_code}"
|
||||
),
|
||||
context=output_prompt,
|
||||
hint_stmt=None,
|
||||
note_stmt=(
|
||||
"This error originates from a subprocess, and is likely not a "
|
||||
"problem with pip."
|
||||
),
|
||||
)
|
||||
|
||||
self.command_description = command_description
|
||||
self.exit_code = exit_code
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.command_description} exited with {self.exit_code}"
|
||||
|
||||
|
||||
class MetadataGenerationFailed(InstallationSubprocessError, InstallationError):
|
||||
reference = "metadata-generation-failed"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
package_details: str,
|
||||
) -> None:
|
||||
super(InstallationSubprocessError, self).__init__(
|
||||
message="Encountered error while generating package metadata.",
|
||||
context=escape(package_details),
|
||||
hint_stmt="See above for details.",
|
||||
note_stmt="This is an issue with the package mentioned above, not pip.",
|
||||
)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "metadata generation failed"
|
||||
|
||||
|
||||
class HashErrors(InstallationError):
|
||||
"""Multiple HashError instances rolled into one for reporting"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.errors: List["HashError"] = []
|
||||
|
||||
def append(self, error: "HashError") -> None:
|
||||
self.errors.append(error)
|
||||
|
||||
def __str__(self) -> str:
|
||||
lines = []
|
||||
self.errors.sort(key=lambda e: e.order)
|
||||
for cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__):
|
||||
lines.append(cls.head)
|
||||
lines.extend(e.body() for e in errors_of_cls)
|
||||
if lines:
|
||||
return "\n".join(lines)
|
||||
return ""
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.errors)
|
||||
|
||||
|
||||
class HashError(InstallationError):
|
||||
"""
|
||||
A failure to verify a package against known-good hashes
|
||||
|
||||
:cvar order: An int sorting hash exception classes by difficulty of
|
||||
recovery (lower being harder), so the user doesn't bother fretting
|
||||
about unpinned packages when he has deeper issues, like VCS
|
||||
dependencies, to deal with. Also keeps error reports in a
|
||||
deterministic order.
|
||||
:cvar head: A section heading for display above potentially many
|
||||
exceptions of this kind
|
||||
:ivar req: The InstallRequirement that triggered this error. This is
|
||||
pasted on after the exception is instantiated, because it's not
|
||||
typically available earlier.
|
||||
|
||||
"""
|
||||
|
||||
req: Optional["InstallRequirement"] = None
|
||||
head = ""
|
||||
order: int = -1
|
||||
|
||||
def body(self) -> str:
|
||||
"""Return a summary of me for display under the heading.
|
||||
|
||||
This default implementation simply prints a description of the
|
||||
triggering requirement.
|
||||
|
||||
:param req: The InstallRequirement that provoked this error, with
|
||||
its link already populated by the resolver's _populate_link().
|
||||
|
||||
"""
|
||||
return f" {self._requirement_name()}"
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.head}\n{self.body()}"
|
||||
|
||||
def _requirement_name(self) -> str:
|
||||
"""Return a description of the requirement that triggered me.
|
||||
|
||||
This default implementation returns long description of the req, with
|
||||
line numbers
|
||||
|
||||
"""
|
||||
return str(self.req) if self.req else "unknown package"
|
||||
|
||||
|
||||
class VcsHashUnsupported(HashError):
|
||||
"""A hash was provided for a version-control-system-based requirement, but
|
||||
we don't have a method for hashing those."""
|
||||
|
||||
order = 0
|
||||
head = (
|
||||
"Can't verify hashes for these requirements because we don't "
|
||||
"have a way to hash version control repositories:"
|
||||
)
|
||||
|
||||
|
||||
class DirectoryUrlHashUnsupported(HashError):
|
||||
"""A hash was provided for a version-control-system-based requirement, but
|
||||
we don't have a method for hashing those."""
|
||||
|
||||
order = 1
|
||||
head = (
|
||||
"Can't verify hashes for these file:// requirements because they "
|
||||
"point to directories:"
|
||||
)
|
||||
|
||||
|
||||
class HashMissing(HashError):
|
||||
"""A hash was needed for a requirement but is absent."""
|
||||
|
||||
order = 2
|
||||
head = (
|
||||
"Hashes are required in --require-hashes mode, but they are "
|
||||
"missing from some requirements. Here is a list of those "
|
||||
"requirements along with the hashes their downloaded archives "
|
||||
"actually had. Add lines like these to your requirements files to "
|
||||
"prevent tampering. (If you did not enable --require-hashes "
|
||||
"manually, note that it turns on automatically when any package "
|
||||
"has a hash.)"
|
||||
)
|
||||
|
||||
def __init__(self, gotten_hash: str) -> None:
|
||||
"""
|
||||
:param gotten_hash: The hash of the (possibly malicious) archive we
|
||||
just downloaded
|
||||
"""
|
||||
self.gotten_hash = gotten_hash
|
||||
|
||||
def body(self) -> str:
|
||||
# Dodge circular import.
|
||||
from pip._internal.utils.hashes import FAVORITE_HASH
|
||||
|
||||
package = None
|
||||
if self.req:
|
||||
# In the case of URL-based requirements, display the original URL
|
||||
# seen in the requirements file rather than the package name,
|
||||
# so the output can be directly copied into the requirements file.
|
||||
package = (
|
||||
self.req.original_link
|
||||
if self.req.is_direct
|
||||
# In case someone feeds something downright stupid
|
||||
# to InstallRequirement's constructor.
|
||||
else getattr(self.req, "req", None)
|
||||
)
|
||||
return " {} --hash={}:{}".format(
|
||||
package or "unknown package", FAVORITE_HASH, self.gotten_hash
|
||||
)
|
||||
|
||||
|
||||
class HashUnpinned(HashError):
|
||||
"""A requirement had a hash specified but was not pinned to a specific
|
||||
version."""
|
||||
|
||||
order = 3
|
||||
head = (
|
||||
"In --require-hashes mode, all requirements must have their "
|
||||
"versions pinned with ==. These do not:"
|
||||
)
|
||||
|
||||
|
||||
class HashMismatch(HashError):
|
||||
"""
|
||||
Distribution file hash values don't match.
|
||||
|
||||
:ivar package_name: The name of the package that triggered the hash
|
||||
mismatch. Feel free to write to this after the exception is raise to
|
||||
improve its error message.
|
||||
|
||||
"""
|
||||
|
||||
order = 4
|
||||
head = (
|
||||
"THESE PACKAGES DO NOT MATCH THE HASHES FROM THE REQUIREMENTS "
|
||||
"FILE. If you have updated the package versions, please update "
|
||||
"the hashes. Otherwise, examine the package contents carefully; "
|
||||
"someone may have tampered with them."
|
||||
)
|
||||
|
||||
def __init__(self, allowed: Dict[str, List[str]], gots: Dict[str, "_Hash"]) -> None:
|
||||
"""
|
||||
:param allowed: A dict of algorithm names pointing to lists of allowed
|
||||
hex digests
|
||||
:param gots: A dict of algorithm names pointing to hashes we
|
||||
actually got from the files under suspicion
|
||||
"""
|
||||
self.allowed = allowed
|
||||
self.gots = gots
|
||||
|
||||
def body(self) -> str:
|
||||
return f" {self._requirement_name()}:\n{self._hash_comparison()}"
|
||||
|
||||
def _hash_comparison(self) -> str:
|
||||
"""
|
||||
Return a comparison of actual and expected hash values.
|
||||
|
||||
Example::
|
||||
|
||||
Expected sha256 abcdeabcdeabcdeabcdeabcdeabcdeabcdeabcdeabcde
|
||||
or 123451234512345123451234512345123451234512345
|
||||
Got bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef
|
||||
|
||||
"""
|
||||
|
||||
def hash_then_or(hash_name: str) -> "chain[str]":
|
||||
# For now, all the decent hashes have 6-char names, so we can get
|
||||
# away with hard-coding space literals.
|
||||
return chain([hash_name], repeat(" or"))
|
||||
|
||||
lines: List[str] = []
|
||||
for hash_name, expecteds in self.allowed.items():
|
||||
prefix = hash_then_or(hash_name)
|
||||
lines.extend((f" Expected {next(prefix)} {e}") for e in expecteds)
|
||||
lines.append(
|
||||
f" Got {self.gots[hash_name].hexdigest()}\n"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class UnsupportedPythonVersion(InstallationError):
|
||||
"""Unsupported python version according to Requires-Python package
|
||||
metadata."""
|
||||
|
||||
|
||||
class ConfigurationFileCouldNotBeLoaded(ConfigurationError):
|
||||
"""When there are errors while loading a configuration file"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
reason: str = "could not be loaded",
|
||||
fname: Optional[str] = None,
|
||||
error: Optional[configparser.Error] = None,
|
||||
) -> None:
|
||||
super().__init__(error)
|
||||
self.reason = reason
|
||||
self.fname = fname
|
||||
self.error = error
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self.fname is not None:
|
||||
message_part = f" in {self.fname}."
|
||||
else:
|
||||
assert self.error is not None
|
||||
message_part = f".\n{self.error}\n"
|
||||
return f"Configuration file {self.reason}{message_part}"
|
||||
|
||||
|
||||
_DEFAULT_EXTERNALLY_MANAGED_ERROR = f"""\
|
||||
The Python environment under {sys.prefix} is managed externally, and may not be
|
||||
manipulated by the user. Please use specific tooling from the distributor of
|
||||
the Python installation to interact with this environment instead.
|
||||
"""
|
||||
|
||||
|
||||
class ExternallyManagedEnvironment(DiagnosticPipError):
|
||||
"""The current environment is externally managed.
|
||||
|
||||
This is raised when the current environment is externally managed, as
|
||||
defined by `PEP 668`_. The ``EXTERNALLY-MANAGED`` configuration is checked
|
||||
and displayed when the error is bubbled up to the user.
|
||||
|
||||
:param error: The error message read from ``EXTERNALLY-MANAGED``.
|
||||
"""
|
||||
|
||||
reference = "externally-managed-environment"
|
||||
|
||||
def __init__(self, error: Optional[str]) -> None:
|
||||
if error is None:
|
||||
context = Text(_DEFAULT_EXTERNALLY_MANAGED_ERROR)
|
||||
else:
|
||||
context = Text(error)
|
||||
super().__init__(
|
||||
message="This environment is externally managed",
|
||||
context=context,
|
||||
note_stmt=(
|
||||
"If you believe this is a mistake, please contact your "
|
||||
"Python installation or OS distribution provider. "
|
||||
"You can override this, at the risk of breaking your Python "
|
||||
"installation or OS, by passing --break-system-packages."
|
||||
),
|
||||
hint_stmt=Text("See PEP 668 for the detailed specification."),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _iter_externally_managed_error_keys() -> Iterator[str]:
|
||||
# LC_MESSAGES is in POSIX, but not the C standard. The most common
|
||||
# platform that does not implement this category is Windows, where
|
||||
# using other categories for console message localization is equally
|
||||
# unreliable, so we fall back to the locale-less vendor message. This
|
||||
# can always be re-evaluated when a vendor proposes a new alternative.
|
||||
try:
|
||||
category = locale.LC_MESSAGES
|
||||
except AttributeError:
|
||||
lang: Optional[str] = None
|
||||
else:
|
||||
lang, _ = locale.getlocale(category)
|
||||
if lang is not None:
|
||||
yield f"Error-{lang}"
|
||||
for sep in ("-", "_"):
|
||||
before, found, _ = lang.partition(sep)
|
||||
if not found:
|
||||
continue
|
||||
yield f"Error-{before}"
|
||||
yield "Error"
|
||||
|
||||
@classmethod
|
||||
def from_config(
|
||||
cls,
|
||||
config: Union[pathlib.Path, str],
|
||||
) -> "ExternallyManagedEnvironment":
|
||||
parser = configparser.ConfigParser(interpolation=None)
|
||||
try:
|
||||
parser.read(config, encoding="utf-8")
|
||||
section = parser["externally-managed"]
|
||||
for key in cls._iter_externally_managed_error_keys():
|
||||
with contextlib.suppress(KeyError):
|
||||
return cls(section[key])
|
||||
except KeyError:
|
||||
pass
|
||||
except (OSError, UnicodeDecodeError, configparser.ParsingError):
|
||||
from pip._internal.utils._log import VERBOSE
|
||||
|
||||
exc_info = logger.isEnabledFor(VERBOSE)
|
||||
logger.warning("Failed to read %s", config, exc_info=exc_info)
|
||||
return cls(None)
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user