diff --git a/README.md b/README.md index 4fe0387..e8d9bd2 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,27 @@ The `Store` class supports the following options: Use `Store.for_download()` as a convenient shorthand for storing results as a single Parquet file with a presigned URL. +#### Store options + +You can pass format-specific Spark write options through the `options` +parameter. These correspond to the options available in Spark's +`DataFrameWriter` and are applied after the server's default options, +allowing you to override them. + +```python +# CSV without headers and a custom delimiter +store = Store.for_download( + format=StorageFormat.CSV, + options={"header": "false", "delimiter": "|"}, +) + +# GeoJSON preserving null fields +store = Store.for_download( + format=StorageFormat.GEOJSON, + options={"ignoreNullFields": "false"}, +) +``` + ### Execution progress You can monitor the progress of running queries by registering a diff --git a/pyproject.toml b/pyproject.toml index 720fe39..13a8050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "wherobots-python-dbapi" -version = "0.24.0" +version = "0.25.0" description = "Python DB-API driver for Wherobots DB" authors = [{ name = "Maxime Petazzoni", email = "max@wherobots.com" }] requires-python = ">=3.10, <4" diff --git a/tests/test_store_options.py b/tests/test_store_options.py new file mode 100644 index 0000000..2791f9b --- /dev/null +++ b/tests/test_store_options.py @@ -0,0 +1,130 @@ +"""Tests for Store options support.""" + +import json + +from wherobots.db.models import Store +from wherobots.db.types import StorageFormat + + +class TestStoreOptions: + """Tests for the options field on Store.""" + + def test_default_options_is_none(self): + store = Store(format=StorageFormat.PARQUET) + assert store.options is None + + def test_options_set(self): + store = Store( + format=StorageFormat.CSV, + options={"header": "false", "delimiter": "|"}, + ) + assert store.options == {"header": "false", "delimiter": "|"} + + def test_empty_options_normalized_to_none(self): + store = Store(format=StorageFormat.PARQUET, options={}) + assert store.options is None + + def test_none_options_stays_none(self): + store = Store(format=StorageFormat.PARQUET, options=None) + assert store.options is None + + def test_options_defensively_copied(self): + original = {"header": "false"} + store = Store(format=StorageFormat.CSV, options=original) + # Mutating the original should not affect the store + original["delimiter"] = "|" + assert "delimiter" not in store.options + + +class TestStoreForDownloadWithOptions: + """Tests for Store.for_download() with options parameter.""" + + def test_for_download_default_no_options(self): + store = Store.for_download() + assert store.options is None + + def test_for_download_with_options(self): + store = Store.for_download(options={"header": "false"}) + assert store.options == {"header": "false"} + assert store.single is True + assert store.generate_presigned_url is True + + def test_for_download_with_format_and_options(self): + store = Store.for_download( + format=StorageFormat.CSV, + options={"header": "false", "delimiter": "|"}, + ) + assert store.format == StorageFormat.CSV + assert store.options == {"header": "false", "delimiter": "|"} + + def test_for_download_empty_options_normalized(self): + store = Store.for_download(options={}) + assert store.options is None + + +class TestStoreSerializationWithOptions: + """Tests for Store.to_dict() serialization.""" + + def test_serialize_without_options(self): + store = Store.for_download(format=StorageFormat.GEOJSON) + d = store.to_dict() + assert d == { + "format": "geojson", + "single": "true", + "generate_presigned_url": "true", + } + assert "options" not in d + + def test_serialize_with_options(self): + store = Store.for_download( + format=StorageFormat.CSV, + options={"header": "false", "delimiter": "|"}, + ) + d = store.to_dict() + assert d == { + "format": "csv", + "single": "true", + "generate_presigned_url": "true", + "options": {"header": "false", "delimiter": "|"}, + } + + def test_serialize_empty_options_omitted(self): + store = Store(format=StorageFormat.PARQUET, options={}) + d = store.to_dict() + assert "options" not in d + + def test_json_roundtrip_with_options(self): + store = Store.for_download( + format=StorageFormat.GEOJSON, + options={"ignoreNullFields": "false"}, + ) + d = store.to_dict() + payload = json.dumps(d) + parsed = json.loads(payload) + assert parsed["options"] == {"ignoreNullFields": "false"} + + def test_full_request_shape(self): + """Verify the full execute_sql request dict shape with store options.""" + store = Store.for_download( + format=StorageFormat.CSV, + options={"header": "false"}, + ) + request = { + "kind": "execute_sql", + "execution_id": "test-id", + "statement": "SELECT 1", + } + store_dict = store.to_dict() + request["store"] = store_dict + + assert request == { + "kind": "execute_sql", + "execution_id": "test-id", + "statement": "SELECT 1", + "store": { + "format": "csv", + "single": "true", + "generate_presigned_url": "true", + "options": {"header": "false"}, + }, + } diff --git a/wherobots/db/connection.py b/wherobots/db/connection.py index 862abe4..cbe9e47 100644 --- a/wherobots/db/connection.py +++ b/wherobots/db/connection.py @@ -279,11 +279,7 @@ def __execute_sql( request["enable_progress_events"] = True if store: - request["store"] = { - "format": store.format.value, - "single": str(store.single).lower(), - "generate_presigned_url": str(store.generate_presigned_url).lower(), - } + request["store"] = store.to_dict() self.__queries[execution_id] = Query( sql=sql, diff --git a/wherobots/db/models.py b/wherobots/db/models.py index 3a0a939..ed1b75d 100644 --- a/wherobots/db/models.py +++ b/wherobots/db/models.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Any, Dict import pandas @@ -31,18 +32,32 @@ class Store: single: If True, store as a single file. If False, store as multiple files. generate_presigned_url: If True, generate a presigned URL for the result. Requires single=True. + options: Optional dict of format-specific Spark DataFrameWriter options + (e.g. ``{"header": "false", "delimiter": "|"}`` for CSV). These are + applied after the server's default options, so they can override them. + An empty dict is normalized to None. """ format: StorageFormat single: bool = False generate_presigned_url: bool = False + options: dict[str, str] | None = None def __post_init__(self) -> None: if self.generate_presigned_url and not self.single: raise ValueError("Presigned URL can only be generated when single=True") + # Normalize empty options to None and defensively copy. + if self.options: + self.options = dict(self.options) + else: + self.options = None @classmethod - def for_download(cls, format: StorageFormat | None = None) -> "Store": + def for_download( + cls, + format: StorageFormat | None = None, + options: dict[str, str] | None = None, + ) -> "Store": """Create a configuration for downloading results via a presigned URL. This is a convenience method that creates a configuration with @@ -50,6 +65,7 @@ def for_download(cls, format: StorageFormat | None = None) -> "Store": Args: format: The storage format. + options: Optional format-specific Spark DataFrameWriter options. Returns: A Store configured for single-file download with presigned URL. @@ -58,8 +74,25 @@ def for_download(cls, format: StorageFormat | None = None) -> "Store": format=format or DEFAULT_STORAGE_FORMAT, single=True, generate_presigned_url=True, + options=options, ) + def to_dict(self) -> Dict[str, Any]: + """Serialize this Store to a dict for the WebSocket request. + + Returns a dict suitable for inclusion as the ``"store"`` field in an + ``execute_sql`` request. The ``options`` key is omitted when there + are no user-supplied options (backward compatible). + """ + d: Dict[str, Any] = { + "format": self.format.value, + "single": str(self.single).lower(), + "generate_presigned_url": str(self.generate_presigned_url).lower(), + } + if self.options: + d["options"] = self.options + return d + @dataclass class ExecutionResult: