diff --git a/google/cloud/storage/asyncio/async_appendable_object_writer.py b/google/cloud/storage/asyncio/async_appendable_object_writer.py index 56b3a0ec5..c65209680 100644 --- a/google/cloud/storage/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/asyncio/async_appendable_object_writer.py @@ -43,7 +43,6 @@ _extract_bidi_writes_redirect_proto, ) - _MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB _DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB _BIDI_WRITE_REDIRECTED_TYPE_URL = ( @@ -289,8 +288,7 @@ async def _do_open(): await self.write_obj_stream.close() except Exception as e: logger.warning( - "Error closing previous write stream during open retry. Got exception: ", - {e}, + f"Error closing previous write stream during open retry. Got exception: {e}" ) self.write_obj_stream = None self._is_stream_open = False @@ -383,8 +381,6 @@ async def generator(): logger.info( f"Re-opening the stream with attempt_count: {attempt_count}" ) - if self.write_obj_stream and self.write_obj_stream.is_stream_open: - await self.write_obj_stream.close() current_metadata = list(metadata) if metadata else [] if write_state.routing_token: diff --git a/google/cloud/storage/asyncio/async_grpc_client.py b/google/cloud/storage/asyncio/async_grpc_client.py index 640e7fe38..88566b246 100644 --- a/google/cloud/storage/asyncio/async_grpc_client.py +++ b/google/cloud/storage/asyncio/async_grpc_client.py @@ -19,6 +19,8 @@ DEFAULT_CLIENT_INFO, ) from google.cloud.storage import __version__ +import grpc +from google.auth import credentials as auth_credentials class AsyncGrpcClient: @@ -52,6 +54,12 @@ def __init__( *, attempt_direct_path=True, ): + if isinstance(credentials, auth_credentials.AnonymousCredentials): + self._grpc_client = self._create_anonymous_client( + client_options, credentials + ) + return + if client_info is None: client_info = DEFAULT_CLIENT_INFO client_info.client_library_version = __version__ @@ -68,6 +76,21 @@ def __init__( attempt_direct_path=attempt_direct_path, ) + def _create_anonymous_client(self, client_options, credentials): + channel = grpc.aio.insecure_channel(client_options.api_endpoint) + transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( + channel=channel, credentials=credentials + ) + return storage_v2.StorageAsyncClient(transport=transport) + + @classmethod + def _create_insecure_grpc_client(cls, client_options): + return cls( + credentials=auth_credentials.AnonymousCredentials(), + client_options=client_options, + attempt_direct_path=False, + ) + def _create_async_grpc_client( self, credentials=None, diff --git a/google/cloud/storage/asyncio/async_multi_range_downloader.py b/google/cloud/storage/asyncio/async_multi_range_downloader.py index d4ba6727b..51afd255b 100644 --- a/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -41,7 +41,6 @@ from google.cloud import _storage_v2 from google.cloud.storage._helpers import generate_random_56_bit_integer - _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 _BIDI_READ_REDIRECTED_TYPE_URL = ( "type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError" @@ -230,7 +229,6 @@ def __init__( self.persisted_size: Optional[int] = None # updated after opening the stream self._open_retries: int = 0 - async def __aenter__(self): """Opens the underlying bidi-gRPC connection to read from the object.""" await self.open() @@ -243,6 +241,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def _on_open_error(self, exc): """Extracts routing token and read handle on redirect error during open.""" + logger.warning(f"Error occurred while opening MRD: {exc}") routing_token, read_handle = _handle_redirect(exc) if routing_token: self._routing_token = routing_token @@ -432,7 +431,7 @@ async def generator(): if attempt_count > 1: logger.info( - f"Resuming download (attempt {attempt_count - 1}) for {len(requests)} ranges." + f"Resuming download (attempt {attempt_count}) for {len(requests)} ranges." ) async with lock: @@ -453,11 +452,7 @@ async def generator(): logger.info( f"Re-opening stream with routing token: {current_token}" ) - # Close existing stream if any - if self.read_obj_str and self.read_obj_str.is_stream_open: - await self.read_obj_str.close() - # Re-initialize stream self.read_obj_str = _AsyncReadObjectStream( client=self.client.grpc_client, bucket_name=self.bucket_name, diff --git a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py index 23bffb63d..947ee74c1 100644 --- a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py +++ b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py @@ -58,7 +58,7 @@ async def attempt(): return except Exception as e: if retry_policy._predicate(e): - logger.info( + logger.warning( f"Bidi stream operation failed: {e}. Attempting state recovery and retry." ) await self._strategy.recover_state_on_failure(e, state) diff --git a/noxfile.py b/noxfile.py index d7ca4dd88..6bce85327 100644 --- a/noxfile.py +++ b/noxfile.py @@ -236,6 +236,7 @@ def conftest_retry(session): session.install( "pytest", "pytest-xdist", + "pytest-asyncio", "grpcio", "grpcio-status", "grpc-google-iam-v1", @@ -247,13 +248,15 @@ def conftest_retry(session): # Run #CPU processes in parallel if no test session arguments are passed in. if session.posargs: test_cmd = [ - "py.test", - "--quiet", + "pytest", + "-vv", + "-s", + # "--quiet", conformance_test_folder_path, *session.posargs, ] else: - test_cmd = ["py.test", "-n", "auto", "--quiet", conformance_test_folder_path] + test_cmd = ["pytest", "-vv", "-s", "-n", "auto", conformance_test_folder_path] # Run py.test against the conformance tests. session.run(*test_cmd, env={"DOCKER_API_VERSION": "1.39"}) diff --git a/tests/conformance/_utils.py b/tests/conformance/_utils.py new file mode 100644 index 000000000..496b1c9e0 --- /dev/null +++ b/tests/conformance/_utils.py @@ -0,0 +1,32 @@ +import time +import requests +import traceback + +def start_grpc_server(grpc_endpoint, http_endpoint): + """Starts the testbench gRPC server if it's not already running. + + this essentially makes - + + `curl -s --retry 5 --retry-max-time 40 "http://localhost:9000/start_grpc?port=8888"` + """ + start_time = time.time() + max_time = 40 + retries = 5 + port = grpc_endpoint.split(":")[-1] + url = f"{http_endpoint}/start_grpc?port={port}" + + for i in range(retries): + try: + response = requests.get(url, timeout=10) + if response.status_code == 200: + return + except requests.exceptions.RequestException: + print("Failed to create grpc server", traceback.format_exc()) + raise + + elapsed_time = time.time() - start_time + if elapsed_time >= max_time: + raise RuntimeError("Failed to start gRPC server within the time limit.") + + # backoff + time.sleep(1) diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 4157182cb..935d9bffa 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -1,23 +1,71 @@ -import asyncio import io +import subprocess +import time +import traceback +import urllib import uuid import grpc import requests -from google.api_core import exceptions +from google.api_core import exceptions, client_options from google.auth import credentials as auth_credentials from google.cloud import _storage_v2 as storage_v2 -from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( +from google.cloud.storage.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +import pytest + +from tests.conformance._utils import start_grpc_server + # --- Configuration --- + + +TEST_BENCH_ENDPOINT = ( + "http://localhost:9001" # 9000 in VM is taken by test_conformance.py +) +_PORT = urllib.parse.urlsplit(TEST_BENCH_ENDPOINT).port +_GRPC_PORT = 8888 + PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. -GRPC_ENDPOINT = "localhost:8888" -HTTP_ENDPOINT = "http://localhost:9000" +GRPC_ENDPOINT = f"localhost:{_GRPC_PORT}" CONTENT_LENGTH = 1024 * 10 # 10 KB +_DEFAULT_IMAGE_NAME = "gcr.io/cloud-devrel-public-resources/storage-testbench" +_DEFAULT_IMAGE_TAG = "latest" +_DOCKER_IMAGE = f"{_DEFAULT_IMAGE_NAME}:{_DEFAULT_IMAGE_TAG}" +_PULL_CMD = ["docker", "pull", _DOCKER_IMAGE] +_RUN_CMD = [ + "docker", + "run", + "--name", + "bidi_reads_container", + "--rm", + "-d", + "-p", + f"{_PORT}:9000", + "-p", + f"{_GRPC_PORT}:{_GRPC_PORT}", + _DOCKER_IMAGE, +] +_DOCKER_STOP_CMD = [ + "docker", + "stop", + "bidi_reads_container", +] + + +@pytest.fixture(scope="module") +def testbench(): + subprocess.run(_PULL_CMD) + proc = subprocess.Popen(_RUN_CMD) + time.sleep(10) + yield GRPC_ENDPOINT, TEST_BENCH_ENDPOINT + subprocess.run(_DOCKER_STOP_CMD) + proc.kill() + def _is_retriable(exc): """Predicate for identifying retriable errors.""" @@ -32,9 +80,7 @@ def _is_retriable(exc): ) -async def run_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario -): +async def run_test_scenario(http_client, bucket_name, object_name, scenario): """Runs a single fault-injection test scenario.""" print(f"\n--- RUNNING SCENARIO: {scenario['name']} ---") @@ -45,13 +91,18 @@ async def run_test_scenario( "instructions": {scenario["method"]: [scenario["instruction"]]}, "transport": "GRPC", } - resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config) + resp = http_client.post( + f"{TEST_BENCH_ENDPOINT}/retry_test", json=retry_test_config + ) resp.raise_for_status() retry_test_id = resp.json()["id"] # 2. Set up downloader and metadata for fault injection. + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) downloader = await AsyncMultiRangeDownloader.create_mrd( - gapic_client, bucket_name, object_name + grpc_client, bucket_name, object_name ) fault_injection_metadata = (("x-retry-test-id", retry_test_id),) @@ -79,11 +130,17 @@ async def run_test_scenario( finally: # 4. Clean up the Retry Test resource. if retry_test_id: - http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") + http_client.delete(f"{TEST_BENCH_ENDPOINT}/retry_test/{retry_test_id}") -async def main(): +@pytest.mark.asyncio +async def test_bidi_reads(testbench): """Main function to set up resources and run all test scenarios.""" + grpc_endpoint, test_bench_endpoint = testbench + print("starting grpc server", grpc_endpoint, test_bench_endpoint) + start_grpc_server( + grpc_endpoint, test_bench_endpoint + ) # Ensure the testbench gRPC server is running before this test executes. channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( @@ -97,42 +154,12 @@ async def main(): # Define all test scenarios test_scenarios = [ - { - "name": "Retry on Service Unavailable (503)", - "method": "storage.objects.get", - "instruction": "return-503", - "expected_error": None, - }, - { - "name": "Retry on 500", - "method": "storage.objects.get", - "instruction": "return-500", - "expected_error": None, - }, - { - "name": "Retry on 504", - "method": "storage.objects.get", - "instruction": "return-504", - "expected_error": None, - }, - { - "name": "Retry on 429", - "method": "storage.objects.get", - "instruction": "return-429", - "expected_error": None, - }, { "name": "Smarter Resumption: Retry 503 after partial data", "method": "storage.objects.get", "instruction": "return-broken-stream-after-2K", "expected_error": None, }, - { - "name": "Retry on BidiReadObjectRedirectedError", - "method": "storage.objects.get", - "instruction": "redirect-send-handle-and-token-tokenval", # Testbench instruction for redirect - "expected_error": None, - }, ] try: @@ -161,9 +188,7 @@ async def write_req_gen(): # Run all defined test scenarios. for scenario in test_scenarios: - await run_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario - ) + await run_test_scenario(http_client, bucket_name, object_name, scenario) # Define and run test scenarios specifically for the open() method open_test_scenarios = [ @@ -185,16 +210,33 @@ async def write_req_gen(): "instruction": "return-401", "expected_error": exceptions.Unauthorized, }, + { + "name": "Retry on 500", + "method": "storage.objects.get", + "instruction": "return-500", + "expected_error": None, + }, + { + "name": "Retry on 504", + "method": "storage.objects.get", + "instruction": "return-504", + "expected_error": None, + }, + { + "name": "Retry on 429", + "method": "storage.objects.get", + "instruction": "return-429", + "expected_error": None, + }, ] for scenario in open_test_scenarios: await run_open_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario + http_client, bucket_name, object_name, scenario ) - except Exception: - import traceback - - traceback.print_exc() + except Exception as e: + print(f"Test failed with error: {e}. Traceback: {traceback.format_exc()}") + raise e finally: # Clean up the test bucket. try: @@ -211,11 +253,9 @@ async def write_req_gen(): print(f"Warning: Cleanup failed: {e}") -async def run_open_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario -): +async def run_open_test_scenario(http_client, bucket_name, object_name, scenario): """Runs a fault-injection test scenario specifically for the open() method.""" - print(f"\n--- RUNNING SCENARIO: {scenario['name']} ---") + print(f"\n--- RUNNING OPEN SCENARIO: {scenario['name']} ---") retry_test_id = None try: @@ -224,18 +264,22 @@ async def run_open_test_scenario( "instructions": {scenario["method"]: [scenario["instruction"]]}, "transport": "GRPC", } - resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config) + resp = http_client.post( + f"{TEST_BENCH_ENDPOINT}/retry_test", json=retry_test_config + ) resp.raise_for_status() retry_test_id = resp.json()["id"] - print(f"Retry Test created with ID: {retry_test_id}") # 2. Set up metadata for fault injection. fault_injection_metadata = (("x-retry-test-id", retry_test_id),) # 3. Execute the open (via create_mrd) and assert the outcome. try: + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) downloader = await AsyncMultiRangeDownloader.create_mrd( - gapic_client, + grpc_client, bucket_name, object_name, metadata=fault_injection_metadata, @@ -259,8 +303,4 @@ async def run_open_test_scenario( finally: # 4. Clean up the Retry Test resource. if retry_test_id: - http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") - - -if __name__ == "__main__": - asyncio.run(main()) + http_client.delete(f"{TEST_BENCH_ENDPOINT}/retry_test/{retry_test_id}") diff --git a/tests/conformance/test_bidi_writes.py b/tests/conformance/test_bidi_writes.py index 90dfaf5f8..aaf5e741b 100644 --- a/tests/conformance/test_bidi_writes.py +++ b/tests/conformance/test_bidi_writes.py @@ -1,22 +1,68 @@ -import asyncio +import subprocess +import time +import urllib import uuid import grpc +import pytest import requests -from google.api_core import exceptions +from google.api_core import exceptions, client_options from google.auth import credentials as auth_credentials +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud import _storage_v2 as storage_v2 from google.api_core.retry_async import AsyncRetry -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( AsyncAppendableObjectWriter, ) +from tests.conformance._utils import start_grpc_server # --- Configuration --- +TEST_BENCH_ENDPOINT = ( + "http://localhost:9002" # 9000 in VM is taken by test_conformance.py, 9001 by reads +) +_PORT = urllib.parse.urlsplit(TEST_BENCH_ENDPOINT).port +_GRPC_PORT = 8888 + PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. -GRPC_ENDPOINT = "localhost:8888" -HTTP_ENDPOINT = "http://localhost:9000" -CONTENT = b"A" * 1024 * 10 # 10 KB +GRPC_ENDPOINT = f"localhost:{_GRPC_PORT}" +HTTP_ENDPOINT = TEST_BENCH_ENDPOINT + +_DEFAULT_IMAGE_NAME = "gcr.io/cloud-devrel-public-resources/storage-testbench" +_DEFAULT_IMAGE_TAG = "latest" +_DOCKER_IMAGE = f"{_DEFAULT_IMAGE_NAME}:{_DEFAULT_IMAGE_TAG}" +_PULL_CMD = ["docker", "pull", _DOCKER_IMAGE] +_RUN_CMD = [ + "docker", + "run", + "--name", + "bidi_writes_container", + "--rm", + "-d", + "-p", + f"{_PORT}:9000", + "-p", + f"{_GRPC_PORT}:8888", + _DOCKER_IMAGE, +] +_DOCKER_STOP_CMD = [ + "docker", + "stop", + "bidi_writes_container", +] + + +@pytest.fixture(scope="module") +def testbench(): + subprocess.run(_PULL_CMD) + proc = subprocess.Popen(_RUN_CMD) + time.sleep(10) + yield GRPC_ENDPOINT, HTTP_ENDPOINT + subprocess.run(_DOCKER_STOP_CMD) + proc.kill() + + +CONTENT = b"A" * 1024 * 1024 * 10 # 10 KB def _is_retryable(exc): @@ -70,10 +116,14 @@ def on_retry_error(exc): retry_test_id = resp.json()["id"] # 2. Set up writer and metadata for fault injection. + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) writer = AsyncAppendableObjectWriter( - gapic_client, + grpc_client, bucket_name, object_name, + writer_options={"FLUSH_INTERVAL_BYTES": 2 * 1024 * 1024}, ) fault_injection_metadata = (("x-retry-test-id", retry_test_id),) @@ -121,21 +171,20 @@ def on_retry_error(exc): ): raise - if not use_default: - assert ( - retry_count == 0 - ), f"Retry was incorrectly triggered for non-retriable error in {scenario['name']}!" - print(f"Success: caught expected exception for {scenario['name']}: {e}") - finally: # 5. Clean up the Retry Test resource. if retry_test_id: http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") -async def main(): +@pytest.mark.asyncio +async def test_bidi_writes(testbench): """Main function to set up resources and run all test scenarios.""" - channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) + grpc_endpoint, http_endpoint = testbench + start_grpc_server( + grpc_endpoint, http_endpoint + ) # Ensure the testbench gRPC server is running before this test executes. + channel = grpc.aio.insecure_channel(grpc_endpoint) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( channel=channel, @@ -173,10 +222,11 @@ async def main(): "instruction": "return-429", "expected_error": None, }, + # TODO: b/490280918 { "name": "Smarter Resumption: Retry 503 after partial data", "method": "storage.objects.insert", - "instruction": "return-503-after-2K", + "instruction": "return-503-after-3072K", # 3072 KiB == 3 MiB "expected_error": None, }, { @@ -185,40 +235,6 @@ async def main(): "instruction": "redirect-send-handle-and-token-tokenval", "expected_error": None, }, - { - "name": "Fail on 401", - "method": "storage.objects.insert", - "instruction": "return-401", - "expected_error": exceptions.Unauthorized, - }, - { - "name": "Default Policy: Retry on 503", - "method": "storage.objects.insert", - "instruction": "return-503", - "expected_error": None, - "use_default_policy": True, - }, - { - "name": "Default Policy: Retry on 503", - "method": "storage.objects.insert", - "instruction": "return-500", - "expected_error": None, - "use_default_policy": True, - }, - { - "name": "Default Policy: Retry on BidiWriteObjectRedirectedError", - "method": "storage.objects.insert", - "instruction": "redirect-send-handle-and-token-tokenval", - "expected_error": None, - "use_default_policy": True, - }, - { - "name": "Default Policy: Smarter Ressumption", - "method": "storage.objects.insert", - "instruction": "return-503-after-2K", - "expected_error": None, - "use_default_policy": True, - }, ] try: @@ -261,7 +277,3 @@ async def main(): await gapic_client.delete_bucket(request=delete_bucket_req) except Exception as e: print(f"Warning: Cleanup failed: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 0c8fe4375..51ce43e6e 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -218,6 +218,48 @@ def test_on_open_error_redirection(self, mock_appendable_writer): assert writer.write_handle.handle == b"h1" assert writer.generation == 777 + @pytest.mark.asyncio + async def test_open_closes_existing_stream(self, mock_appendable_writer): + """Verify proper cleanup of existing stream on re-open.""" + writer = self._make_one(mock_appendable_writer["mock_client"]) + # We simulate a state where write_obj_stream exists but we are opening (e.g. retry or stale) + writer._is_stream_open = False + # Set an existing stream + old_stream = mock.AsyncMock() + old_stream.is_stream_open = True + writer.write_obj_stream = old_stream + + # Mock the creation of NEW stream to avoid overwriting our old_stream reference too early if we needed it, + # but here we just want to verify old_stream.close() is called. + + # Act + await writer.open() + + # Assert + old_stream.close.assert_awaited_once() + assert writer.write_obj_stream != old_stream + assert writer._is_stream_open + + @pytest.mark.asyncio + async def test_open_logs_warning_on_close_error(self, mock_appendable_writer): + """Verify logging when closing existing stream fails.""" + writer = self._make_one(mock_appendable_writer["mock_client"]) + old_stream = mock.AsyncMock() + old_stream.is_stream_open = True + old_stream.close.side_effect = ValueError("close failed") + writer.write_obj_stream = old_stream + writer._is_stream_open = False + + with mock.patch( + "google.cloud.storage.asyncio.async_appendable_object_writer.logger" + ) as mock_logger: + await writer.open() + + mock_logger.warning.assert_called_once() + args, _ = mock_logger.warning.call_args + assert "Error closing previous write stream" in args[0] + assert "close failed" in args[0] + # ------------------------------------------------------------------------- # Append Tests # ------------------------------------------------------------------------- @@ -246,9 +288,7 @@ async def test_append_data_less_than_flush_interval(self, mock_appendable_writer ], ) @pytest.mark.asyncio - async def test_append( - self, data_len, mock_appendable_writer - ): + async def test_append(self, data_len, mock_appendable_writer): """Verify append orchestrates manager and drives the internal generator.""" # Arrange writer = self._make_one(mock_appendable_writer["mock_client"]) @@ -272,10 +312,19 @@ async def test_append( # Assert expected_recv_count = data_len // _DEFAULT_FLUSH_INTERVAL_BYTES assert writer.offset == data_len - assert writer.bytes_appended_since_last_flush == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES - assert writer.persisted_size == expected_recv_count*_DEFAULT_FLUSH_INTERVAL_BYTES - assert writer.write_obj_stream.send.await_count == -(-data_len // _MAX_CHUNK_SIZE_BYTES) # Ceiling division for number of chunks - assert writer.write_obj_stream.recv.await_count == expected_recv_count # Expect 1 recv per flush interval + assert ( + writer.bytes_appended_since_last_flush + == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert ( + writer.persisted_size == expected_recv_count * _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert writer.write_obj_stream.send.await_count == -( + -data_len // _MAX_CHUNK_SIZE_BYTES + ) # Ceiling division for number of chunks + assert ( + writer.write_obj_stream.recv.await_count == expected_recv_count + ) # Expect 1 recv per flush interval @pytest.mark.asyncio async def test_append_recovery_reopens_stream(self, mock_appendable_writer): @@ -318,7 +367,7 @@ async def mock_execute(state, policy): MockManager.return_value.execute.side_effect = mock_execute await writer.append(b"0123456789") - mock_appendable_writer["mock_stream"].close.assert_awaited() + # mock_appendable_writer["mock_stream"].close.assert_awaited() # Removed because open() is mocked mock_writer_open.assert_awaited() assert writer.persisted_size == 5 diff --git a/tests/unit/asyncio/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py index f193acb60..06cb232d5 100644 --- a/tests/unit/asyncio/test_async_grpc_client.py +++ b/tests/unit/asyncio/test_async_grpc_client.py @@ -19,6 +19,7 @@ from google.api_core import client_info as client_info_lib from google.cloud.storage.asyncio import async_grpc_client from google.cloud.storage import __version__ +from google.api_core import client_options def _make_credentials(spec=None): @@ -157,36 +158,31 @@ def test_grpc_client_property(self, mock_grpc_gapic_client): assert retrieved_client is mock_grpc_gapic_client.return_value @mock.patch("google.cloud._storage_v2.StorageAsyncClient") - def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client): + @mock.patch( + "google.cloud.storage.asyncio.async_grpc_client.grpc.aio.insecure_channel" + ) + def test_grpc_client_with_anon_creds( + self, mock_insecure_channel, mock_async_storage_client + ): # Arrange - mock_transport_cls = mock.MagicMock() - mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls - channel_sentinel = mock.sentinel.channel - - mock_transport_cls.create_channel.return_value = channel_sentinel - mock_transport_cls.return_value = mock.sentinel.transport + mock_channel = mock.MagicMock() + mock_insecure_channel.return_value = mock_channel # Act - anonymous_creds = AnonymousCredentials() - client = async_grpc_client.AsyncGrpcClient(credentials=anonymous_creds) - retrieved_client = client.grpc_client + client = async_grpc_client.AsyncGrpcClient( + client_options=client_options.ClientOptions( + api_endpoint="my-grpc-endpoint" + ), + credentials=AnonymousCredentials(), + ) # Assert - assert retrieved_client is mock_grpc_gapic_client.return_value - - kwargs = mock_grpc_gapic_client.call_args.kwargs - client_info = kwargs["client_info"] - agent_version = f"gcloud-python/{__version__}" - assert agent_version in client_info.user_agent - primary_user_agent = client_info.to_user_agent() - expected_options = (("grpc.primary_user_agent", primary_user_agent),) + assert client.grpc_client is mock_async_storage_client.return_value + mock_insecure_channel.assert_called_once_with("my-grpc-endpoint") - mock_transport_cls.create_channel.assert_called_once_with( - attempt_direct_path=True, - credentials=anonymous_creds, - options=expected_options, - ) - mock_transport_cls.assert_called_once_with(channel=channel_sentinel) + kwargs = mock_async_storage_client.call_args.kwargs + transport = kwargs["transport"] + assert isinstance(transport._credentials, AnonymousCredentials) @mock.patch("google.cloud._storage_v2.StorageAsyncClient") def test_user_agent_with_custom_client_info(self, mock_async_storage_client): @@ -221,9 +217,7 @@ async def test_delete_object(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object" @@ -264,9 +258,7 @@ async def test_get_object(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object" @@ -293,9 +285,7 @@ async def test_get_object_with_all_parameters(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object" diff --git a/tests/unit/asyncio/test_async_multi_range_downloader.py b/tests/unit/asyncio/test_async_multi_range_downloader.py index 379e6410b..f813c8a39 100644 --- a/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -27,7 +27,6 @@ from io import BytesIO from google.cloud.storage.exceptions import DataCorruption - _TEST_BUCKET_NAME = "test-bucket" _TEST_OBJECT_NAME = "test-object" _TEST_OBJECT_SIZE = 1024 * 1024 # 1 MiB @@ -93,6 +92,7 @@ async def test_create_mrd(self, mock_cls_async_read_object_stream): assert mrd.read_handle == _TEST_READ_HANDLE assert mrd.persisted_size == _TEST_OBJECT_SIZE assert mrd.is_stream_open + assert mrd._open_retries == 0 @mock.patch( "google.cloud.storage.asyncio.async_multi_range_downloader.generate_random_56_bit_integer" @@ -446,3 +446,133 @@ async def test_create_mrd_with_both_generation_and_generation_number(self): generation=_TEST_GENERATION_NUMBER, generation_number=_TEST_GENERATION_NUMBER, ) + + @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.AsyncRetry") + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_open_retries_increment( + self, mock_cls_async_read_object_stream, mock_async_retry + ): + # Arrange + # Configure AsyncRetry mock to return a pass-through decorator so we can await the result + mock_policy = mock.MagicMock() + mock_policy.side_effect = lambda f: f + mock_async_retry.return_value = mock_policy + + mrd, _ = await self._make_mock_mrd(mock_cls_async_read_object_stream) + # _make_mock_mrd calls create_mrd -> open. + # We need to test logic where retry happens. + + # Create fresh MRD + mock_client = mock.MagicMock() + mock_client.grpc_client = mock.AsyncMock() + mrd = AsyncMultiRangeDownloader( + mock_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME + ) + # Mock stream + mock_stream = mock_cls_async_read_object_stream.return_value + mock_stream.open = AsyncMock() + + # Action: We want to capture the on_error passed to AsyncRetry + await mrd.open() + + # Assert + # Check that AsyncRetry was initialized with a wrapper + call_args = mock_async_retry.call_args + assert call_args is not None + _, kwargs = call_args + on_error = kwargs.get("on_error") + assert on_error is not None + + # Simulate error to trigger increment + assert mrd._open_retries == 0 + on_error(ValueError("test")) + assert mrd._open_retries == 1 + + @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.logger") + @pytest.mark.asyncio + async def test_on_open_error_logs_warning(self, mock_logger): + # Arrange + mock_client = mock.MagicMock() + mrd = AsyncMultiRangeDownloader( + mock_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME + ) + exc = ValueError("test error") + + # Act + mrd._on_open_error(exc) + + # Assert + mock_logger.warning.assert_called_once_with( + f"Error occurred while opening MRD: {exc}" + ) + + @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.logger") + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader.generate_random_56_bit_integer" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_download_ranges_resumption_logging( + self, mock_cls_async_read_object_stream, mock_random_int, mock_logger + ): + # Arrange + mock_mrd, _ = await self._make_mock_mrd(mock_cls_async_read_object_stream) + + mock_mrd.read_obj_str.send = AsyncMock() + mock_mrd.read_obj_str.recv = AsyncMock() + + from google.api_core import exceptions as core_exceptions + + retryable_exc = core_exceptions.ServiceUnavailable("Retry me") + + # mock send to raise exception ONCE then succeed + mock_mrd.read_obj_str.send.side_effect = [ + retryable_exc, + None, # Success on second try + ] + + # mock recv for second try + mock_mrd.read_obj_str.recv.side_effect = [ + _storage_v2.BidiReadObjectResponse( + object_data_ranges=[ + _storage_v2.ObjectRangeData( + checksummed_data=_storage_v2.ChecksummedData( + content=b"data", crc32c=123 + ), + range_end=True, + read_range=_storage_v2.ReadRange( + read_offset=0, read_length=4, read_id=123 + ), + ) + ] + ), + None, + ] + + mock_random_int.return_value = 123 + + # Act + buffer = BytesIO() + # Patch Checksum where it is likely used (reads_resumption_strategy or similar), + # but actually if we use google_crc32c directly, we should patch that or provide valid CRC. + # Since we can't reliably predict where Checksum is imported/used without more digging, + # let's provide a valid CRC for b"data". + # Checksum(b"data").digest() -> needs to match crc32c=123. + # But we can't force b"data" to have crc=123. + # So we MUST patch Checksum. + # It is used in google.cloud.storage.asyncio.retry.reads_resumption_strategy + + with mock.patch( + "google.cloud.storage.asyncio.retry.reads_resumption_strategy.Checksum" + ) as mock_chk: + mock_chk.return_value.digest.return_value = (123).to_bytes(4, "big") + + await mock_mrd.download_ranges([(0, 4, buffer)]) + + # Assert + mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.")