diff --git a/s3transfer/download.py b/s3transfer/download.py index dc8980d4..1ca57682 100644 --- a/s3transfer/download.py +++ b/s3transfer/download.py @@ -360,13 +360,27 @@ def _submit( response['ContentLength'] ) + # Clamp range_start and range_end to the length of the object + range_start = transfer_future.meta.call_args.range_start + if range_start < 0: + range_start = 0 + range_end = transfer_future.meta.call_args.range_end + if range_end is None or range_end >= transfer_future.meta.size: + range_end = transfer_future.meta.size - 1 + transfer_future.meta.call_args.range_start = range_start + transfer_future.meta.call_args.range_end = range_end + download_output_manager = self._get_download_output_manager_cls( transfer_future, osutil )(osutil, self._transfer_coordinator, io_executor) - # If it is greater than threshold do a ranged download, otherwise - # do a regular GetObject download. - if transfer_future.meta.size < config.multipart_threshold: + # If the whole object is requested and it is smaller than the threshold do a + # regular GetObject download, otherwise do a ranged download. + if ( + transfer_future.meta.size < config.multipart_threshold + and range_start == 0 + and range_end == transfer_future.meta.size - 1 + ): self._submit_download_request( client, config, @@ -463,7 +477,9 @@ def _submit_ranged_download_request( # Determine the number of parts part_size = config.multipart_chunksize - num_parts = calculate_num_parts(transfer_future.meta.size, part_size) + num_parts = calculate_num_parts( + max(call_args.range_end - call_args.range_start + 1, 0), part_size + ) # Get any associated tags for the get object task. 
get_object_tag = download_output_manager.get_download_task_tag() @@ -478,7 +494,11 @@ def _submit_ranged_download_request( for i in range(num_parts): # Calculate the range parameter range_parameter = calculate_range_parameter( - part_size, i, num_parts + part_size, + i, + num_parts, + call_args.range_end + 1, + call_args.range_start, ) # Inject the Range parameter to the parameters to be passed in @@ -518,16 +538,6 @@ def _get_final_io_task_submission_callback( self._transfer_coordinator.submit, io_executor, final_task ) - def _calculate_range_param(self, part_size, part_index, num_parts): - # Used to calculate the Range parameter - start_range = part_index * part_size - if part_index == num_parts - 1: - end_range = '' - else: - end_range = start_range + part_size - 1 - range_param = f'bytes={start_range}-{end_range}' - return range_param - class GetObjectTask(Task): def _main( diff --git a/s3transfer/manager.py b/s3transfer/manager.py index b11daeba..fad02709 100644 --- a/s3transfer/manager.py +++ b/s3transfer/manager.py @@ -335,7 +335,15 @@ def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): ) def download( - self, bucket, key, fileobj, extra_args=None, subscribers=None + self, + bucket, + key, + fileobj, + extra_args=None, + subscribers=None, + *, + range_start=0, + range_end=None ): """Downloads a file from S3 @@ -359,6 +367,14 @@ def download( order provided based on the event emit during the process of the transfer request. + :type range_start: int + :param range_start: The start of the range to download from the object. + + :type range_end: int + :param range_end: The end of the range to download from the object. Default is + the length of the object minus one. 
Total length of the download is + `range_end - range_start + 1` + :rtype: s3transfer.futures.TransferFuture :returns: Transfer future representing the download """ @@ -374,6 +390,8 @@ def download( fileobj=fileobj, extra_args=extra_args, subscribers=subscribers, + range_start=range_start, + range_end=range_end, ) extra_main_kwargs = {'io_executor': self._io_executor} if self._bandwidth_limiter: diff --git a/s3transfer/processpool.py b/s3transfer/processpool.py index 017eeb44..cd4ce12e 100644 --- a/s3transfer/processpool.py +++ b/s3transfer/processpool.py @@ -231,6 +231,8 @@ 'filename', # The user-requested download location 'extra_args', # Extra arguments to provide to client calls 'expected_size', # The user-provided expected size of the download + 'range_start', # The user-provided start point for the download + 'range_end', # The user-provided end point for the download ], ) @@ -323,7 +325,15 @@ def __init__(self, client_kwargs=None, config=None): self._workers = [] def download_file( - self, bucket, key, filename, extra_args=None, expected_size=None + self, + bucket, + key, + filename, + extra_args=None, + expected_size=None, + *, + range_start=0, + range_end=None, ): """Downloads the object's contents to a file @@ -346,6 +356,14 @@ def download_file( object's size and use the provided value instead. The size is needed to determine whether to do a multipart download. + :type range_start: int + :param range_start: The start of the range to download from the object. + + :type range_end: int + :param range_end: The end of the range to download from the object. Default is + the length of the object minus one. 
Total length of the download is + `range_end - range_start + 1` + :rtype: s3transfer.futures.TransferFuture :returns: Transfer future representing the download """ @@ -361,6 +379,8 @@ def download_file( filename=filename, extra_args=extra_args, expected_size=expected_size, + range_start=range_start, + range_end=range_end, ) logger.debug( 'Submitting download file request: %s.', download_file_request @@ -814,14 +834,27 @@ def _do_run(self): def _submit_get_object_jobs(self, download_file_request): size = self._get_size(download_file_request) - temp_filename = self._allocate_temp_file(download_file_request, size) - if size < self._transfer_config.multipart_threshold: + # Clamp range_start and range_end to the length of the object + range_start = download_file_request.range_start + if range_start < 0: + range_start = 0 + range_end = download_file_request.range_end + if range_end is None or range_end >= size: + range_end = size - 1 + temp_filename = self._allocate_temp_file( + download_file_request, range_end - range_start + 1 + ) + if ( + size < self._transfer_config.multipart_threshold + and range_start == 0 + and range_end == size - 1 + ): self._submit_single_get_object_job( download_file_request, temp_filename ) else: self._submit_ranged_get_object_jobs( - download_file_request, temp_filename, size + download_file_request, temp_filename, range_start, range_end ) def _get_size(self, download_file_request): @@ -856,17 +889,19 @@ def _submit_single_get_object_job( ) def _submit_ranged_get_object_jobs( - self, download_file_request, temp_filename, size + self, download_file_request, temp_filename, range_start, range_end ): part_size = self._transfer_config.multipart_chunksize - num_parts = calculate_num_parts(size, part_size) + num_parts = calculate_num_parts( + max(range_end - range_start + 1, 0), part_size + ) self._notify_jobs_to_complete( download_file_request.transfer_id, num_parts ) for i in range(num_parts): offset = i * part_size range_parameter = 
calculate_range_parameter( - part_size, i, num_parts + part_size, i, num_parts, range_end + 1, range_start ) get_object_kwargs = {'Range': range_parameter} get_object_kwargs.update(download_file_request.extra_args) diff --git a/s3transfer/utils.py b/s3transfer/utils.py index 61407eba..fb1c79d1 100644 --- a/s3transfer/utils.py +++ b/s3transfer/utils.py @@ -65,7 +65,7 @@ def calculate_num_parts(size, part_size): def calculate_range_parameter( - part_size, part_index, num_parts, total_size=None + part_size, part_index, num_parts, total_size=None, offset=0 ): """Calculate the range parameter for multipart downloads/copies @@ -79,11 +79,17 @@ def calculate_range_parameter( :type num_parts: int :param num_parts: The total number of parts in the transfer + :type total_size: int + :param total_size: The total size of the object being transferred + + :type offset: int + :param offset: The offset from the beginning of the object to start the range + :returns: The value to use for Range parameter on downloads or the CopySourceRange parameter for copies """ # Used to calculate the Range parameter - start_range = part_index * part_size + start_range = part_index * part_size + offset if part_index == num_parts - 1: end_range = '' if total_size is not None: diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index f458721d..d761b174 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -456,14 +456,42 @@ class TestRangedDownload(BaseDownloadTest): # from the general test base class, which we do not want ran. 
__test__ = True + CHUNKSIZE = 4 + def setUp(self): super().setUp() self.config = TransferConfig( max_request_concurrency=1, multipart_threshold=1, - multipart_chunksize=4, + multipart_chunksize=self.CHUNKSIZE, ) self._manager = TransferManager(self.client, self.config) + self.get_object_responses = [] + self.create_get_object_responses(0, len(self.content) - 1) + + def generate_get_object_responses(self, range_start, range_end): + for start in range(range_start, range_end + 1, self.CHUNKSIZE): + end = min(start + self.CHUNKSIZE, range_end + 1) + yield { + 'method': 'get_object', + 'service_response': {'Body': BytesIO(self.content[start:end])}, + } + + def create_get_object_responses(self, range_start, range_end): + self.get_object_responses = [ + r + for r in self.generate_get_object_responses(range_start, range_end) + ] + + def generate_expected_ranges(self, range_start, range_end): + for start in range(range_start, range_end + 1, self.CHUNKSIZE): + end = min(start + self.CHUNKSIZE - 1, range_end) + yield f'bytes={start}-{end}' + + def create_expected_ranges(self, range_start, range_end): + return [ + r for r in self.generate_expected_ranges(range_start, range_end) + ] def create_stubbed_responses(self): return [ @@ -471,19 +499,7 @@ def create_stubbed_responses(self): 'method': 'head_object', 'service_response': {'ContentLength': len(self.content)}, }, - { - 'method': 'get_object', - 'service_response': {'Body': BytesIO(self.content[0:4])}, - }, - { - 'method': 'get_object', - 'service_response': {'Body': BytesIO(self.content[4:8])}, - }, - { - 'method': 'get_object', - 'service_response': {'Body': BytesIO(self.content[8:])}, - }, - ] + ] + self.get_object_responses def create_expected_progress_callback_info(self): return [ @@ -494,12 +510,13 @@ def create_expected_progress_callback_info(self): def test_download(self): self.extra_args['RequestPayer'] = 'requester' + self.create_get_object_responses(0, len(self.content) - 1) expected_params = { 'Bucket': 
self.bucket, 'Key': self.key, 'RequestPayer': 'requester', } - expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] + expected_ranges = self.create_expected_ranges(0, len(self.content) - 1) self.add_head_object_response(expected_params) self.add_successful_get_object_responses( expected_params, expected_ranges @@ -516,12 +533,13 @@ def test_download(self): def test_download_with_checksum_enabled(self): self.extra_args['ChecksumMode'] = 'ENABLED' + self.create_get_object_responses(0, len(self.content) - 1) expected_params = { 'Bucket': self.bucket, 'Key': self.key, 'ChecksumMode': 'ENABLED', } - expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] + expected_ranges = self.create_expected_ranges(0, len(self.content) - 1) self.add_head_object_response(expected_params) self.add_successful_get_object_responses( expected_params, expected_ranges @@ -535,3 +553,121 @@ def test_download_with_checksum_enabled(self): # Ensure that the contents are correct with open(self.filename, 'rb') as f: self.assertEqual(self.content, f.read()) + + def test_download_range_clamped_to_size(self): + self.create_get_object_responses(0, len(self.content) - 1) + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + } + expected_ranges = self.create_expected_ranges(0, len(self.content) - 1) + self.add_head_object_response(expected_params) + self.add_successful_get_object_responses( + expected_params, expected_ranges + ) + + future = self.manager.download( + self.bucket, + self.key, + self.filename, + self.extra_args, + range_start=-1, + range_end=len(self.content) + 10, + ) + future.result() + + self.stubber.assert_no_pending_responses() + + # Ensure that the contents are correct + with open(self.filename, 'rb') as f: + self.assertEqual(self.content, f.read()) + + def test_download_range_start(self): + range_start = 5 + range_end = len(self.content) - 1 + self.create_get_object_responses(range_start, range_end) + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, 
+ } + expected_ranges = self.create_expected_ranges(range_start, range_end) + self.add_head_object_response(expected_params) + self.add_successful_get_object_responses( + expected_params, expected_ranges + ) + + future = self.manager.download( + self.bucket, + self.key, + self.filename, + self.extra_args, + range_start=range_start, + ) + future.result() + + self.stubber.assert_no_pending_responses() + + # Ensure that the contents are correct + with open(self.filename, 'rb') as f: + self.assertEqual(self.content[range_start:], f.read()) + + def test_download_range_end(self): + range_start = 0 + range_end = 5 + self.create_get_object_responses(range_start, range_end) + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + } + expected_ranges = self.create_expected_ranges(range_start, range_end) + self.add_head_object_response(expected_params) + self.add_successful_get_object_responses( + expected_params, expected_ranges + ) + + future = self.manager.download( + self.bucket, + self.key, + self.filename, + self.extra_args, + range_end=range_end, + ) + future.result() + + self.stubber.assert_no_pending_responses() + + # Ensure that the contents are correct + with open(self.filename, 'rb') as f: + self.assertEqual(self.content[: range_end + 1], f.read()) + + def test_download_range_start_and_end(self): + range_start = 5 + range_end = 5 + self.create_get_object_responses(range_start, range_end) + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + } + expected_ranges = self.create_expected_ranges(range_start, range_end) + self.add_head_object_response(expected_params) + self.add_successful_get_object_responses( + expected_params, expected_ranges + ) + + future = self.manager.download( + self.bucket, + self.key, + self.filename, + self.extra_args, + range_start=range_start, + range_end=range_end, + ) + future.result() + + self.stubber.assert_no_pending_responses() + + # Ensure that the contents are correct + with open(self.filename, 'rb') as f: + 
self.assertEqual( + self.content[range_start : range_end + 1], f.read() + ) diff --git a/tests/functional/test_processpool.py b/tests/functional/test_processpool.py index 7b45d1f3..a6a6c12f 100644 --- a/tests/functional/test_processpool.py +++ b/tests/functional/test_processpool.py @@ -48,6 +48,9 @@ def add_response(self, *args, **kwargs): def add_client_error(self, *args, **kwargs): self._stubber.add_client_error(*args, **kwargs) + def assert_no_pending_responses(self): + self._stubber.assert_no_pending_responses() + class StubbedClientManager(BaseManager): pass @@ -107,17 +110,50 @@ def assert_contents(self, filename, expected_contents): with open(filename, 'rb') as f: self.assertEqual(f.read(), expected_contents) + def add_head_object_response(self, expected_params=None): + head_response = { + 'method': 'head_object', + 'service_response': {'ContentLength': len(self.remote_contents)}, + } + if expected_params: + head_response['expected_params'] = expected_params + self.stubbed_client.add_response(**head_response) + + def add_get_object_response(self, expected_params=None): + get_object_response = { + 'method': 'get_object', + 'service_response': {'Body': self.stream}, + } + if expected_params: + get_object_response['expected_params'] = expected_params + self.stubbed_client.add_response(**get_object_response) + + def add_ranged_get_object_response( + self, range_start, range_end, expected_params=None + ): + expected = {'Range': f'bytes={range_start}-{range_end}'} + if expected_params: + expected.update(expected_params) + response = { + 'method': 'get_object', + 'service_response': { + 'Body': BytesIO( + self.remote_contents[range_start : range_end + 1] + ) + }, + 'expected_params': expected_params, + } + self.stubbed_client.add_response(**response) + def test_download_file(self): - self.stubbed_client.add_response( - 'head_object', {'ContentLength': len(self.remote_contents)} - ) - self.stubbed_client.add_response('get_object', {'Body': self.stream}) + 
self.add_head_object_response() + self.add_get_object_response() with self.downloader: self.downloader.download_file(self.bucket, self.key, self.filename) self.assert_contents(self.filename, self.remote_contents) def test_download_multiple_files(self): - self.stubbed_client.add_response('get_object', {'Body': self.stream}) + self.add_get_object_response() self.stubbed_client.add_response( 'get_object', {'Body': BytesIO(self.remote_contents)} ) @@ -140,9 +176,7 @@ def test_download_multiple_files(self): def test_download_file_ranged_download(self): half_of_content_length = int(len(self.remote_contents) / 2) - self.stubbed_client.add_response( - 'head_object', {'ContentLength': len(self.remote_contents)} - ) + self.add_head_object_response() self.stubbed_client.add_response( 'get_object', {'Body': BytesIO(self.remote_contents[:half_of_content_length])}, @@ -163,24 +197,13 @@ def test_download_file_ranged_download(self): self.assert_contents(self.filename, self.remote_contents) def test_download_file_extra_args(self): - self.stubbed_client.add_response( - 'head_object', - {'ContentLength': len(self.remote_contents)}, - expected_params={ - 'Bucket': self.bucket, - 'Key': self.key, - 'VersionId': 'versionid', - }, - ) - self.stubbed_client.add_response( - 'get_object', - {'Body': self.stream}, - expected_params={ - 'Bucket': self.bucket, - 'Key': self.key, - 'VersionId': 'versionid', - }, - ) + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + 'VersionId': 'versionid', + } + self.add_head_object_response(expected_params) + self.add_get_object_response(expected_params) with self.downloader: self.downloader.download_file( self.bucket, @@ -191,7 +214,7 @@ def test_download_file_extra_args(self): self.assert_contents(self.filename, self.remote_contents) def test_download_file_expected_size(self): - self.stubbed_client.add_response('get_object', {'Body': self.stream}) + self.add_get_object_response() with self.downloader: self.downloader.download_file( 
self.bucket, @@ -226,7 +249,7 @@ def test_validates_extra_args(self): ) def test_result_with_success(self): - self.stubbed_client.add_response('get_object', {'Body': self.stream}) + self.add_get_object_response() with self.downloader: future = self.downloader.download_file( self.bucket, @@ -249,7 +272,7 @@ def test_result_with_exception(self): future.result() def test_result_with_cancel(self): - self.stubbed_client.add_response('get_object', {'Body': self.stream}) + self.add_get_object_response() with self.downloader: future = self.downloader.download_file( self.bucket, @@ -279,3 +302,80 @@ def test_shutdown_with_no_downloads_and_ctrl_c(self): with self.assertRaises(KeyboardInterrupt): with self.downloader: raise KeyboardInterrupt() + + def test_download_range_clamped_to_size(self): + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + } + self.add_head_object_response(expected_params) + self.add_get_object_response(expected_params) + + with self.downloader: + self.downloader.download_file( + self.bucket, + self.key, + self.filename, + range_start=-1, + range_end=len(self.remote_contents) + 10, + ) + + self.stubbed_client.assert_no_pending_responses() + self.assert_contents(self.filename, self.remote_contents) + + def test_download_range_start(self): + range_start = 5 + range_end = len(self.remote_contents) - 1 + self.add_head_object_response() + self.add_ranged_get_object_response(range_start, range_end) + with self.downloader: + self.downloader.download_file( + self.bucket, + self.key, + self.filename, + range_start=range_start, + range_end=range_end, + ) + + self.stubbed_client.assert_no_pending_responses() + self.assert_contents( + self.filename, self.remote_contents[range_start : range_end + 1] + ) + + def test_download_range_end(self): + range_start = 0 + range_end = 5 + self.add_head_object_response() + self.add_ranged_get_object_response(range_start, range_end) + with self.downloader: + self.downloader.download_file( + self.bucket, + self.key, 
+ self.filename, + range_start=range_start, + range_end=range_end, + ) + + self.stubbed_client.assert_no_pending_responses() + self.assert_contents( + self.filename, self.remote_contents[range_start : range_end + 1] + ) + + def test_download_range_start_and_end(self): + range_start = 5 + range_end = 5 + self.add_head_object_response() + self.add_ranged_get_object_response(range_start, range_end) + with self.downloader: + self.downloader.download_file( + self.bucket, + self.key, + self.filename, + range_start=range_start, + range_end=range_end, + ) + + self.stubbed_client.assert_no_pending_responses() + self.assert_contents( + self.filename, self.remote_contents[range_start : range_end + 1] + ) diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index 1907437d..fdc649cd 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -397,6 +397,8 @@ def setUp(self): self.key = 'mykey' self.extra_args = {} self.subscribers = [] + self.range_start = 0 + self.range_end = None # Create a stream to read from self.content = b'my content' @@ -429,6 +431,8 @@ def get_call_args(self, **kwargs): 'key': self.key, 'extra_args': self.extra_args, 'subscribers': self.subscribers, + 'range_start': self.range_start, + 'range_end': self.range_end, } default_call_args.update(kwargs) return CallArgs(**default_call_args) diff --git a/tests/unit/test_processpool.py b/tests/unit/test_processpool.py index f1bfa010..ab2388a5 100644 --- a/tests/unit/test_processpool.py +++ b/tests/unit/test_processpool.py @@ -337,6 +337,8 @@ def add_download_file_request(self, **override_kwargs): 'filename': self.filename, 'extra_args': self.extra_args, 'expected_size': self.expected_size, + 'range_start': 0, + 'range_end': None, } kwargs.update(override_kwargs) self.download_request_queue.put(DownloadFileRequest(**kwargs)) @@ -393,7 +395,7 @@ def test_run_for_ranged_download(self): key=self.key, temp_filename=self.temp_filename, offset=2, - extra_args={'Range': 'bytes=2-'}, 
+ extra_args={'Range': 'bytes=2-3'}, filename=self.filename, ), ]