Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ The `Store` class supports the following options:
Use `Store.for_download()` as a convenient shorthand for storing results
as a single Parquet file with a presigned URL.

#### Store options

You can pass format-specific Spark write options through the `options`
parameter. These correspond to the options available in Spark's
`DataFrameWriter` and are applied after the server's default options,
allowing you to override them.

```python
# CSV without headers and a custom delimiter
store = Store.for_download(
format=StorageFormat.CSV,
options={"header": "false", "delimiter": "|"},
)

# GeoJSON preserving null fields
store = Store.for_download(
format=StorageFormat.GEOJSON,
options={"ignoreNullFields": "false"},
)
```

### Execution progress

You can monitor the progress of running queries by registering a
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "wherobots-python-dbapi"
version = "0.24.0"
version = "0.25.0"
description = "Python DB-API driver for Wherobots DB"
authors = [{ name = "Maxime Petazzoni", email = "max@wherobots.com" }]
requires-python = ">=3.10, <4"
Expand Down
130 changes: 130 additions & 0 deletions tests/test_store_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Tests for Store options support."""

import json

from wherobots.db.models import Store
from wherobots.db.types import StorageFormat


class TestStoreOptions:
"""Tests for the options field on Store."""

def test_default_options_is_none(self):
store = Store(format=StorageFormat.PARQUET)
assert store.options is None

def test_options_set(self):
store = Store(
format=StorageFormat.CSV,
options={"header": "false", "delimiter": "|"},
)
assert store.options == {"header": "false", "delimiter": "|"}

def test_empty_options_normalized_to_none(self):
store = Store(format=StorageFormat.PARQUET, options={})
assert store.options is None

def test_none_options_stays_none(self):
store = Store(format=StorageFormat.PARQUET, options=None)
assert store.options is None

def test_options_defensively_copied(self):
original = {"header": "false"}
store = Store(format=StorageFormat.CSV, options=original)
# Mutating the original should not affect the store
original["delimiter"] = "|"
assert "delimiter" not in store.options


class TestStoreForDownloadWithOptions:
"""Tests for Store.for_download() with options parameter."""

def test_for_download_default_no_options(self):
store = Store.for_download()
assert store.options is None

def test_for_download_with_options(self):
store = Store.for_download(options={"header": "false"})
assert store.options == {"header": "false"}
assert store.single is True
assert store.generate_presigned_url is True

def test_for_download_with_format_and_options(self):
store = Store.for_download(
format=StorageFormat.CSV,
options={"header": "false", "delimiter": "|"},
)
assert store.format == StorageFormat.CSV
assert store.options == {"header": "false", "delimiter": "|"}

def test_for_download_empty_options_normalized(self):
store = Store.for_download(options={})
assert store.options is None


class TestStoreSerializationWithOptions:
"""Tests for Store.to_dict() serialization."""

def test_serialize_without_options(self):
store = Store.for_download(format=StorageFormat.GEOJSON)
d = store.to_dict()
assert d == {
"format": "geojson",
"single": "true",
"generate_presigned_url": "true",
}
assert "options" not in d

def test_serialize_with_options(self):
store = Store.for_download(
format=StorageFormat.CSV,
options={"header": "false", "delimiter": "|"},
)
d = store.to_dict()
assert d == {
"format": "csv",
"single": "true",
"generate_presigned_url": "true",
"options": {"header": "false", "delimiter": "|"},
}

def test_serialize_empty_options_omitted(self):
store = Store(format=StorageFormat.PARQUET, options={})
d = store.to_dict()
assert "options" not in d

def test_json_roundtrip_with_options(self):
store = Store.for_download(
format=StorageFormat.GEOJSON,
options={"ignoreNullFields": "false"},
)
d = store.to_dict()
payload = json.dumps(d)
parsed = json.loads(payload)
assert parsed["options"] == {"ignoreNullFields": "false"}

def test_full_request_shape(self):
"""Verify the full execute_sql request dict shape with store options."""
store = Store.for_download(
format=StorageFormat.CSV,
options={"header": "false"},
)
request = {
"kind": "execute_sql",
"execution_id": "test-id",
"statement": "SELECT 1",
}
store_dict = store.to_dict()
request["store"] = store_dict

assert request == {
"kind": "execute_sql",
"execution_id": "test-id",
"statement": "SELECT 1",
"store": {
"format": "csv",
"single": "true",
"generate_presigned_url": "true",
"options": {"header": "false"},
},
}
6 changes: 1 addition & 5 deletions wherobots/db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,7 @@ def __execute_sql(
request["enable_progress_events"] = True

if store:
request["store"] = {
"format": store.format.value,
"single": str(store.single).lower(),
"generate_presigned_url": str(store.generate_presigned_url).lower(),
}
request["store"] = store.to_dict()

self.__queries[execution_id] = Query(
sql=sql,
Expand Down
35 changes: 34 additions & 1 deletion wherobots/db/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import dataclass
from typing import Any, Dict

import pandas

Expand Down Expand Up @@ -31,25 +32,40 @@ class Store:
single: If True, store as a single file. If False, store as multiple files.
generate_presigned_url: If True, generate a presigned URL for the result.
Requires single=True.
options: Optional dict of format-specific Spark DataFrameWriter options
(e.g. ``{"header": "false", "delimiter": "|"}`` for CSV). These are
applied after the server's default options, so they can override them.
An empty dict is normalized to None.
"""

format: StorageFormat
single: bool = False
generate_presigned_url: bool = False
options: dict[str, str] | None = None

def __post_init__(self) -> None:
if self.generate_presigned_url and not self.single:
raise ValueError("Presigned URL can only be generated when single=True")
# Normalize empty options to None and defensively copy.
if self.options:
self.options = dict(self.options)
else:
self.options = None

@classmethod
def for_download(cls, format: StorageFormat | None = None) -> "Store":
def for_download(
cls,
format: StorageFormat | None = None,
options: dict[str, str] | None = None,
) -> "Store":
"""Create a configuration for downloading results via a presigned URL.

This is a convenience method that creates a configuration with
single file mode and presigned URL generation enabled.

Args:
format: The storage format.
options: Optional format-specific Spark DataFrameWriter options.

Returns:
A Store configured for single-file download with presigned URL.
Expand All @@ -58,8 +74,25 @@ def for_download(cls, format: StorageFormat | None = None) -> "Store":
format=format or DEFAULT_STORAGE_FORMAT,
single=True,
generate_presigned_url=True,
options=options,
)

def to_dict(self) -> Dict[str, Any]:
"""Serialize this Store to a dict for the WebSocket request.

Returns a dict suitable for inclusion as the ``"store"`` field in an
``execute_sql`` request. The ``options`` key is omitted when there
are no user-supplied options (backward compatible).
"""
d: Dict[str, Any] = {
"format": self.format.value,
"single": str(self.single).lower(),
"generate_presigned_url": str(self.generate_presigned_url).lower(),
}
if self.options:
d["options"] = self.options
return d


@dataclass
class ExecutionResult:
Expand Down