Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Download subrange of an object instead of downloading the whole object #260

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions s3transfer/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,27 @@ def _submit(
response['ContentLength']
)

# Clamp range_start and range_end to the length of the object
range_start = transfer_future.meta.call_args.range_start
if range_start < 0:
range_start = 0
range_end = transfer_future.meta.call_args.range_end
if range_end is None or range_end >= transfer_future.meta.size:
range_end = transfer_future.meta.size - 1
transfer_future.meta.call_args.range_start = range_start
transfer_future.meta.call_args.range_end = range_end

download_output_manager = self._get_download_output_manager_cls(
transfer_future, osutil
)(osutil, self._transfer_coordinator, io_executor)

# If it is greater than threshold do a ranged download, otherwise
# do a regular GetObject download.
if transfer_future.meta.size < config.multipart_threshold:
# If it is greater than threshold and the whole object do a ranged download,
# otherwise do a regular GetObject download.
if (
transfer_future.meta.size < config.multipart_threshold
and range_start == 0
and range_end == transfer_future.meta.size - 1
):
self._submit_download_request(
client,
config,
Expand Down Expand Up @@ -463,7 +477,9 @@ def _submit_ranged_download_request(

# Determine the number of parts
part_size = config.multipart_chunksize
num_parts = calculate_num_parts(transfer_future.meta.size, part_size)
num_parts = calculate_num_parts(
max(call_args.range_end - call_args.range_start + 1, 0), part_size
)

# Get any associated tags for the get object task.
get_object_tag = download_output_manager.get_download_task_tag()
Expand All @@ -478,7 +494,11 @@ def _submit_ranged_download_request(
for i in range(num_parts):
# Calculate the range parameter
range_parameter = calculate_range_parameter(
part_size, i, num_parts
part_size,
i,
num_parts,
call_args.range_end + 1,
call_args.range_start,
)

# Inject the Range parameter to the parameters to be passed in
Expand Down Expand Up @@ -518,16 +538,6 @@ def _get_final_io_task_submission_callback(
self._transfer_coordinator.submit, io_executor, final_task
)

def _calculate_range_param(self, part_size, part_index, num_parts):
# Used to calculate the Range parameter
start_range = part_index * part_size
if part_index == num_parts - 1:
end_range = ''
else:
end_range = start_range + part_size - 1
range_param = f'bytes={start_range}-{end_range}'
return range_param


class GetObjectTask(Task):
def _main(
Expand Down
20 changes: 19 additions & 1 deletion s3transfer/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,15 @@ def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
)

def download(
self, bucket, key, fileobj, extra_args=None, subscribers=None
self,
bucket,
key,
fileobj,
extra_args=None,
subscribers=None,
*,
range_start=0,
range_end=None
):
"""Downloads a file from S3

Expand All @@ -359,6 +367,14 @@ def download(
order provided based on the event emit during the process of
the transfer request.

:type range_start: int
:param range_start: The start of the range to download from the object.

:type range_end: int
:param range_end: The end of the range to download from the object. Default is
the length of the object minus one. Total length is of the download is
`range_end - range_start + 1`

:rtype: s3transfer.futures.TransferFuture
:returns: Transfer future representing the download
"""
Expand All @@ -374,6 +390,8 @@ def download(
fileobj=fileobj,
extra_args=extra_args,
subscribers=subscribers,
range_start=range_start,
range_end=range_end,
)
extra_main_kwargs = {'io_executor': self._io_executor}
if self._bandwidth_limiter:
Expand Down
49 changes: 42 additions & 7 deletions s3transfer/processpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@
'filename', # The user-requested download location
'extra_args', # Extra arguments to provide to client calls
'expected_size', # The user-provided expected size of the download
'range_start', # The user-provided start point for the download
'range_end', # The user-provided end point for the download
],
)

Expand Down Expand Up @@ -323,7 +325,15 @@ def __init__(self, client_kwargs=None, config=None):
self._workers = []

def download_file(
self, bucket, key, filename, extra_args=None, expected_size=None
self,
bucket,
key,
filename,
extra_args=None,
expected_size=None,
*,
range_start=0,
range_end=None,
):
"""Downloads the object's contents to a file

Expand All @@ -346,6 +356,14 @@ def download_file(
object's size and use the provided value instead. The size is
needed to determine whether to do a multipart download.

:type range_start: int
:param range_start: The start of the range to download from the object.

:type range_end: int
:param range_end: The end of the range to download from the object. Default is
the length of the object minus one. Total length is of the download is
`range_end - range_start + 1`

:rtype: s3transfer.futures.TransferFuture
:returns: Transfer future representing the download
"""
Expand All @@ -361,6 +379,8 @@ def download_file(
filename=filename,
extra_args=extra_args,
expected_size=expected_size,
range_start=range_start,
range_end=range_end,
)
logger.debug(
'Submitting download file request: %s.', download_file_request
Expand Down Expand Up @@ -814,14 +834,27 @@ def _do_run(self):

def _submit_get_object_jobs(self, download_file_request):
size = self._get_size(download_file_request)
temp_filename = self._allocate_temp_file(download_file_request, size)
if size < self._transfer_config.multipart_threshold:
# Clamp range_start and range_end to the length of the object
range_start = download_file_request.range_start
if range_start < 0:
range_start = 0
range_end = download_file_request.range_end
if range_end is None or range_end >= size:
range_end = size - 1
temp_filename = self._allocate_temp_file(
download_file_request, range_end - range_start + 1
)
if (
size < self._transfer_config.multipart_threshold
and range_start == 0
and range_end == size - 1
):
self._submit_single_get_object_job(
download_file_request, temp_filename
)
else:
self._submit_ranged_get_object_jobs(
download_file_request, temp_filename, size
download_file_request, temp_filename, range_start, range_end
)

def _get_size(self, download_file_request):
Expand Down Expand Up @@ -856,17 +889,19 @@ def _submit_single_get_object_job(
)

def _submit_ranged_get_object_jobs(
self, download_file_request, temp_filename, size
self, download_file_request, temp_filename, range_start, range_end
):
part_size = self._transfer_config.multipart_chunksize
num_parts = calculate_num_parts(size, part_size)
num_parts = calculate_num_parts(
max(range_end - range_start + 1, 0), part_size
)
self._notify_jobs_to_complete(
download_file_request.transfer_id, num_parts
)
for i in range(num_parts):
offset = i * part_size
range_parameter = calculate_range_parameter(
part_size, i, num_parts
part_size, i, num_parts, range_end + 1, range_start
)
get_object_kwargs = {'Range': range_parameter}
get_object_kwargs.update(download_file_request.extra_args)
Expand Down
10 changes: 8 additions & 2 deletions s3transfer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def calculate_num_parts(size, part_size):


def calculate_range_parameter(
part_size, part_index, num_parts, total_size=None
part_size, part_index, num_parts, total_size=None, offset=0
):
"""Calculate the range parameter for multipart downloads/copies

Expand All @@ -79,11 +79,17 @@ def calculate_range_parameter(
:type num_parts: int
:param num_parts: The total number of parts in the transfer

:type total_size: int
:param total_size: The total size of the object be transferred

:type offset: int
:param offset: The offset from the beginning of the object to start the range

:returns: The value to use for Range parameter on downloads or
the CopySourceRange parameter for copies
"""
# Used to calculate the Range parameter
start_range = part_index * part_size
start_range = part_index * part_size + offset
if part_index == num_parts - 1:
end_range = ''
if total_size is not None:
Expand Down
Loading