"""Download files with progress indicators."""

from __future__ import annotations

import email.message
import logging
import mimetypes
import os
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from http import HTTPStatus
from typing import BinaryIO

from pip._vendor.requests import PreparedRequest
from pip._vendor.requests.models import Response
from pip._vendor.urllib3 import HTTPResponse as URLlib3Response
from pip._vendor.urllib3._collections import HTTPHeaderDict
from pip._vendor.urllib3.exceptions import ReadTimeoutError

from pip._internal.cli.progress_bars import BarType, get_download_progress_renderer
from pip._internal.exceptions import IncompleteDownloadError, NetworkConnectionError
from pip._internal.models.index import PyPI
from pip._internal.models.link import Link
from pip._internal.network.cache import SafeFileCache, is_from_cache
from pip._internal.network.session import CacheControlAdapter, PipSession
from pip._internal.network.utils import HEADERS, raise_for_status, response_chunks
from pip._internal.utils.misc import format_size, redact_auth_from_url, splitext

logger = logging.getLogger(__name__)


def _get_http_response_size(resp: Response) -> int | None:
    try:
        return int(resp.headers["content-length"])
    except (ValueError, KeyError, TypeError):
        return None


def _get_http_response_etag_or_last_modified(resp: Response) -> str | None:
    """
    Return either the ETag or Last-Modified header (or None if neither exists).
    The return value can be used in an If-Range header.
    """
    return resp.headers.get("etag", resp.headers.get("last-modified"))
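
# Illustrative example (assumed header values): given a response carrying both
# {"ETag": '"abc123"', "Last-Modified": "Tue, 01 Jan 2024 00:00:00 GMT"}, the
# function returns '"abc123"'; the Last-Modified date is used only when no
# ETag is present.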


def _log_download(
    resp: Response,
    link: Link,
    progress_bar: BarType,
    total_length: int | None,
    range_start: int | None = 0,
) -> Iterable[bytes]:
    if link.netloc == PyPI.file_storage_domain:
        url = link.show_url
    else:
        url = link.url_without_fragment

    logged_url = redact_auth_from_url(url)

    if total_length:
        if range_start:
            logged_url = (
                f"{logged_url} ({format_size(range_start)}/{format_size(total_length)})"
            )
        else:
            logged_url = f"{logged_url} ({format_size(total_length)})"

    if is_from_cache(resp):
        logger.info("Using cached %s", logged_url)
    elif range_start:
        logger.info("Resuming download %s", logged_url)
    else:
        logger.info("Downloading %s", logged_url)

    # Only show a progress bar when logging at INFO level or more verbose, the
    # response is not from the cache, and the file is either of unknown size
    # or larger than 512 KiB.
    if logger.getEffectiveLevel() > logging.INFO:
        show_progress = False
    elif is_from_cache(resp):
        show_progress = False
    elif not total_length:
        show_progress = True
    elif total_length > (512 * 1024):
        show_progress = True
    else:
        show_progress = False

    chunks = response_chunks(resp)

    if not show_progress:
        return chunks

    renderer = get_download_progress_renderer(
        bar_type=progress_bar, size=total_length, initial_progress=range_start
    )
    return renderer(chunks)


def sanitize_content_filename(filename: str) -> str:
    """
    Sanitize the "filename" value from a Content-Disposition header.
    """
    return os.path.basename(filename)
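
# Taking the basename guards against path traversal from a crafted header;
# e.g. sanitize_content_filename("../../etc/passwd") returns "passwd".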


def parse_content_disposition(content_disposition: str, default_filename: str) -> str:
    """
    Parse the "filename" value from a Content-Disposition header, and
    return the default filename if the result is empty.
    """
    m = email.message.Message()
    m["content-type"] = content_disposition
    filename = m.get_param("filename")
    if filename:
        # Sanitize the extracted filename so a malicious server cannot write
        # outside the download directory via path separators in the header.
        filename = sanitize_content_filename(str(filename))
    return filename or default_filename
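
# Illustrative behavior (assumed header values):
#   parse_content_disposition('attachment; filename="pip-24.0.tar.gz"', "dl")
#   returns "pip-24.0.tar.gz", while a bare "attachment" header returns "dl".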


def _get_http_response_filename(resp: Response, link: Link) -> str:
    """Get an ideal filename from the given HTTP response, falling back to
    the link filename if not provided.
    """
    filename = link.filename  # fallback
    # Check the Content-Disposition header for a better guess.
    content_disposition = resp.headers.get("content-disposition")
    if content_disposition:
        filename = parse_content_disposition(content_disposition, filename)
    ext: str | None = splitext(filename)[1]
    if not ext:
        ext = mimetypes.guess_extension(resp.headers.get("content-type", ""))
        if ext:
            filename += ext
    if not ext and link.url != resp.url:
        ext = os.path.splitext(resp.url)[1]
        if ext:
            filename += ext
    return filename
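
# For example (assumed values): a link named "pkg" with no extension, served
# with Content-Type: application/zip, comes back as "pkg.zip", since
# mimetypes.guess_extension("application/zip") yields ".zip".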


@dataclass
class _FileDownload:
    """Stores the state of a single link download."""

    link: Link
    output_file: BinaryIO
    size: int | None
    bytes_received: int = 0
    reattempts: int = 0

    def is_incomplete(self) -> bool:
        return bool(self.size is not None and self.bytes_received < self.size)

    def write_chunk(self, data: bytes) -> None:
        self.bytes_received += len(data)
        self.output_file.write(data)

    def reset_file(self) -> None:
        """Delete any saved data and reset progress to zero."""
        self.output_file.seek(0)
        self.output_file.truncate()
        self.bytes_received = 0
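
# Note the None semantics (illustrative values): size=100 with
# bytes_received=40 is incomplete, while a download of unknown size
# (size=None) is never reported as incomplete, whatever the byte count.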


class Downloader:
    def __init__(
        self,
        session: PipSession,
        progress_bar: BarType,
        resume_retries: int,
    ) -> None:
        assert (
            resume_retries >= 0
        ), "Number of max resume retries must be greater than or equal to zero"
        self._session = session
        self._progress_bar = progress_bar
        self._resume_retries = resume_retries

    def batch(
        self, links: Iterable[Link], location: str
    ) -> Iterable[tuple[Link, tuple[str, str]]]:
        """Convenience method to download multiple links."""
        for link in links:
            filepath, content_type = self(link, location)
            yield link, (filepath, content_type)

    def __call__(self, link: Link, location: str) -> tuple[str, str]:
        """Download a link and save it under location."""
        resp = self._http_get(link)
        download_size = _get_http_response_size(resp)

        filepath = os.path.join(location, _get_http_response_filename(resp, link))
        with open(filepath, "wb") as content_file:
            download = _FileDownload(link, content_file, download_size)
            self._process_response(download, resp)
            if download.is_incomplete():
                self._attempt_resumes_or_redownloads(download, resp)

        content_type = resp.headers.get("Content-Type", "")
        return filepath, content_type
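
    # Usage sketch (hypothetical values, shown for illustration only):
    #   downloader = Downloader(session, progress_bar="on", resume_retries=5)
    #   filepath, content_type = downloader(link, "/tmp/downloads")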

    def _process_response(self, download: _FileDownload, resp: Response) -> None:
        """Download and save chunks from a response."""
        chunks = _log_download(
            resp,
            download.link,
            self._progress_bar,
            download.size,
            range_start=download.bytes_received,
        )
        try:
            for chunk in chunks:
                download.write_chunk(chunk)
        except ReadTimeoutError as e:
            # Without a known total size, an interrupted download cannot be
            # detected or resumed, so propagate the error immediately.
            if download.size is None:
                raise e

            logger.warning("Connection timed out while downloading.")

    def _attempt_resumes_or_redownloads(
        self, download: _FileDownload, first_resp: Response
    ) -> None:
        """Attempt to resume/restart the download if connection was dropped."""
        while download.reattempts < self._resume_retries and download.is_incomplete():
            assert download.size is not None
            download.reattempts += 1
            logger.warning(
                "Attempting to resume incomplete download (%s/%s, attempt %d)",
                format_size(download.bytes_received),
                format_size(download.size),
                download.reattempts,
            )

            try:
                resume_resp = self._http_get_resume(download, should_match=first_resp)

                # Fallback: if the server responded with 200 (i.e. the file has
                # changed since the first response, or the server does not
                # support range requests), restart the download from scratch.
                must_restart = resume_resp.status_code != HTTPStatus.PARTIAL_CONTENT
                if must_restart:
                    download.reset_file()
                    download.size = _get_http_response_size(resume_resp)
                    first_resp = resume_resp

                self._process_response(download, resume_resp)
            except (ConnectionError, ReadTimeoutError, OSError):
                continue

        # No more retries are allowed. If the download is still incomplete,
        # discard the partial file and raise an error.
        if download.is_incomplete():
            os.remove(download.output_file.name)
            raise IncompleteDownloadError(download)

        # If the download was ever resumed, cache the completed file manually,
        # since the partial (206) responses were never cached.
        if download.reattempts > 0:
            self._cache_resumed_download(download, first_resp)

    def _cache_resumed_download(
        self, download: _FileDownload, original_response: Response
    ) -> None:
        """
        Manually cache a file that was successfully downloaded via resume retries.

        cachecontrol doesn't cache 206 (Partial Content) responses, since they
        are not complete files. This method manually adds the final file to the
        cache as though it was downloaded in a single request, so that future
        requests can use the cache.
        """
        url = download.link.url_without_fragment
        adapter = self._session.get_adapter(url)

        # Caching is only possible if the adapter is the caching adapter
        # (i.e. the HTTP cache is enabled for this URL).
        if not isinstance(adapter, CacheControlAdapter):
            logger.debug(
                "Skipping resume download caching: no cache controller for %s", url
            )
            return

        # The file body must be cached separately from the response metadata.
        assert isinstance(
            adapter.cache, SafeFileCache
        ), "separate body cache not in use!"

        synthetic_request = PreparedRequest()
        synthetic_request.prepare(method="GET", url=url, headers={})

        # Reuse the original response headers, but drop the range-specific
        # ones and advertise the full file size instead.
        synthetic_response_headers = HTTPHeaderDict()
        for key, value in original_response.headers.items():
            if key.lower() not in ["content-range", "content-length"]:
                synthetic_response_headers[key] = value
        synthetic_response_headers["content-length"] = str(download.size)

        synthetic_response = URLlib3Response(
            body="",
            headers=synthetic_response_headers,
            status=200,
            preload_content=False,
        )

        # Store the metadata and the body separately.
        cache_url = adapter.controller.cache_url(url)
        metadata_blob = adapter.controller.serializer.dumps(
            synthetic_request, synthetic_response, b""
        )
        adapter.cache.set(cache_url, metadata_blob)
        download.output_file.flush()
        with open(download.output_file.name, "rb") as f:
            adapter.cache.set_body_from_io(cache_url, f)

        logger.debug(
            "Cached resumed download as complete response for future use: %s", url
        )

    def _http_get_resume(
        self, download: _FileDownload, should_match: Response
    ) -> Response:
        """Issue a HTTP range request to resume the download."""
        # Ask only for the bytes that haven't been received yet.
        headers = HEADERS.copy()
        headers["Range"] = f"bytes={download.bytes_received}-"

        # If a validator is available, send If-Range so the server only honours
        # the range request when the file is unchanged since the first response.
        if identifier := _get_http_response_etag_or_last_modified(should_match):
            headers["If-Range"] = identifier
        return self._http_get(download.link, headers)
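
    # For instance, resuming after 1 MiB of a file whose first response
    # carried an ETag would send (illustrative values):
    #   Range: bytes=1048576-
    #   If-Range: "abc123"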

    def _http_get(self, link: Link, headers: Mapping[str, str] = HEADERS) -> Response:
        target_url = link.url_without_fragment
        try:
            resp = self._session.get(target_url, headers=headers, stream=True)
            raise_for_status(resp)
        except NetworkConnectionError as e:
            assert e.response is not None
            logger.critical(
                "HTTP error %s while getting %s", e.response.status_code, link
            )
            raise
        return resp