From 24fc90653b9670ae68a414d917e3d2cd1fa0420f Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 20 Feb 2026 09:32:40 +0000 Subject: [PATCH 01/16] fix: pass token correctly, '&' instead of ',' --- google/cloud/storage/asyncio/async_appendable_object_writer.py | 2 +- google/cloud/storage/asyncio/async_read_object_stream.py | 2 +- google/cloud/storage/asyncio/async_write_object_stream.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/asyncio/async_appendable_object_writer.py b/google/cloud/storage/asyncio/async_appendable_object_writer.py index a7544c203..56b3a0ec5 100644 --- a/google/cloud/storage/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/asyncio/async_appendable_object_writer.py @@ -310,7 +310,7 @@ async def _do_open(): ) await self.write_obj_stream.open( - metadata=current_metadata if metadata else None + metadata=current_metadata if current_metadata else None ) if self.write_obj_stream.generation_number: diff --git a/google/cloud/storage/asyncio/async_read_object_stream.py b/google/cloud/storage/asyncio/async_read_object_stream.py index d456f16cc..b6cf69901 100644 --- a/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/asyncio/async_read_object_stream.py @@ -115,7 +115,7 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: other_metadata.append((key, value)) current_metadata = other_metadata - current_metadata.append(("x-goog-request-params", ",".join(request_params))) + current_metadata.append(("x-goog-request-params", "&".join(reversed(request_params)))) self.socket_like_rpc = AsyncBidiRpc( self.rpc, diff --git a/google/cloud/storage/asyncio/async_write_object_stream.py b/google/cloud/storage/asyncio/async_write_object_stream.py index de4be3820..4729cfd20 100644 --- a/google/cloud/storage/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/asyncio/async_write_object_stream.py @@ -145,7 +145,7 @@ async def 
open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: else: final_metadata.append((key, value)) - final_metadata.append(("x-goog-request-params", ",".join(request_param_values))) + final_metadata.append(("x-goog-request-params", "&".join(request_param_values))) self.socket_like_rpc = AsyncBidiRpc( self.rpc, From dece2f7eaa74db8c390baaa8994899698a6b7378 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Fri, 20 Feb 2026 15:14:17 +0530 Subject: [PATCH 02/16] remove reversed reversed not needed Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- google/cloud/storage/asyncio/async_read_object_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/asyncio/async_read_object_stream.py b/google/cloud/storage/asyncio/async_read_object_stream.py index b6cf69901..bde6c1651 100644 --- a/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/asyncio/async_read_object_stream.py @@ -115,7 +115,7 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: other_metadata.append((key, value)) current_metadata = other_metadata - current_metadata.append(("x-goog-request-params", "&".join(reversed(request_params)))) + current_metadata.append(("x-goog-request-params", "&".join(request_params))) self.socket_like_rpc = AsyncBidiRpc( self.rpc, From 47704d9f0f102f15fd8c99da79d8d41eca410eaa Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 20 Feb 2026 17:00:54 +0000 Subject: [PATCH 03/16] trigger bidi_writes and bidi_reads conformance tests --- .../storage/asyncio/async_grpc_client.py | 17 +++++++ tests/conformance/_utils.py | 30 +++++++++++ tests/conformance/test_bidi_reads.py | 46 +++++++++++------ tests/conformance/test_bidi_writes.py | 50 +++++++++++-------- 4 files changed, 105 insertions(+), 38 deletions(-) create mode 100644 tests/conformance/_utils.py diff --git 
a/google/cloud/storage/asyncio/async_grpc_client.py b/google/cloud/storage/asyncio/async_grpc_client.py index 640e7fe38..e4446729f 100644 --- a/google/cloud/storage/asyncio/async_grpc_client.py +++ b/google/cloud/storage/asyncio/async_grpc_client.py @@ -19,6 +19,8 @@ DEFAULT_CLIENT_INFO, ) from google.cloud.storage import __version__ +import grpc +from google.auth import credentials as auth_credentials class AsyncGrpcClient: @@ -51,7 +53,12 @@ def __init__( client_options=None, *, attempt_direct_path=True, + create_insecure_channel=False, # only for testing against testbench. ): + if create_insecure_channel: + self._grpc_client = self._create_insecure_grpc_client(client_options) + return + if client_info is None: client_info = DEFAULT_CLIENT_INFO client_info.client_library_version = __version__ @@ -68,6 +75,16 @@ def __init__( attempt_direct_path=attempt_direct_path, ) + def _create_anonymous_client(self, client_options): + channel = grpc.aio.insecure_channel(client_options.api_endpoint) + transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( + channel=channel, credentials=auth_credentials.AnonymousCredentials() + ) + return storage_v2.StorageAsyncClient(transport=transport) + + def _create_insecure_grpc_client(self, client_options): + return self._create_anonymous_client(client_options) + def _create_async_grpc_client( self, credentials=None, diff --git a/tests/conformance/_utils.py b/tests/conformance/_utils.py new file mode 100644 index 000000000..2c4d9a89c --- /dev/null +++ b/tests/conformance/_utils.py @@ -0,0 +1,30 @@ +import time +import requests + +def start_grpc_server(grpc_endpoint, http_endpoint): + """Starts the testbench gRPC server if it's not already running. 
+ + this essentially makes - + + `curl -s --retry 5 --retry-max-time 40 "http://localhost:9000/start_grpc?port=8888"` + """ + start_time = time.time() + max_time = 40 + retries = 5 + port = grpc_endpoint.split(":")[-1] + url = f"{http_endpoint}/start_grpc?port={port}" + + for i in range(retries): + try: + response = requests.get(url, timeout=10) + if response.status_code == 200: + return + except requests.exceptions.RequestException: + pass + + elapsed_time = time.time() - start_time + if elapsed_time >= max_time: + raise RuntimeError("Failed to start gRPC server within the time limit.") + + # backoff + time.sleep(1) diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 4157182cb..6ea01e522 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -4,14 +4,19 @@ import grpc import requests -from google.api_core import exceptions +from google.api_core import exceptions, client_options from google.auth import credentials as auth_credentials from google.cloud import _storage_v2 as storage_v2 -from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( +from google.cloud.storage.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +import pytest + +from tests.conformance._utils import start_grpc_server + # --- Configuration --- PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. GRPC_ENDPOINT = "localhost:8888" @@ -50,8 +55,12 @@ async def run_test_scenario( retry_test_id = resp.json()["id"] # 2. Set up downloader and metadata for fault injection. 
+ grpc_client = AsyncGrpcClient( + create_insecure_channel=True, + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) downloader = await AsyncMultiRangeDownloader.create_mrd( - gapic_client, bucket_name, object_name + grpc_client, bucket_name, object_name ) fault_injection_metadata = (("x-retry-test-id", retry_test_id),) @@ -82,8 +91,12 @@ async def run_test_scenario( http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") -async def main(): +@pytest.mark.asyncio +async def test_bidi_reads(): """Main function to set up resources and run all test scenarios.""" + start_grpc_server( + GRPC_ENDPOINT, HTTP_ENDPOINT + ) # Ensure the testbench gRPC server is running before this test executes. channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( @@ -121,12 +134,12 @@ async def main(): "instruction": "return-429", "expected_error": None, }, - { - "name": "Smarter Resumption: Retry 503 after partial data", - "method": "storage.objects.get", - "instruction": "return-broken-stream-after-2K", - "expected_error": None, - }, + # { + # "name": "Smarter Resumption: Retry 503 after partial data", + # "method": "storage.objects.get", + # "instruction": "return-broken-stream-after-2K", + # "expected_error": None, + # }, { "name": "Retry on BidiReadObjectRedirectedError", "method": "storage.objects.get", @@ -227,15 +240,20 @@ async def run_open_test_scenario( resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config) resp.raise_for_status() retry_test_id = resp.json()["id"] - print(f"Retry Test created with ID: {retry_test_id}") # 2. Set up metadata for fault injection. fault_injection_metadata = (("x-retry-test-id", retry_test_id),) # 3. Execute the open (via create_mrd) and assert the outcome. 
try: + grpc_client = AsyncGrpcClient( + create_insecure_channel=True, + client_options=client_options.ClientOptions( + api_endpoint=GRPC_ENDPOINT + ), + ) downloader = await AsyncMultiRangeDownloader.create_mrd( - gapic_client, + grpc_client, bucket_name, object_name, metadata=fault_injection_metadata, @@ -260,7 +278,3 @@ async def run_open_test_scenario( # 4. Clean up the Retry Test resource. if retry_test_id: http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/tests/conformance/test_bidi_writes.py b/tests/conformance/test_bidi_writes.py index 90dfaf5f8..5dea0032e 100644 --- a/tests/conformance/test_bidi_writes.py +++ b/tests/conformance/test_bidi_writes.py @@ -1,16 +1,18 @@ -import asyncio import uuid import grpc +import pytest import requests -from google.api_core import exceptions +from google.api_core import exceptions, client_options from google.auth import credentials as auth_credentials +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud import _storage_v2 as storage_v2 from google.api_core.retry_async import AsyncRetry -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( AsyncAppendableObjectWriter, ) +from tests.conformance._utils import start_grpc_server # --- Configuration --- PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. @@ -70,8 +72,12 @@ def on_retry_error(exc): retry_test_id = resp.json()["id"] # 2. Set up writer and metadata for fault injection. 
+ grpc_client = AsyncGrpcClient( + create_insecure_channel=True, + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) writer = AsyncAppendableObjectWriter( - gapic_client, + grpc_client, bucket_name, object_name, ) @@ -133,8 +139,12 @@ def on_retry_error(exc): http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") -async def main(): +@pytest.mark.asyncio +async def test_bidi_writes(): """Main function to set up resources and run all test scenarios.""" + start_grpc_server( + GRPC_ENDPOINT, HTTP_ENDPOINT + ) # Ensure the testbench gRPC server is running before this test executes. channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( @@ -173,12 +183,12 @@ async def main(): "instruction": "return-429", "expected_error": None, }, - { - "name": "Smarter Resumption: Retry 503 after partial data", - "method": "storage.objects.insert", - "instruction": "return-503-after-2K", - "expected_error": None, - }, + # { + # "name": "Smarter Resumption: Retry 503 after partial data", + # "method": "storage.objects.insert", + # "instruction": "return-503-after-2K", + # "expected_error": None, + # }, { "name": "Retry on BidiWriteObjectRedirectedError", "method": "storage.objects.insert", @@ -212,13 +222,13 @@ async def main(): "expected_error": None, "use_default_policy": True, }, - { - "name": "Default Policy: Smarter Ressumption", - "method": "storage.objects.insert", - "instruction": "return-503-after-2K", - "expected_error": None, - "use_default_policy": True, - }, + # { + # "name": "Default Policy: Smarter Ressumption", + # "method": "storage.objects.insert", + # "instruction": "return-503-after-2K", + # "expected_error": None, + # "use_default_policy": True, + # }, ] try: @@ -261,7 +271,3 @@ async def main(): await gapic_client.delete_bucket(request=delete_bucket_req) except Exception as e: print(f"Warning: Cleanup 
failed: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) From 7403b709fb72070b1ea4aeb6d494affde16a969c Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 20 Feb 2026 17:31:56 +0000 Subject: [PATCH 04/16] create anonymous client via private classmethod --- .../storage/asyncio/async_grpc_client.py | 20 ++++++++++++------- noxfile.py | 1 + tests/conformance/test_bidi_reads.py | 11 +++------- tests/conformance/test_bidi_writes.py | 3 +-- 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/google/cloud/storage/asyncio/async_grpc_client.py b/google/cloud/storage/asyncio/async_grpc_client.py index e4446729f..88566b246 100644 --- a/google/cloud/storage/asyncio/async_grpc_client.py +++ b/google/cloud/storage/asyncio/async_grpc_client.py @@ -53,10 +53,11 @@ def __init__( client_options=None, *, attempt_direct_path=True, - create_insecure_channel=False, # only for testing against testbench. ): - if create_insecure_channel: - self._grpc_client = self._create_insecure_grpc_client(client_options) + if isinstance(credentials, auth_credentials.AnonymousCredentials): + self._grpc_client = self._create_anonymous_client( + client_options, credentials + ) return if client_info is None: @@ -75,15 +76,20 @@ def __init__( attempt_direct_path=attempt_direct_path, ) - def _create_anonymous_client(self, client_options): + def _create_anonymous_client(self, client_options, credentials): channel = grpc.aio.insecure_channel(client_options.api_endpoint) transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( - channel=channel, credentials=auth_credentials.AnonymousCredentials() + channel=channel, credentials=credentials ) return storage_v2.StorageAsyncClient(transport=transport) - def _create_insecure_grpc_client(self, client_options): - return self._create_anonymous_client(client_options) + @classmethod + def _create_insecure_grpc_client(cls, client_options): + return cls( + credentials=auth_credentials.AnonymousCredentials(), + 
client_options=client_options, + attempt_direct_path=False, + ) def _create_async_grpc_client( self, diff --git a/noxfile.py b/noxfile.py index d7ca4dd88..2aabad17e 100644 --- a/noxfile.py +++ b/noxfile.py @@ -236,6 +236,7 @@ def conftest_retry(session): session.install( "pytest", "pytest-xdist", + "pytest-asyncio", "grpcio", "grpcio-status", "grpc-google-iam-v1", diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 6ea01e522..384de6e09 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -1,4 +1,3 @@ -import asyncio import io import uuid import grpc @@ -55,8 +54,7 @@ async def run_test_scenario( retry_test_id = resp.json()["id"] # 2. Set up downloader and metadata for fault injection. - grpc_client = AsyncGrpcClient( - create_insecure_channel=True, + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), ) downloader = await AsyncMultiRangeDownloader.create_mrd( @@ -246,11 +244,8 @@ async def run_open_test_scenario( # 3. Execute the open (via create_mrd) and assert the outcome. try: - grpc_client = AsyncGrpcClient( - create_insecure_channel=True, - client_options=client_options.ClientOptions( - api_endpoint=GRPC_ENDPOINT - ), + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), ) downloader = await AsyncMultiRangeDownloader.create_mrd( grpc_client, diff --git a/tests/conformance/test_bidi_writes.py b/tests/conformance/test_bidi_writes.py index 5dea0032e..81f079f3e 100644 --- a/tests/conformance/test_bidi_writes.py +++ b/tests/conformance/test_bidi_writes.py @@ -72,8 +72,7 @@ def on_retry_error(exc): retry_test_id = resp.json()["id"] # 2. Set up writer and metadata for fault injection. 
- grpc_client = AsyncGrpcClient( - create_insecure_channel=True, + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), ) writer = AsyncAppendableObjectWriter( From 4dd60c0547ea5d7ac36f1a4acc3653e955d62c71 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 20 Feb 2026 18:40:22 +0000 Subject: [PATCH 05/16] fix and update unit tests --- tests/unit/asyncio/test_async_grpc_client.py | 56 ++++++++------------ 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/tests/unit/asyncio/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py index f193acb60..06cb232d5 100644 --- a/tests/unit/asyncio/test_async_grpc_client.py +++ b/tests/unit/asyncio/test_async_grpc_client.py @@ -19,6 +19,7 @@ from google.api_core import client_info as client_info_lib from google.cloud.storage.asyncio import async_grpc_client from google.cloud.storage import __version__ +from google.api_core import client_options def _make_credentials(spec=None): @@ -157,36 +158,31 @@ def test_grpc_client_property(self, mock_grpc_gapic_client): assert retrieved_client is mock_grpc_gapic_client.return_value @mock.patch("google.cloud._storage_v2.StorageAsyncClient") - def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client): + @mock.patch( + "google.cloud.storage.asyncio.async_grpc_client.grpc.aio.insecure_channel" + ) + def test_grpc_client_with_anon_creds( + self, mock_insecure_channel, mock_async_storage_client + ): # Arrange - mock_transport_cls = mock.MagicMock() - mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls - channel_sentinel = mock.sentinel.channel - - mock_transport_cls.create_channel.return_value = channel_sentinel - mock_transport_cls.return_value = mock.sentinel.transport + mock_channel = mock.MagicMock() + mock_insecure_channel.return_value = mock_channel # Act - anonymous_creds = AnonymousCredentials() - client = 
async_grpc_client.AsyncGrpcClient(credentials=anonymous_creds) - retrieved_client = client.grpc_client + client = async_grpc_client.AsyncGrpcClient( + client_options=client_options.ClientOptions( + api_endpoint="my-grpc-endpoint" + ), + credentials=AnonymousCredentials(), + ) # Assert - assert retrieved_client is mock_grpc_gapic_client.return_value - - kwargs = mock_grpc_gapic_client.call_args.kwargs - client_info = kwargs["client_info"] - agent_version = f"gcloud-python/{__version__}" - assert agent_version in client_info.user_agent - primary_user_agent = client_info.to_user_agent() - expected_options = (("grpc.primary_user_agent", primary_user_agent),) + assert client.grpc_client is mock_async_storage_client.return_value + mock_insecure_channel.assert_called_once_with("my-grpc-endpoint") - mock_transport_cls.create_channel.assert_called_once_with( - attempt_direct_path=True, - credentials=anonymous_creds, - options=expected_options, - ) - mock_transport_cls.assert_called_once_with(channel=channel_sentinel) + kwargs = mock_async_storage_client.call_args.kwargs + transport = kwargs["transport"] + assert isinstance(transport._credentials, AnonymousCredentials) @mock.patch("google.cloud._storage_v2.StorageAsyncClient") def test_user_agent_with_custom_client_info(self, mock_async_storage_client): @@ -221,9 +217,7 @@ async def test_delete_object(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object" @@ -264,9 +258,7 @@ async def test_get_object(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - 
credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object" @@ -293,9 +285,7 @@ async def test_get_object_with_all_parameters(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object" From 8de2b33fa753d5b00c1a6ca8bfcbc4503cad1e45 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 07:40:36 +0000 Subject: [PATCH 06/16] fix: smarter resumption conformance test --- .../asyncio/async_appendable_object_writer.py | 7 +- .../asyncio/async_multi_range_downloader.py | 7 +- .../retry/bidi_stream_retry_manager.py | 2 +- tests/conformance/test_bidi_reads.py | 72 ++++++++++--------- tests/conformance/test_bidi_writes.py | 50 ++----------- 5 files changed, 50 insertions(+), 88 deletions(-) diff --git a/google/cloud/storage/asyncio/async_appendable_object_writer.py b/google/cloud/storage/asyncio/async_appendable_object_writer.py index 56b3a0ec5..0f1660c92 100644 --- a/google/cloud/storage/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/asyncio/async_appendable_object_writer.py @@ -43,7 +43,6 @@ _extract_bidi_writes_redirect_proto, ) - _MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB _DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB _BIDI_WRITE_REDIRECTED_TYPE_URL = ( @@ -236,6 +235,7 @@ async def state_lookup(self) -> int: def _on_open_error(self, exc): """Extracts routing token and write handle on redirect error during open.""" + logger.warning(f"Error occurred during AAOW open. 
Exception: {exc}") redirect_proto = _extract_bidi_writes_redirect_proto(exc) if redirect_proto: if redirect_proto.routing_token: @@ -289,8 +289,7 @@ async def _do_open(): await self.write_obj_stream.close() except Exception as e: logger.warning( - "Error closing previous write stream during open retry. Got exception: ", - {e}, + f"Error closing previous write stream during open retry. Got exception: {e}" ) self.write_obj_stream = None self._is_stream_open = False @@ -383,8 +382,6 @@ async def generator(): logger.info( f"Re-opening the stream with attempt_count: {attempt_count}" ) - if self.write_obj_stream and self.write_obj_stream.is_stream_open: - await self.write_obj_stream.close() current_metadata = list(metadata) if metadata else [] if write_state.routing_token: diff --git a/google/cloud/storage/asyncio/async_multi_range_downloader.py b/google/cloud/storage/asyncio/async_multi_range_downloader.py index 3ee773a04..5ba266e02 100644 --- a/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -41,7 +41,6 @@ from google.cloud import _storage_v2 from google.cloud.storage._helpers import generate_random_56_bit_integer - _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 _BIDI_READ_REDIRECTED_TYPE_URL = ( "type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError" @@ -425,7 +424,7 @@ async def generator(): if attempt_count > 1: logger.info( - f"Resuming download (attempt {attempt_count - 1}) for {len(requests)} ranges." + f"Resuming download (attempt {attempt_count}) for {len(requests)} ranges." 
) async with lock: @@ -446,11 +445,7 @@ async def generator(): logger.info( f"Re-opening stream with routing token: {current_token}" ) - # Close existing stream if any - if self.read_obj_str and self.read_obj_str.is_stream_open: - await self.read_obj_str.close() - # Re-initialize stream self.read_obj_str = _AsyncReadObjectStream( client=self.client.grpc_client, bucket_name=self.bucket_name, diff --git a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py index 23bffb63d..947ee74c1 100644 --- a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py +++ b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py @@ -58,7 +58,7 @@ async def attempt(): return except Exception as e: if retry_policy._predicate(e): - logger.info( + logger.warning( f"Bidi stream operation failed: {e}. Attempting state recovery and retry." ) await self._strategy.recover_state_on_failure(e, state) diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 384de6e09..a9133c534 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -15,6 +15,26 @@ import pytest from tests.conformance._utils import start_grpc_server +import logging +import sys + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(lineno)d - %(message)s", + force=True, +) +storage_logger = logging.getLogger("google.cloud.storage") +storage_logger.setLevel(logging.DEBUG) +storage_handler = logging.StreamHandler(sys.stderr) +# storage_handler.setLevel(logging.DEBUG) +storage_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(lineno)d - %(message)s" + ) +) +storage_logger.addHandler(storage_handler) +# logging.getLogger("google.cloud.storage").setLevel(logging.INFO) # --- Configuration --- PROJECT_NUMBER = "12345" # A dummy project number is fine for the 
testbench. @@ -109,39 +129,9 @@ async def test_bidi_reads(): # Define all test scenarios test_scenarios = [ { - "name": "Retry on Service Unavailable (503)", - "method": "storage.objects.get", - "instruction": "return-503", - "expected_error": None, - }, - { - "name": "Retry on 500", - "method": "storage.objects.get", - "instruction": "return-500", - "expected_error": None, - }, - { - "name": "Retry on 504", - "method": "storage.objects.get", - "instruction": "return-504", - "expected_error": None, - }, - { - "name": "Retry on 429", + "name": "Smarter Resumption: Retry 503 after partial data", "method": "storage.objects.get", - "instruction": "return-429", - "expected_error": None, - }, - # { - # "name": "Smarter Resumption: Retry 503 after partial data", - # "method": "storage.objects.get", - # "instruction": "return-broken-stream-after-2K", - # "expected_error": None, - # }, - { - "name": "Retry on BidiReadObjectRedirectedError", - "method": "storage.objects.get", - "instruction": "redirect-send-handle-and-token-tokenval", # Testbench instruction for redirect + "instruction": "return-broken-stream-after-2K", "expected_error": None, }, ] @@ -196,6 +186,24 @@ async def write_req_gen(): "instruction": "return-401", "expected_error": exceptions.Unauthorized, }, + { + "name": "Retry on 500", + "method": "storage.objects.get", + "instruction": "return-500", + "expected_error": None, + }, + { + "name": "Retry on 504", + "method": "storage.objects.get", + "instruction": "return-504", + "expected_error": None, + }, + { + "name": "Retry on 429", + "method": "storage.objects.get", + "instruction": "return-429", + "expected_error": None, + }, ] for scenario in open_test_scenarios: await run_open_test_scenario( diff --git a/tests/conformance/test_bidi_writes.py b/tests/conformance/test_bidi_writes.py index 81f079f3e..b7a64bce0 100644 --- a/tests/conformance/test_bidi_writes.py +++ b/tests/conformance/test_bidi_writes.py @@ -18,7 +18,7 @@ PROJECT_NUMBER = "12345" # A dummy 
project number is fine for the testbench. GRPC_ENDPOINT = "localhost:8888" HTTP_ENDPOINT = "http://localhost:9000" -CONTENT = b"A" * 1024 * 10 # 10 KB +CONTENT = b"A" * 1024 * 1024 * 10 # 10 KB def _is_retryable(exc): @@ -79,6 +79,7 @@ def on_retry_error(exc): grpc_client, bucket_name, object_name, + writer_options={"FLUSH_INTERVAL_BYTES": 2 * 1024 * 1024}, ) fault_injection_metadata = (("x-retry-test-id", retry_test_id),) @@ -126,12 +127,6 @@ def on_retry_error(exc): ): raise - if not use_default: - assert ( - retry_count == 0 - ), f"Retry was incorrectly triggered for non-retriable error in {scenario['name']}!" - print(f"Success: caught expected exception for {scenario['name']}: {e}") - finally: # 5. Clean up the Retry Test resource. if retry_test_id: @@ -182,52 +177,19 @@ async def test_bidi_writes(): "instruction": "return-429", "expected_error": None, }, - # { - # "name": "Smarter Resumption: Retry 503 after partial data", - # "method": "storage.objects.insert", - # "instruction": "return-503-after-2K", - # "expected_error": None, - # }, + # TODO: b/490280918 { - "name": "Retry on BidiWriteObjectRedirectedError", + "name": "Smarter Resumption: Retry 503 after partial data", "method": "storage.objects.insert", - "instruction": "redirect-send-handle-and-token-tokenval", + "instruction": "return-503-after-3072K", # 3072 KiB == 3 MiB "expected_error": None, }, { - "name": "Fail on 401", - "method": "storage.objects.insert", - "instruction": "return-401", - "expected_error": exceptions.Unauthorized, - }, - { - "name": "Default Policy: Retry on 503", - "method": "storage.objects.insert", - "instruction": "return-503", - "expected_error": None, - "use_default_policy": True, - }, - { - "name": "Default Policy: Retry on 503", - "method": "storage.objects.insert", - "instruction": "return-500", - "expected_error": None, - "use_default_policy": True, - }, - { - "name": "Default Policy: Retry on BidiWriteObjectRedirectedError", + "name": "Retry on 
BidiWriteObjectRedirectedError", "method": "storage.objects.insert", "instruction": "redirect-send-handle-and-token-tokenval", "expected_error": None, - "use_default_policy": True, }, - # { - # "name": "Default Policy: Smarter Ressumption", - # "method": "storage.objects.insert", - # "instruction": "return-503-after-2K", - # "expected_error": None, - # "use_default_policy": True, - # }, ] try: From c73e95ab4556689a330c18602777e3ee9bfedc57 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 07:57:34 +0000 Subject: [PATCH 07/16] scope changes --- .../asyncio/async_appendable_object_writer.py | 1 - .../retry/bidi_stream_retry_manager.py | 2 +- tests/conformance/test_bidi_reads.py | 20 ------------------- 3 files changed, 1 insertion(+), 22 deletions(-) diff --git a/google/cloud/storage/asyncio/async_appendable_object_writer.py b/google/cloud/storage/asyncio/async_appendable_object_writer.py index 0f1660c92..c65209680 100644 --- a/google/cloud/storage/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/asyncio/async_appendable_object_writer.py @@ -235,7 +235,6 @@ async def state_lookup(self) -> int: def _on_open_error(self, exc): """Extracts routing token and write handle on redirect error during open.""" - logger.warning(f"Error occurred during AAOW open. Exception: {exc}") redirect_proto = _extract_bidi_writes_redirect_proto(exc) if redirect_proto: if redirect_proto.routing_token: diff --git a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py index 947ee74c1..23bffb63d 100644 --- a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py +++ b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py @@ -58,7 +58,7 @@ async def attempt(): return except Exception as e: if retry_policy._predicate(e): - logger.warning( + logger.info( f"Bidi stream operation failed: {e}. Attempting state recovery and retry." 
) await self._strategy.recover_state_on_failure(e, state) diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index a9133c534..93234cc62 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -15,26 +15,6 @@ import pytest from tests.conformance._utils import start_grpc_server -import logging -import sys - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(lineno)d - %(message)s", - force=True, -) -storage_logger = logging.getLogger("google.cloud.storage") -storage_logger.setLevel(logging.DEBUG) -storage_handler = logging.StreamHandler(sys.stderr) -# storage_handler.setLevel(logging.DEBUG) -storage_handler.setFormatter( - logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(lineno)d - %(message)s" - ) -) -storage_logger.addHandler(storage_handler) -# logging.getLogger("google.cloud.storage").setLevel(logging.INFO) # --- Configuration --- PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. 
From af4997ff9b13b9650cb0537cfdfcf068b329cf04 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 08:36:32 +0000 Subject: [PATCH 08/16] raise errors appropriately --- .../asyncio/async_multi_range_downloader.py | 2 +- .../retry/bidi_stream_retry_manager.py | 2 +- tests/conformance/test_bidi_reads.py | 24 +++++++------------ 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/google/cloud/storage/asyncio/async_multi_range_downloader.py b/google/cloud/storage/asyncio/async_multi_range_downloader.py index 902765607..51afd255b 100644 --- a/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -229,7 +229,6 @@ def __init__( self.persisted_size: Optional[int] = None # updated after opening the stream self._open_retries: int = 0 - async def __aenter__(self): """Opens the underlying bidi-gRPC connection to read from the object.""" await self.open() @@ -242,6 +241,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def _on_open_error(self, exc): """Extracts routing token and read handle on redirect error during open.""" + logger.warning(f"Error occurred while opening MRD: {exc}") routing_token, read_handle = _handle_redirect(exc) if routing_token: self._routing_token = routing_token diff --git a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py index 23bffb63d..947ee74c1 100644 --- a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py +++ b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py @@ -58,7 +58,7 @@ async def attempt(): return except Exception as e: if retry_policy._predicate(e): - logger.info( + logger.warning( f"Bidi stream operation failed: {e}. Attempting state recovery and retry." 
) await self._strategy.recover_state_on_failure(e, state) diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 93234cc62..190229ffd 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -1,4 +1,5 @@ import io +import traceback import uuid import grpc import requests @@ -36,9 +37,7 @@ def _is_retriable(exc): ) -async def run_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario -): +async def run_test_scenario(http_client, bucket_name, object_name, scenario): """Runs a single fault-injection test scenario.""" print(f"\n--- RUNNING SCENARIO: {scenario['name']} ---") @@ -142,9 +141,7 @@ async def write_req_gen(): # Run all defined test scenarios. for scenario in test_scenarios: - await run_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario - ) + await run_test_scenario(http_client, bucket_name, object_name, scenario) # Define and run test scenarios specifically for the open() method open_test_scenarios = [ @@ -187,13 +184,12 @@ async def write_req_gen(): ] for scenario in open_test_scenarios: await run_open_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario + http_client, bucket_name, object_name, scenario ) - except Exception: - import traceback - - traceback.print_exc() + except Exception as e: + print(f"Test failed with error: {e}. Traceback: {traceback.format_exc()}") + raise e finally: # Clean up the test bucket. 
try: @@ -210,11 +206,9 @@ async def write_req_gen(): print(f"Warning: Cleanup failed: {e}") -async def run_open_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario -): +async def run_open_test_scenario(http_client, bucket_name, object_name, scenario): """Runs a fault-injection test scenario specifically for the open() method.""" - print(f"\n--- RUNNING SCENARIO: {scenario['name']} ---") + print(f"\n--- RUNNING OPEN SCENARIO: {scenario['name']} ---") retry_test_id = None try: From fdea4fbbb130aed0c83e1fb139223422a286dcfe Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 09:05:06 +0000 Subject: [PATCH 09/16] add unit test for mrd --- .../test_async_multi_range_downloader.py | 133 +++++++++++++++++- 1 file changed, 132 insertions(+), 1 deletion(-) diff --git a/tests/unit/asyncio/test_async_multi_range_downloader.py b/tests/unit/asyncio/test_async_multi_range_downloader.py index 379e6410b..11afc055c 100644 --- a/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -22,12 +22,12 @@ from google.cloud.storage.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, + _MAX_READ_RANGES_PER_BIDI_READ_REQUEST, ) from google.cloud.storage.asyncio import async_read_object_stream from io import BytesIO from google.cloud.storage.exceptions import DataCorruption - _TEST_BUCKET_NAME = "test-bucket" _TEST_OBJECT_NAME = "test-object" _TEST_OBJECT_SIZE = 1024 * 1024 # 1 MiB @@ -93,6 +93,7 @@ async def test_create_mrd(self, mock_cls_async_read_object_stream): assert mrd.read_handle == _TEST_READ_HANDLE assert mrd.persisted_size == _TEST_OBJECT_SIZE assert mrd.is_stream_open + assert mrd._open_retries == 0 @mock.patch( "google.cloud.storage.asyncio.async_multi_range_downloader.generate_random_56_bit_integer" @@ -446,3 +447,133 @@ async def test_create_mrd_with_both_generation_and_generation_number(self): generation=_TEST_GENERATION_NUMBER, 
generation_number=_TEST_GENERATION_NUMBER, ) + + @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.AsyncRetry") + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_open_retries_increment( + self, mock_cls_async_read_object_stream, mock_async_retry + ): + # Arrange + # Configure AsyncRetry mock to return a pass-through decorator so we can await the result + mock_policy = mock.MagicMock() + mock_policy.side_effect = lambda f: f + mock_async_retry.return_value = mock_policy + + mrd, _ = await self._make_mock_mrd(mock_cls_async_read_object_stream) + # _make_mock_mrd calls create_mrd -> open. + # We need to test logic where retry happens. + + # Create fresh MRD + mock_client = mock.MagicMock() + mock_client.grpc_client = mock.AsyncMock() + mrd = AsyncMultiRangeDownloader( + mock_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME + ) + # Mock stream + mock_stream = mock_cls_async_read_object_stream.return_value + mock_stream.open = AsyncMock() + + # Action: We want to capture the on_error passed to AsyncRetry + await mrd.open() + + # Assert + # Check that AsyncRetry was initialized with a wrapper + call_args = mock_async_retry.call_args + assert call_args is not None + _, kwargs = call_args + on_error = kwargs.get("on_error") + assert on_error is not None + + # Simulate error to trigger increment + assert mrd._open_retries == 0 + on_error(ValueError("test")) + assert mrd._open_retries == 1 + + @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.logger") + @pytest.mark.asyncio + async def test_on_open_error_logs_warning(self, mock_logger): + # Arrange + mock_client = mock.MagicMock() + mrd = AsyncMultiRangeDownloader( + mock_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME + ) + exc = ValueError("test error") + + # Act + mrd._on_open_error(exc) + + # Assert + mock_logger.warning.assert_called_once_with( + f"Error occurred while opening MRD: {exc}" + ) 
+ + @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.logger") + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader.generate_random_56_bit_integer" + ) + @mock.patch( + "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream" + ) + @pytest.mark.asyncio + async def test_download_ranges_resumption_logging( + self, mock_cls_async_read_object_stream, mock_random_int, mock_logger + ): + # Arrange + mock_mrd, _ = await self._make_mock_mrd(mock_cls_async_read_object_stream) + + mock_mrd.read_obj_str.send = AsyncMock() + mock_mrd.read_obj_str.recv = AsyncMock() + + from google.api_core import exceptions as core_exceptions + + retryable_exc = core_exceptions.ServiceUnavailable("Retry me") + + # mock send to raise exception ONCE then succeed + mock_mrd.read_obj_str.send.side_effect = [ + retryable_exc, + None, # Success on second try + ] + + # mock recv for second try + mock_mrd.read_obj_str.recv.side_effect = [ + _storage_v2.BidiReadObjectResponse( + object_data_ranges=[ + _storage_v2.ObjectRangeData( + checksummed_data=_storage_v2.ChecksummedData( + content=b"data", crc32c=123 + ), + range_end=True, + read_range=_storage_v2.ReadRange( + read_offset=0, read_length=4, read_id=123 + ), + ) + ] + ), + None, + ] + + mock_random_int.return_value = 123 + + # Act + buffer = BytesIO() + # Patch Checksum where it is likely used (reads_resumption_strategy or similar), + # but actually if we use google_crc32c directly, we should patch that or provide valid CRC. + # Since we can't reliably predict where Checksum is imported/used without more digging, + # let's provide a valid CRC for b"data". + # Checksum(b"data").digest() -> needs to match crc32c=123. + # But we can't force b"data" to have crc=123. + # So we MUST patch Checksum. 
+ # It is used in google.cloud.storage.asyncio.retry.reads_resumption_strategy + + with mock.patch( + "google.cloud.storage.asyncio.retry.reads_resumption_strategy.Checksum" + ) as mock_chk: + mock_chk.return_value.digest.return_value = (123).to_bytes(4, "big") + + await mock_mrd.download_ranges([(0, 4, buffer)]) + + # Assert + mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.") From 24970fa0e552577b5965ed9bd23278a127eb50ca Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 09:13:06 +0000 Subject: [PATCH 10/16] add unit tests for AAOW --- .../test_async_appendable_object_writer.py | 65 ++++++++++++++++--- 1 file changed, 57 insertions(+), 8 deletions(-) diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 0c8fe4375..51ce43e6e 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -218,6 +218,48 @@ def test_on_open_error_redirection(self, mock_appendable_writer): assert writer.write_handle.handle == b"h1" assert writer.generation == 777 + @pytest.mark.asyncio + async def test_open_closes_existing_stream(self, mock_appendable_writer): + """Verify proper cleanup of existing stream on re-open.""" + writer = self._make_one(mock_appendable_writer["mock_client"]) + # We simulate a state where write_obj_stream exists but we are opening (e.g. retry or stale) + writer._is_stream_open = False + # Set an existing stream + old_stream = mock.AsyncMock() + old_stream.is_stream_open = True + writer.write_obj_stream = old_stream + + # Mock the creation of NEW stream to avoid overwriting our old_stream reference too early if we needed it, + # but here we just want to verify old_stream.close() is called. 
+ + # Act + await writer.open() + + # Assert + old_stream.close.assert_awaited_once() + assert writer.write_obj_stream != old_stream + assert writer._is_stream_open + + @pytest.mark.asyncio + async def test_open_logs_warning_on_close_error(self, mock_appendable_writer): + """Verify logging when closing existing stream fails.""" + writer = self._make_one(mock_appendable_writer["mock_client"]) + old_stream = mock.AsyncMock() + old_stream.is_stream_open = True + old_stream.close.side_effect = ValueError("close failed") + writer.write_obj_stream = old_stream + writer._is_stream_open = False + + with mock.patch( + "google.cloud.storage.asyncio.async_appendable_object_writer.logger" + ) as mock_logger: + await writer.open() + + mock_logger.warning.assert_called_once() + args, _ = mock_logger.warning.call_args + assert "Error closing previous write stream" in args[0] + assert "close failed" in args[0] + # ------------------------------------------------------------------------- # Append Tests # ------------------------------------------------------------------------- @@ -246,9 +288,7 @@ async def test_append_data_less_than_flush_interval(self, mock_appendable_writer ], ) @pytest.mark.asyncio - async def test_append( - self, data_len, mock_appendable_writer - ): + async def test_append(self, data_len, mock_appendable_writer): """Verify append orchestrates manager and drives the internal generator.""" # Arrange writer = self._make_one(mock_appendable_writer["mock_client"]) @@ -272,10 +312,19 @@ async def test_append( # Assert expected_recv_count = data_len // _DEFAULT_FLUSH_INTERVAL_BYTES assert writer.offset == data_len - assert writer.bytes_appended_since_last_flush == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES - assert writer.persisted_size == expected_recv_count*_DEFAULT_FLUSH_INTERVAL_BYTES - assert writer.write_obj_stream.send.await_count == -(-data_len // _MAX_CHUNK_SIZE_BYTES) # Ceiling division for number of chunks - assert writer.write_obj_stream.recv.await_count == 
expected_recv_count # Expect 1 recv per flush interval + assert ( + writer.bytes_appended_since_last_flush + == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert ( + writer.persisted_size == expected_recv_count * _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert writer.write_obj_stream.send.await_count == -( + -data_len // _MAX_CHUNK_SIZE_BYTES + ) # Ceiling division for number of chunks + assert ( + writer.write_obj_stream.recv.await_count == expected_recv_count + ) # Expect 1 recv per flush interval @pytest.mark.asyncio async def test_append_recovery_reopens_stream(self, mock_appendable_writer): @@ -318,7 +367,7 @@ async def mock_execute(state, policy): MockManager.return_value.execute.side_effect = mock_execute await writer.append(b"0123456789") - mock_appendable_writer["mock_stream"].close.assert_awaited() + # mock_appendable_writer["mock_stream"].close.assert_awaited() # Removed because open() is mocked mock_writer_open.assert_awaited() assert writer.persisted_size == 5 From 9cc6854032454ccfc5a9836b89a473241070622f Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 12:13:09 +0000 Subject: [PATCH 12/16] add verbose logs for debugging --- noxfile.py | 8
+++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/noxfile.py b/noxfile.py index 2aabad17e..a3cfb81ba 100644 --- a/noxfile.py +++ b/noxfile.py @@ -248,13 +248,15 @@ def conftest_retry(session): # Run #CPU processes in parallel if no test session arguments are passed in. if session.posargs: test_cmd = [ - "py.test", - "--quiet", + "pytest", + "-vv", + "-s", + # "--quiet", conformance_test_folder_path, *session.posargs, ] else: - test_cmd = ["py.test", "-n", "auto", "--quiet", conformance_test_folder_path] + test_cmd = ["py.test", "-vv", "-s", "-n", "auto", conformance_test_folder_path] # Run py.test against the conformance tests. session.run(*test_cmd, env={"DOCKER_API_VERSION": "1.39"}) From 08462daec2b4f5d2caa2aad654b5971e98d66d65 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 13:38:37 +0000 Subject: [PATCH 13/16] debug logs --- tests/conformance/_utils.py | 4 +++- tests/conformance/test_bidi_reads.py | 23 +++++++++++++++++------ tests/conformance/test_conformance.py | 6 ++++++ 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/tests/conformance/_utils.py b/tests/conformance/_utils.py index 2c4d9a89c..496b1c9e0 100644 --- a/tests/conformance/_utils.py +++ b/tests/conformance/_utils.py @@ -1,5 +1,6 @@ import time import requests +import traceback def start_grpc_server(grpc_endpoint, http_endpoint): """Starts the testbench gRPC server if it's not already running. 
@@ -20,7 +21,8 @@ def start_grpc_server(grpc_endpoint, http_endpoint): if response.status_code == 200: return except requests.exceptions.RequestException: - pass + print("Failed to create grpc server", traceback.format_exc()) + raise elapsed_time = time.time() - start_time if elapsed_time >= max_time: diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 190229ffd..6d1f38de4 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -1,5 +1,7 @@ import io +import os import traceback +import urllib import uuid import grpc import requests @@ -18,9 +20,13 @@ from tests.conformance._utils import start_grpc_server # --- Configuration --- + +TEST_BENCH_ENDPOINT = os.environ.get("STORAGE_EMULATOR_HOST", "http://localhost:9000") +_PORT = urllib.parse.urlsplit(TEST_BENCH_ENDPOINT).port + PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. GRPC_ENDPOINT = "localhost:8888" -HTTP_ENDPOINT = "http://localhost:9000" +TEST_BENCH_ENDPOINT = "http://localhost:9000" CONTENT_LENGTH = 1024 * 10 # 10 KB @@ -48,7 +54,9 @@ async def run_test_scenario(http_client, bucket_name, object_name, scenario): "instructions": {scenario["method"]: [scenario["instruction"]]}, "transport": "GRPC", } - resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config) + resp = http_client.post( + f"{TEST_BENCH_ENDPOINT}/retry_test", json=retry_test_config + ) resp.raise_for_status() retry_test_id = resp.json()["id"] @@ -85,14 +93,15 @@ async def run_test_scenario(http_client, bucket_name, object_name, scenario): finally: # 4. Clean up the Retry Test resource. 
if retry_test_id: - http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") + http_client.delete(f"{TEST_BENCH_ENDPOINT}/retry_test/{retry_test_id}") @pytest.mark.asyncio async def test_bidi_reads(): """Main function to set up resources and run all test scenarios.""" + print("starting grpc server", GRPC_ENDPOINT, TEST_BENCH_ENDPOINT) start_grpc_server( - GRPC_ENDPOINT, HTTP_ENDPOINT + GRPC_ENDPOINT, TEST_BENCH_ENDPOINT ) # Ensure the testbench gRPC server is running before this test executes. channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) creds = auth_credentials.AnonymousCredentials() @@ -217,7 +226,9 @@ async def run_open_test_scenario(http_client, bucket_name, object_name, scenario "instructions": {scenario["method"]: [scenario["instruction"]]}, "transport": "GRPC", } - resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config) + resp = http_client.post( + f"{TEST_BENCH_ENDPOINT}/retry_test", json=retry_test_config + ) resp.raise_for_status() retry_test_id = resp.json()["id"] @@ -254,4 +265,4 @@ async def run_open_test_scenario(http_client, bucket_name, object_name, scenario finally: # 4. Clean up the Retry Test resource. 
if retry_test_id: - http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") + http_client.delete(f"{TEST_BENCH_ENDPOINT}/retry_test/{retry_test_id}") diff --git a/tests/conformance/test_conformance.py b/tests/conformance/test_conformance.py index 819218d24..a9017aa09 100644 --- a/tests/conformance/test_conformance.py +++ b/tests/conformance/test_conformance.py @@ -923,6 +923,12 @@ def run_test_case( hmac_key, file_data, ): + print( + "Debug Log: env vars:", + os.environ.get("STORAGE_EMULATOR_HOST", None), + _HOST, + _PORT, + ) scenario = _CONFORMANCE_TESTS[scenario_id - 1] expect_success = scenario["expectSuccess"] precondition_provided = scenario["preconditionProvided"] From f688167334808622daf52225900d2c279b2082d0 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 14:19:35 +0000 Subject: [PATCH 14/16] increase timeout --- noxfile.py | 2 +- tests/conformance/test_conformance.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/noxfile.py b/noxfile.py index a3cfb81ba..6bce85327 100644 --- a/noxfile.py +++ b/noxfile.py @@ -256,7 +256,7 @@ def conftest_retry(session): *session.posargs, ] else: - test_cmd = ["py.test", "-vv", "-s", "-n", "auto", conformance_test_folder_path] + test_cmd = ["pytest", "-vv", "-s", "-n", "auto", conformance_test_folder_path] # Run py.test against the conformance tests. 
session.run(*test_cmd, env={"DOCKER_API_VERSION": "1.39"}) diff --git a/tests/conformance/test_conformance.py b/tests/conformance/test_conformance.py index a9017aa09..734569a9a 100644 --- a/tests/conformance/test_conformance.py +++ b/tests/conformance/test_conformance.py @@ -1010,5 +1010,5 @@ def run_test_case( globals()[test_name] = functools.partial( run_test_case, id, m, c, lib_func, _HOST ) - time.sleep(5) + time.sleep(500) proc.kill() From 36ecfcdfbef0f3a10f2d7b9353dd79c539a29447 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 15:41:59 +0000 Subject: [PATCH 15/16] run bidi reads & writes from test conf test --- tests/conformance/test_conformance.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/conformance/test_conformance.py b/tests/conformance/test_conformance.py index 734569a9a..bd8c6cb07 100644 --- a/tests/conformance/test_conformance.py +++ b/tests/conformance/test_conformance.py @@ -31,6 +31,8 @@ from google.cloud.storage.hmac_key import HMACKeyMetadata from . import _read_local_json +from . import test_bidi_reads +from . 
import test_bidi_writes _CONFORMANCE_TESTS = _read_local_json("retry_strategy_test_data.json")["retryTests"] @@ -1010,5 +1012,9 @@ def run_test_case( globals()[test_name] = functools.partial( run_test_case, id, m, c, lib_func, _HOST ) + + globals()["test_bidi_reads"] = test_bidi_reads.test_bidi_reads + globals()["test_bidi_writes"] = test_bidi_writes.test_bidi_writes + time.sleep(500) proc.kill() From d845768196959eb940589c7e32106d40822e91de Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 6 Mar 2026 17:05:37 +0000 Subject: [PATCH 16/16] run separate dockers for bidi apis --- tests/conformance/test_bidi_reads.py | 52 +++++++++++++++++++++---- tests/conformance/test_bidi_writes.py | 55 ++++++++++++++++++++++++--- tests/conformance/test_conformance.py | 14 +------ 3 files changed, 96 insertions(+), 25 deletions(-) diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 6d1f38de4..935d9bffa 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -1,5 +1,6 @@ import io -import os +import subprocess +import time import traceback import urllib import uuid @@ -21,14 +22,50 @@ # --- Configuration --- -TEST_BENCH_ENDPOINT = os.environ.get("STORAGE_EMULATOR_HOST", "http://localhost:9000") + +TEST_BENCH_ENDPOINT = ( + "http://localhost:9001" # 9000 in VM is taken by test_conformance.py +) _PORT = urllib.parse.urlsplit(TEST_BENCH_ENDPOINT).port +_GRPC_PORT = 8888 PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. 
-GRPC_ENDPOINT = "localhost:8888" -TEST_BENCH_ENDPOINT = "http://localhost:9000" +GRPC_ENDPOINT = f"localhost:{_GRPC_PORT}" CONTENT_LENGTH = 1024 * 10 # 10 KB +_DEFAULT_IMAGE_NAME = "gcr.io/cloud-devrel-public-resources/storage-testbench" +_DEFAULT_IMAGE_TAG = "latest" +_DOCKER_IMAGE = f"{_DEFAULT_IMAGE_NAME}:{_DEFAULT_IMAGE_TAG}" +_PULL_CMD = ["docker", "pull", _DOCKER_IMAGE] +_RUN_CMD = [ + "docker", + "run", + "--name", + "bidi_reads_container", + "--rm", + "-d", + "-p", + f"{_PORT}:9000", + "-p", + f"{_GRPC_PORT}:{_GRPC_PORT}", + _DOCKER_IMAGE, +] +_DOCKER_STOP_CMD = [ + "docker", + "stop", + "bidi_reads_container", +] + + +@pytest.fixture(scope="module") +def testbench(): + subprocess.run(_PULL_CMD) + proc = subprocess.Popen(_RUN_CMD) + time.sleep(10) + yield GRPC_ENDPOINT, TEST_BENCH_ENDPOINT + subprocess.run(_DOCKER_STOP_CMD) + proc.kill() + def _is_retriable(exc): """Predicate for identifying retriable errors.""" @@ -97,11 +134,12 @@ async def run_test_scenario(http_client, bucket_name, object_name, scenario): @pytest.mark.asyncio -async def test_bidi_reads(): +async def test_bidi_reads(testbench): """Main function to set up resources and run all test scenarios.""" - print("starting grpc server", GRPC_ENDPOINT, TEST_BENCH_ENDPOINT) + grpc_endpoint, test_bench_endpoint = testbench + print("starting grpc server", grpc_endpoint, test_bench_endpoint) start_grpc_server( - GRPC_ENDPOINT, TEST_BENCH_ENDPOINT + grpc_endpoint, test_bench_endpoint ) # Ensure the testbench gRPC server is running before this test executes. 
channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) creds = auth_credentials.AnonymousCredentials() diff --git a/tests/conformance/test_bidi_writes.py b/tests/conformance/test_bidi_writes.py index b7a64bce0..aaf5e741b 100644 --- a/tests/conformance/test_bidi_writes.py +++ b/tests/conformance/test_bidi_writes.py @@ -1,3 +1,6 @@ +import subprocess +import time +import urllib import uuid import grpc import pytest @@ -15,9 +18,50 @@ from tests.conformance._utils import start_grpc_server # --- Configuration --- +TEST_BENCH_ENDPOINT = ( + "http://localhost:9002" # 9000 in VM is taken by test_conformance.py, 9001 by reads +) +_PORT = urllib.parse.urlsplit(TEST_BENCH_ENDPOINT).port +_GRPC_PORT = 8888 + PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. -GRPC_ENDPOINT = "localhost:8888" -HTTP_ENDPOINT = "http://localhost:9000" +GRPC_ENDPOINT = f"localhost:{_GRPC_PORT}" +HTTP_ENDPOINT = TEST_BENCH_ENDPOINT + +_DEFAULT_IMAGE_NAME = "gcr.io/cloud-devrel-public-resources/storage-testbench" +_DEFAULT_IMAGE_TAG = "latest" +_DOCKER_IMAGE = f"{_DEFAULT_IMAGE_NAME}:{_DEFAULT_IMAGE_TAG}" +_PULL_CMD = ["docker", "pull", _DOCKER_IMAGE] +_RUN_CMD = [ + "docker", + "run", + "--name", + "bidi_writes_container", + "--rm", + "-d", + "-p", + f"{_PORT}:9000", + "-p", + f"{_GRPC_PORT}:8888", + _DOCKER_IMAGE, +] +_DOCKER_STOP_CMD = [ + "docker", + "stop", + "bidi_writes_container", +] + + +@pytest.fixture(scope="module") +def testbench(): + subprocess.run(_PULL_CMD) + proc = subprocess.Popen(_RUN_CMD) + time.sleep(10) + yield GRPC_ENDPOINT, HTTP_ENDPOINT + subprocess.run(_DOCKER_STOP_CMD) + proc.kill() + + CONTENT = b"A" * 1024 * 1024 * 10 # 10 KB @@ -134,12 +178,13 @@ def on_retry_error(exc): @pytest.mark.asyncio -async def test_bidi_writes(): +async def test_bidi_writes(testbench): """Main function to set up resources and run all test scenarios.""" + grpc_endpoint, http_endpoint = testbench start_grpc_server( - GRPC_ENDPOINT, HTTP_ENDPOINT + grpc_endpoint, 
http_endpoint ) # Ensure the testbench gRPC server is running before this test executes. - channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) + channel = grpc.aio.insecure_channel(grpc_endpoint) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( channel=channel, diff --git a/tests/conformance/test_conformance.py b/tests/conformance/test_conformance.py index bd8c6cb07..819218d24 100644 --- a/tests/conformance/test_conformance.py +++ b/tests/conformance/test_conformance.py @@ -31,8 +31,6 @@ from google.cloud.storage.hmac_key import HMACKeyMetadata from . import _read_local_json -from . import test_bidi_reads -from . import test_bidi_writes _CONFORMANCE_TESTS = _read_local_json("retry_strategy_test_data.json")["retryTests"] @@ -925,12 +923,6 @@ def run_test_case( hmac_key, file_data, ): - print( - "Debug Log: env vars:", - os.environ.get("STORAGE_EMULATOR_HOST", None), - _HOST, - _PORT, - ) scenario = _CONFORMANCE_TESTS[scenario_id - 1] expect_success = scenario["expectSuccess"] precondition_provided = scenario["preconditionProvided"] @@ -1012,9 +1004,5 @@ def run_test_case( globals()[test_name] = functools.partial( run_test_case, id, m, c, lib_func, _HOST ) - - globals()["test_bidi_reads"] = test_bidi_reads.test_bidi_reads - globals()["test_bidi_writes"] = test_bidi_writes.test_bidi_writes - - time.sleep(500) + time.sleep(5) proc.kill()