From a7be0673c0c06a6b2b5132012c86a706bfb9c313 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Mon, 2 Mar 2026 13:41:40 -0800 Subject: [PATCH 1/3] feat: add options support to Store for format-specific Spark DataFrameWriter options --- README.md | 21 ++++++ tests/test_store_options.py | 147 ++++++++++++++++++++++++++++++++++++ wherobots/db/connection.py | 5 +- wherobots/db/models.py | 18 ++++- 4 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 tests/test_store_options.py diff --git a/README.md b/README.md index 4fe0387..e8d9bd2 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,27 @@ The `Store` class supports the following options: Use `Store.for_download()` as a convenient shorthand for storing results as a single Parquet file with a presigned URL. +#### Store options + +You can pass format-specific Spark write options through the `options` +parameter. These correspond to the options available in Spark's +`DataFrameWriter` and are applied after the server's default options, +allowing you to override them. + +```python +# CSV without headers and a custom delimiter +store = Store.for_download( + format=StorageFormat.CSV, + options={"header": "false", "delimiter": "|"}, +) + +# GeoJSON preserving null fields +store = Store.for_download( + format=StorageFormat.GEOJSON, + options={"ignoreNullFields": "false"}, +) +``` + ### Execution progress You can monitor the progress of running queries by registering a diff --git a/tests/test_store_options.py b/tests/test_store_options.py new file mode 100644 index 0000000..ff378a6 --- /dev/null +++ b/tests/test_store_options.py @@ -0,0 +1,147 @@ +"""Tests for Store options support.""" + +import json + +from wherobots.db.models import Store +from wherobots.db.types import StorageFormat + + +class TestStoreOptions: + """Tests for the options field on Store.""" + + def test_default_options_is_none(self): + store = Store(format=StorageFormat.PARQUET) + assert store.options is None + + def test_options_set(self): + store = Store( + format=StorageFormat.CSV, + options={"header": "false", "delimiter": "|"}, + ) + assert store.options == {"header": "false", "delimiter": "|"} + + def test_empty_options_normalized_to_none(self): + store = Store(format=StorageFormat.PARQUET, options={}) + assert store.options is None + + def test_none_options_stays_none(self): + store = Store(format=StorageFormat.PARQUET, options=None) + assert store.options is None + + def test_options_defensively_copied(self): + original = {"header": "false"} + store = Store(format=StorageFormat.CSV, options=original) + # Mutating the original should not affect the store + original["delimiter"] = "|" + assert "delimiter" not in store.options + + def test_options_dict_is_mutable(self): + """Store is not frozen, so options dict can be mutated after construction.""" + store = Store(format=StorageFormat.CSV, options={"header": "false"}) + store.options["delimiter"] = "|" + assert store.options == {"header": "false", "delimiter": "|"} + + +class TestStoreForDownloadWithOptions: + """Tests for Store.for_download() with options parameter.""" + + def test_for_download_default_no_options(self): + store = Store.for_download() + assert store.options is None + + def test_for_download_with_options(self): + store = Store.for_download(options={"header": "false"}) + assert store.options == {"header": "false"} + assert store.single is True + assert store.generate_presigned_url is True + + def test_for_download_with_format_and_options(self): + store = Store.for_download( + format=StorageFormat.CSV, + options={"header": "false", "delimiter": "|"}, + ) + assert store.format == StorageFormat.CSV + assert store.options == {"header": "false", "delimiter": "|"} + + def test_for_download_empty_options_normalized(self): + store = Store.for_download(options={}) + assert store.options is None + + +class TestStoreSerializationWithOptions: + """Tests for store dict serialization matching connection.py's format.""" + + def _serialize_store(self, store: Store) -> dict: + """Replicate the serialization logic from Connection.__execute_sql.""" + store_dict = { + "format": store.format.value, + "single": str(store.single).lower(), + "generate_presigned_url": str(store.generate_presigned_url).lower(), + } + if store.options: + store_dict["options"] = store.options + return store_dict + + def test_serialize_without_options(self): + store = Store.for_download(format=StorageFormat.GEOJSON) + d = self._serialize_store(store) + assert d == { + "format": "geojson", + "single": "true", + "generate_presigned_url": "true", + } + assert "options" not in d + + def test_serialize_with_options(self): + store = Store.for_download( + format=StorageFormat.CSV, + options={"header": "false", "delimiter": "|"}, + ) + d = self._serialize_store(store) + assert d == { + "format": "csv", + "single": "true", + "generate_presigned_url": "true", + "options": {"header": "false", "delimiter": "|"}, + } + + def test_serialize_empty_options_omitted(self): + store = Store(format=StorageFormat.PARQUET, options={}) + d = self._serialize_store(store) + assert "options" not in d + + def test_json_roundtrip_with_options(self): + store = Store.for_download( + format=StorageFormat.GEOJSON, + options={"ignoreNullFields": "false"}, + ) + d = self._serialize_store(store) + payload = json.dumps(d) + parsed = json.loads(payload) + assert parsed["options"] == {"ignoreNullFields": "false"} + + def test_full_request_shape(self): + """Verify the full execute_sql request dict shape with store options.""" + store = Store.for_download( + format=StorageFormat.CSV, + options={"header": "false"}, + ) + request = { + "kind": "execute_sql", + "execution_id": "test-id", + "statement": "SELECT 1", + } + store_dict = self._serialize_store(store) + request["store"] = store_dict + + assert request == { + "kind": "execute_sql", + "execution_id": "test-id", + "statement": "SELECT 1", + "store": { + "format": "csv", + "single": "true", + "generate_presigned_url": "true", + "options": {"header": "false"}, + }, + } diff --git a/wherobots/db/connection.py b/wherobots/db/connection.py index 862abe4..b1545f0 100644 --- a/wherobots/db/connection.py +++ b/wherobots/db/connection.py @@ -279,11 +279,14 @@ def __execute_sql( request["enable_progress_events"] = True if store: - request["store"] = { + store_dict: dict[str, Any] = { "format": store.format.value, "single": str(store.single).lower(), "generate_presigned_url": str(store.generate_presigned_url).lower(), } + if store.options: + store_dict["options"] = store.options + request["store"] = store_dict self.__queries[execution_id] = Query( sql=sql, diff --git a/wherobots/db/models.py b/wherobots/db/models.py index 3a0a939..8c24e35 100644 --- a/wherobots/db/models.py +++ b/wherobots/db/models.py @@ -31,18 +31,32 @@ class Store: single: If True, store as a single file. If False, store as multiple files. generate_presigned_url: If True, generate a presigned URL for the result. Requires single=True. + options: Optional dict of format-specific Spark DataFrameWriter options + (e.g. ``{"header": "false", "delimiter": "|"}`` for CSV). These are + applied after the server's default options, so they can override them. + An empty dict is normalized to None. """ format: StorageFormat single: bool = False generate_presigned_url: bool = False + options: dict[str, str] | None = None def __post_init__(self) -> None: if self.generate_presigned_url and not self.single: raise ValueError("Presigned URL can only be generated when single=True") + # Normalize empty options to None and defensively copy. + if self.options: + self.options = dict(self.options) + else: + self.options = None @classmethod - def for_download(cls, format: StorageFormat | None = None) -> "Store": + def for_download( + cls, + format: StorageFormat | None = None, + options: dict[str, str] | None = None, + ) -> "Store": """Create a configuration for downloading results via a presigned URL. This is a convenience method that creates a configuration with @@ -50,6 +64,7 @@ def for_download(cls, format: StorageFormat | None = None) -> "Store": Args: format: The storage format. + options: Optional format-specific Spark DataFrameWriter options. Returns: A Store configured for single-file download with presigned URL. @@ -58,6 +73,7 @@ def for_download(cls, format: StorageFormat | None = None) -> "Store": format=format or DEFAULT_STORAGE_FORMAT, single=True, generate_presigned_url=True, + options=options, ) From 9b5c68fe61ed8bae8ca08acf154614b5f9533a16 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Tue, 3 Mar 2026 14:02:42 -0800 Subject: [PATCH 2/3] refactor: add Store.to_dict() and use it in connection + tests Address review feedback: - Add to_dict() method on Store for WebSocket request serialization - Use store.to_dict() in Connection.__execute_sql instead of inline dict - Replace duplicated _serialize_store() helper in tests with to_dict() - Remove test_options_dict_is_mutable to avoid enshrining mutability --- tests/test_store_options.py | 29 ++++++----------------------- wherobots/db/connection.py | 9 +-------- wherobots/db/models.py | 17 +++++++++++++++++ 3 files changed, 24 insertions(+), 31 deletions(-) diff --git a/tests/test_store_options.py b/tests/test_store_options.py index ff378a6..2791f9b 100644 --- a/tests/test_store_options.py +++ b/tests/test_store_options.py @@ -35,12 +35,6 @@ def test_options_defensively_copied(self): original["delimiter"] = "|" assert "delimiter" not in store.options - def test_options_dict_is_mutable(self): - """Store is not frozen, so options dict can be mutated after construction.""" - store = Store(format=StorageFormat.CSV, options={"header": "false"}) - store.options["delimiter"] = "|" - assert store.options == {"header": "false", "delimiter": "|"} - class TestStoreForDownloadWithOptions: """Tests for Store.for_download() with options parameter.""" @@ -69,22 +63,11 @@ def test_for_download_empty_options_normalized(self): class TestStoreSerializationWithOptions: - """Tests for store dict serialization matching connection.py's format.""" - - def _serialize_store(self, store: Store) -> dict: - """Replicate the serialization logic from Connection.__execute_sql.""" - store_dict = { - "format": store.format.value, - "single": str(store.single).lower(), - "generate_presigned_url": str(store.generate_presigned_url).lower(), - } - if store.options: - store_dict["options"] = store.options - return store_dict + """Tests for Store.to_dict() serialization.""" def test_serialize_without_options(self): store = Store.for_download(format=StorageFormat.GEOJSON) - d = self._serialize_store(store) + d = store.to_dict() assert d == { "format": "geojson", "single": "true", @@ -97,7 +80,7 @@ def test_serialize_with_options(self): format=StorageFormat.CSV, options={"header": "false", "delimiter": "|"}, ) - d = self._serialize_store(store) + d = store.to_dict() assert d == { "format": "csv", "single": "true", @@ -107,7 +90,7 @@ def test_serialize_with_options(self): def test_serialize_empty_options_omitted(self): store = Store(format=StorageFormat.PARQUET, options={}) - d = self._serialize_store(store) + d = store.to_dict() assert "options" not in d def test_json_roundtrip_with_options(self): @@ -115,7 +98,7 @@ def test_json_roundtrip_with_options(self): format=StorageFormat.GEOJSON, options={"ignoreNullFields": "false"}, ) - d = self._serialize_store(store) + d = store.to_dict() payload = json.dumps(d) parsed = json.loads(payload) assert parsed["options"] == {"ignoreNullFields": "false"} @@ -131,7 +114,7 @@ def test_full_request_shape(self): "execution_id": "test-id", "statement": "SELECT 1", } - store_dict = self._serialize_store(store) + store_dict = store.to_dict() request["store"] = store_dict assert request == { diff --git a/wherobots/db/connection.py b/wherobots/db/connection.py index b1545f0..cbe9e47 100644 --- a/wherobots/db/connection.py +++ b/wherobots/db/connection.py @@ -279,14 +279,7 @@ def __execute_sql( request["enable_progress_events"] = True if store: - store_dict: dict[str, Any] = { - "format": store.format.value, - "single": str(store.single).lower(), - "generate_presigned_url": str(store.generate_presigned_url).lower(), - } - if store.options: - store_dict["options"] = store.options - request["store"] = store_dict + request["store"] = store.to_dict() self.__queries[execution_id] = Query( sql=sql, diff --git a/wherobots/db/models.py b/wherobots/db/models.py index 8c24e35..ed1b75d 100644 --- a/wherobots/db/models.py +++ b/wherobots/db/models.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Any, Dict import pandas @@ -76,6 +77,22 @@ def for_download( options=options, ) + def to_dict(self) -> Dict[str, Any]: + """Serialize this Store to a dict for the WebSocket request. + + Returns a dict suitable for inclusion as the ``"store"`` field in an + ``execute_sql`` request. The ``options`` key is omitted when there + are no user-supplied options (backward compatible). + """ + d: Dict[str, Any] = { + "format": self.format.value, + "single": str(self.single).lower(), + "generate_presigned_url": str(self.generate_presigned_url).lower(), + } + if self.options: + d["options"] = self.options + return d + @dataclass class ExecutionResult: From 65a871146eb8880cd604eedf9447ec56b8da6993 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Tue, 3 Mar 2026 14:18:22 -0800 Subject: [PATCH 3/3] chore: bump version to 0.25.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 720fe39..13a8050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "wherobots-python-dbapi" -version = "0.24.0" +version = "0.25.0" description = "Python DB-API driver for Wherobots DB" authors = [{ name = "Maxime Petazzoni", email = "max@wherobots.com" }] requires-python = ">=3.10, <4"