diff --git a/b2sdk/_internal/transfer/inbound/downloader/simple.py b/b2sdk/_internal/transfer/inbound/downloader/simple.py index 87fabac05..31821abc8 100644 --- a/b2sdk/_internal/transfer/inbound/downloader/simple.py +++ b/b2sdk/_internal/transfer/inbound/downloader/simple.py @@ -12,6 +12,7 @@ import logging from io import IOBase +from requests.exceptions import ChunkedEncodingError, ContentDecodingError from requests.models import Response from b2sdk._internal.encryption.setting import EncryptionSetting @@ -43,10 +44,13 @@ def _download( chunk_size = self._get_chunk_size(actual_size) decoded_bytes_read = 0 - for data in response.iter_content(chunk_size=chunk_size): - file.write(data) - digest.update(data) - decoded_bytes_read += len(data) + try: + for data in response.iter_content(chunk_size=chunk_size): + file.write(data) + digest.update(data) + decoded_bytes_read += len(data) + except (ChunkedEncodingError, ContentDecodingError) as exc: + logger.debug('Stream read error during download, will retry if needed: %s', exc) bytes_read = response.raw.tell() response.close() @@ -58,8 +62,7 @@ def _download( # or something and the server closes connection, while neither tcp or http have a problem # with the truncated output, so we detect it here and try to continue - num_tries = 5 # this is hardcoded because we are going to replace the entire retry interface soon, so we'll avoid deprecation here and keep it private - retries_left = num_tries - 1 + retries_left = 4 # this is hardcoded because we are going to replace the entire retry interface soon, so we'll avoid deprecation here and keep it private while retries_left and bytes_read < download_version.content_length: new_range = self._get_remote_range( response, @@ -79,12 +82,15 @@ def _download( new_range.as_tuple(), encryption=encryption, ) as followup_response: - for data in followup_response.iter_content( - chunk_size=self._get_chunk_size(actual_size) - ): - file.write(data) - digest.update(data) - decoded_bytes_read += len(data) + try: + for data in followup_response.iter_content( + chunk_size=self._get_chunk_size(actual_size) + ): + file.write(data) + digest.update(data) + decoded_bytes_read += len(data) + except (ChunkedEncodingError, ContentDecodingError) as exc: + logger.debug('Stream read error during download, will retry if needed: %s', exc) bytes_read += followup_response.raw.tell() retries_left -= 1 return bytes_read, digest.hexdigest() diff --git a/changelog.d/+read-error-retry.fixed.md b/changelog.d/+read-error-retry.fixed.md new file mode 100644 index 000000000..241a89682 --- /dev/null +++ b/changelog.d/+read-error-retry.fixed.md @@ -0,0 +1 @@ +Retry stream read errors during download in `SimpleDownloader`. diff --git a/test/unit/internal/transfer/downloader/test_simple.py b/test/unit/internal/transfer/downloader/test_simple.py new file mode 100644 index 000000000..a4b18bc95 --- /dev/null +++ b/test/unit/internal/transfer/downloader/test_simple.py @@ -0,0 +1,136 @@ +###################################################################### +# +# File: test/unit/internal/transfer/downloader/test_simple.py +# +# Copyright 2026 Backblaze Inc. All Rights Reserved. +# +# License https://www.backblaze.com/using_b2_code.html +# +###################################################################### +import os +from collections.abc import Callable, Iterator +from io import BytesIO +from itertools import count +from types import ModuleType +from typing import Any + +import pytest +from apiver_deps import B2Api, Bucket, DownloadVersion, SimpleDownloader +from requests.exceptions import ChunkedEncodingError, ContentDecodingError +from requests.models import Response +from urllib3.exceptions import DecodeError, IncompleteRead, ProtocolError + +CHUNKED_ENCODING_ERROR = ChunkedEncodingError( + ProtocolError( + 'Connection broken: IncompleteRead(1 bytes read, 99 more expected)', + IncompleteRead(1, 99), + ) +) +CONTENT_DECODING_ERROR = ContentDecodingError( + DecodeError('Error -3 while decompressing data: incorrect header check') +) + + +@pytest.fixture +def file_size() -> int: + return 100 + + +@pytest.fixture +def file_content(file_size: int) -> bytes: + return os.urandom(file_size) + + +@pytest.fixture +def mock_download_response( + apiver_module: ModuleType, + bucket: Bucket, + file_content: bytes, +) -> tuple[Response, DownloadVersion]: + file_version = bucket.upload_bytes(file_content, f'dummy_file_{len(file_content)}.txt') + + url = bucket.api.session.get_download_url_by_name(bucket.name, file_version.file_name) + response = bucket.api.services.session.download_file_from_url(url).__enter__() + + return ( + response, + apiver_module.DownloadVersionFactory(bucket.api).from_response_headers(response.headers), + ) + + +@pytest.fixture +def output_file() -> BytesIO: + return BytesIO() + + +@pytest.fixture +def downloader(apiver_module: ModuleType) -> SimpleDownloader: + return apiver_module.SimpleDownloader(force_chunk_size=5) + + +def _make_iter_content( + response: Response, + attempts: Iterator[int], + fail_count: int, + stream_error: ChunkedEncodingError | ContentDecodingError, +) -> Callable[..., Iterator[bytes]]: + def iter_content(chunk_size: int = 1, decode_unicode: bool = False) -> Iterator[bytes]: + attempt = next(attempts) + chunk = response.raw.read(1) + if chunk: + yield chunk + if attempt <= fail_count: + raise stream_error + while True: + chunk = response.raw.read(chunk_size) + if not chunk: + break + yield chunk + + return iter_content + + +@pytest.mark.parametrize('fail_count', [0, 1, 2, 4, 5]) +@pytest.mark.parametrize( + 'stream_error', + [ + pytest.param(CHUNKED_ENCODING_ERROR, id='ChunkedEncodingError'), + pytest.param(CONTENT_DECODING_ERROR, id='ContentDecodingError'), + ], +) +def test_download_file__stream_read_error( + b2api: B2Api, + bucket: Bucket, + downloader: SimpleDownloader, + output_file: BytesIO, + file_size: int, + file_content: bytes, + mock_download_response: tuple[Response, DownloadVersion], + fail_count: int, + stream_error: ChunkedEncodingError | ContentDecodingError, +) -> None: + mock_response, download_version = mock_download_response + + attempts = count(1) + mock_response.iter_content = _make_iter_content( + mock_response, attempts, fail_count, stream_error + ) + + download_func = bucket.api.services.session.download_file_from_url + + def download_func_mock(*args: Any, **kwargs: Any) -> Response: + response = download_func(*args, **kwargs).__enter__() + response.iter_content = _make_iter_content(response, attempts, fail_count, stream_error) + return response + + bucket.api.services.session.download_file_from_url = download_func_mock + + bytes_written, _ = downloader.download( + output_file, mock_response, download_version, b2api.session + ) + + if fail_count < 5: + assert bytes_written == file_size + assert output_file.getvalue() == file_content + else: + assert bytes_written == fail_count