Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions b2sdk/_internal/transfer/inbound/downloader/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
from io import IOBase

from requests.exceptions import ChunkedEncodingError, ContentDecodingError
from requests.models import Response

from b2sdk._internal.encryption.setting import EncryptionSetting
Expand Down Expand Up @@ -43,10 +44,13 @@ def _download(
chunk_size = self._get_chunk_size(actual_size)

decoded_bytes_read = 0
for data in response.iter_content(chunk_size=chunk_size):
file.write(data)
digest.update(data)
decoded_bytes_read += len(data)
try:
for data in response.iter_content(chunk_size=chunk_size):
file.write(data)
digest.update(data)
decoded_bytes_read += len(data)
except (ChunkedEncodingError, ContentDecodingError) as exc:
logger.debug('Stream read error during download, will retry if needed: %s', exc)
bytes_read = response.raw.tell()
response.close()

Expand All @@ -58,8 +62,7 @@ def _download(
# or something and the server closes connection, while neither tcp or http have a problem
# with the truncated output, so we detect it here and try to continue

num_tries = 5 # this is hardcoded because we are going to replace the entire retry interface soon, so we'll avoid deprecation here and keep it private
retries_left = num_tries - 1
retries_left = 4 # this is hardcoded because we are going to replace the entire retry interface soon, so we'll avoid deprecation here and keep it private
while retries_left and bytes_read < download_version.content_length:
new_range = self._get_remote_range(
response,
Expand All @@ -79,12 +82,15 @@ def _download(
new_range.as_tuple(),
encryption=encryption,
) as followup_response:
for data in followup_response.iter_content(
chunk_size=self._get_chunk_size(actual_size)
):
file.write(data)
digest.update(data)
decoded_bytes_read += len(data)
try:
for data in followup_response.iter_content(
chunk_size=self._get_chunk_size(actual_size)
):
file.write(data)
digest.update(data)
decoded_bytes_read += len(data)
except (ChunkedEncodingError, ContentDecodingError) as exc:
logger.debug('Stream read error during download, will retry if needed: %s', exc)
bytes_read += followup_response.raw.tell()
retries_left -= 1
return bytes_read, digest.hexdigest()
Expand Down
1 change: 1 addition & 0 deletions changelog.d/+read-error-retry.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Retry stream read errors during download in `SimpleDownloader`.
136 changes: 136 additions & 0 deletions test/unit/internal/transfer/downloader/test_simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
######################################################################
#
# File: test/unit/internal/transfer/downloader/test_simple.py
#
# Copyright 2026 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import os
from collections.abc import Callable, Iterator
from io import BytesIO
from itertools import count
from types import ModuleType
from typing import Any

import pytest
from apiver_deps import B2Api, Bucket, DownloadVersion, SimpleDownloader
from requests.exceptions import ChunkedEncodingError, ContentDecodingError
from requests.models import Response
from urllib3.exceptions import DecodeError, IncompleteRead, ProtocolError

CHUNKED_ENCODING_ERROR = ChunkedEncodingError(
ProtocolError(
'Connection broken: IncompleteRead(1 bytes read, 99 more expected)',
IncompleteRead(1, 99),
)
)
CONTENT_DECODING_ERROR = ContentDecodingError(
DecodeError('Error -3 while decompressing data: incorrect header check')
)


@pytest.fixture
def file_size() -> int:
return 100


@pytest.fixture
def file_content(file_size: int) -> bytes:
return os.urandom(file_size)


@pytest.fixture
def mock_download_response(
apiver_module: ModuleType,
bucket: Bucket,
file_content: bytes,
) -> tuple[Response, DownloadVersion]:
file_version = bucket.upload_bytes(file_content, f'dummy_file_{len(file_content)}.txt')

url = bucket.api.session.get_download_url_by_name(bucket.name, file_version.file_name)
response = bucket.api.services.session.download_file_from_url(url).__enter__()

return (
response,
apiver_module.DownloadVersionFactory(bucket.api).from_response_headers(response.headers),
)


@pytest.fixture
def output_file() -> BytesIO:
return BytesIO()


@pytest.fixture
def downloader(apiver_module: ModuleType) -> SimpleDownloader:
return apiver_module.SimpleDownloader(force_chunk_size=5)


def _make_iter_content(
response: Response,
attempts: Iterator[int],
fail_count: int,
stream_error: ChunkedEncodingError | ContentDecodingError,
) -> Callable[..., Iterator[bytes]]:
def iter_content(chunk_size: int = 1, decode_unicode: bool = False) -> Iterator[bytes]:
attempt = next(attempts)
chunk = response.raw.read(1)
if chunk:
yield chunk
if attempt <= fail_count:
raise stream_error
while True:
chunk = response.raw.read(chunk_size)
if not chunk:
break
yield chunk

return iter_content


@pytest.mark.parametrize('fail_count', [0, 1, 2, 4, 5])
@pytest.mark.parametrize(
'stream_error',
[
pytest.param(CHUNKED_ENCODING_ERROR, id='ChunkedEncodingError'),
pytest.param(CONTENT_DECODING_ERROR, id='ContentDecodingError'),
],
)
def test_download_file__stream_read_error(
b2api: B2Api,
bucket: Bucket,
downloader: SimpleDownloader,
output_file: BytesIO,
file_size: int,
file_content: bytes,
mock_download_response: tuple[Response, DownloadVersion],
fail_count: int,
stream_error: ChunkedEncodingError | ContentDecodingError,
) -> None:
mock_response, download_version = mock_download_response

attempts = count(1)
mock_response.iter_content = _make_iter_content(
mock_response, attempts, fail_count, stream_error
)

download_func = bucket.api.services.session.download_file_from_url

def download_func_mock(*args: Any, **kwargs: Any) -> Response:
response = download_func(*args, **kwargs).__enter__()
response.iter_content = _make_iter_content(response, attempts, fail_count, stream_error)
return response

bucket.api.services.session.download_file_from_url = download_func_mock

bytes_written, _ = downloader.download(
output_file, mock_response, download_version, b2api.session
)

if fail_count < 5:
assert bytes_written == file_size
assert output_file.getvalue() == file_content
else:
assert bytes_written == fail_count
Loading