Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
CommitTableResponse,
CreateTableTransaction,
ReplaceTableTransaction,
StagedTable,
Table,
TableProperties,
Expand Down Expand Up @@ -442,6 +443,66 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)

@abstractmethod
def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Atomically replace a table's schema, spec, sort order, location, and properties.

    The table UUID and history (snapshots, schemas, specs, sort orders) are preserved.
    The current snapshot is cleared (main branch ref is removed).

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties (merged with existing).

    Returns:
        Table: the replaced table instance.

    Raises:
        NoSuchTableError: If the table does not exist.
    """

@abstractmethod
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Create a ReplaceTableTransaction.

    The transaction can be used to stage additional changes (schema evolution,
    partition evolution, etc.) before committing. Nothing is changed on the
    catalog until the returned transaction is committed.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties (merged with existing).

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist.
    """

@abstractmethod
def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down Expand Up @@ -888,6 +949,28 @@ def create_table_transaction(
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
)

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Default implementation: replace is not supported by this catalog type.

    Subclasses that support atomic table replacement override this method.

    Raises:
        NotImplementedError: Always, for catalogs without replace support.
    """
    raise NotImplementedError("replace_table is not yet supported for this catalog type")

def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Default implementation: replace transactions are not supported by this catalog type.

    Subclasses that support staged table replacement override this method.

    Raises:
        NotImplementedError: Always, for catalogs without replace support.
    """
    raise NotImplementedError("replace_table_transaction is not yet supported for this catalog type")

def table_exists(self, identifier: str | Identifier) -> bool:
try:
self.load_table(identifier)
Expand Down
23 changes: 23 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pyiceberg.table import (
CommitTableResponse,
CreateTableTransaction,
ReplaceTableTransaction,
Table,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -64,6 +65,28 @@ def create_table_transaction(
) -> CreateTableTransaction:
raise NotImplementedError

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """No-op catalog does not support table replacement."""
    raise NotImplementedError

def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """No-op catalog does not support replace transactions."""
    raise NotImplementedError

def load_table(self, identifier: str | Identifier) -> Table:
    """No-op catalog does not support loading tables."""
    raise NotImplementedError

Expand Down
81 changes: 79 additions & 2 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,19 @@
FileIO,
load_file_io,
)
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionSpec,
assign_fresh_partition_spec_ids,
assign_fresh_partition_spec_ids_for_replace,
)
from pyiceberg.schema import Schema, assign_fresh_schema_ids, assign_fresh_schema_ids_for_replace
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
FileScanTask,
ReplaceTableTransaction,
StagedTable,
Table,
TableIdentifier,
Expand Down Expand Up @@ -930,6 +936,77 @@ def create_table_transaction(
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
return CreateTableTransaction(staged_table)

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Replace the table by building a replace transaction and committing it immediately.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties.

    Returns:
        Table: the replaced table instance.
    """
    transaction = self.replace_table_transaction(
        identifier=identifier,
        schema=schema,
        location=location,
        partition_spec=partition_spec,
        sort_order=sort_order,
        properties=properties,
    )
    return transaction.commit_transaction()

@retry(**_RETRY_ARGS)
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Create a ReplaceTableTransaction against the current state of the table.

    Loads the existing table, re-assigns schema / partition-spec / sort-order IDs
    so that IDs from the existing metadata are reused where possible, and stages
    the result in a transaction that only takes effect on commit.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): New table properties.

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist.
    """
    current_table = self.load_table(identifier)
    current_metadata = current_table.metadata

    converted_schema = self._convert_schema_if_needed(
        schema,
        int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
    )

    # Re-assign schema field IDs, reusing IDs from the existing schema by field name.
    target_schema, _ = assign_fresh_schema_ids_for_replace(
        converted_schema, current_metadata.schema(), current_metadata.last_column_id
    )

    # Re-assign partition field IDs, reusing IDs from the existing specs.
    target_spec, _ = assign_fresh_partition_spec_ids_for_replace(
        partition_spec, converted_schema, target_schema, current_metadata.partition_specs, current_metadata.last_partition_id
    )

    # Re-assign sort order IDs against the refreshed schema.
    target_sort_order = assign_fresh_sort_order_ids(sort_order, converted_schema, target_schema)

    # Fall back to the current location when the caller does not provide one.
    if location:
        target_location = location.rstrip("/")
    else:
        target_location = current_metadata.location

    # Stage the existing table so the transaction starts from its current metadata.
    staged = StagedTable(
        identifier=current_table.name(),
        metadata=current_metadata,
        metadata_location=current_table.metadata_location,
        io=current_table.io,
        catalog=self,
    )

    return ReplaceTableTransaction(
        table=staged,
        new_schema=target_schema,
        new_spec=target_spec,
        new_sort_order=target_sort_order,
        new_location=target_location,
        new_properties=properties,
    )

@retry(**_RETRY_ARGS)
def create_view(
self,
Expand Down
67 changes: 67 additions & 0 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,73 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)


def assign_fresh_partition_spec_ids_for_replace(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    existing_specs: list[PartitionSpec],
    last_partition_id: int | None,
) -> tuple[PartitionSpec, int]:
    """Assign partition field IDs for a replace operation, reusing IDs from existing specs.

    For each partition field, if a field with the same (source_id, transform) pair exists in
    any of the existing specs, its partition field ID is reused; otherwise a fresh ID is
    allocated starting from last_partition_id + 1.

    Args:
        spec: The new partition spec to assign IDs to.
        old_schema: The schema that the new spec's source_ids reference.
        fresh_schema: The schema with freshly assigned field IDs.
        existing_specs: All partition specs from the existing table metadata.
        last_partition_id: The current table's last_partition_id.

    Returns:
        A tuple of (fresh_spec, new_last_partition_id).
    """
    highest_assigned = PARTITION_FIELD_ID_START - 1 if last_partition_id is None else last_partition_id

    # Map (source_id, transform) -> partition field ID across every historical spec.
    # When the same pair appears in multiple specs, keep the largest field ID.
    reusable_ids: dict[tuple[int, str], int] = {}
    for prior_spec in existing_specs:
        for prior_field in prior_spec.fields:
            lookup = (prior_field.source_id, str(prior_field.transform))
            known = reusable_ids.get(lookup)
            if known is None or prior_field.field_id > known:
                reusable_ids[lookup] = prior_field.field_id

    next_free_id = highest_assigned
    assigned_fields = []
    for new_field in spec.fields:
        # Resolve the field through its column name so the fresh schema's ID is used.
        column_name = old_schema.find_column_name(new_field.source_id)
        if column_name is None:
            raise ValueError(f"Could not find in old schema: {new_field}")
        resolved_field = fresh_schema.find_field(column_name)
        if resolved_field is None:
            raise ValueError(f"Could not find field in fresh schema: {column_name}")

        validate_partition_name(new_field.name, new_field.transform, resolved_field.field_id, fresh_schema, set())

        lookup = (resolved_field.field_id, str(new_field.transform))
        reused_id = reusable_ids.get(lookup)
        if reused_id is not None:
            assigned_id = reused_id
        else:
            # Unseen (source, transform) pair: allocate the next free ID and
            # record it so duplicates within this spec share the same ID.
            next_free_id += 1
            assigned_id = next_free_id
            reusable_ids[lookup] = assigned_id

        assigned_fields.append(
            PartitionField(
                name=new_field.name,
                source_id=resolved_field.field_id,
                field_id=assigned_id,
                transform=new_field.transform,
            )
        )

    updated_last_partition_id = max(next_free_id, highest_assigned)
    return PartitionSpec(*assigned_fields, spec_id=INITIAL_PARTITION_SPEC_ID), updated_last_partition_id


T = TypeVar("T")


Expand Down
52 changes: 52 additions & 0 deletions pyiceberg/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,58 @@ def primitive(self, primitive: PrimitiveType) -> PrimitiveType:
return primitive


class _SetFreshIDsForReplace(_SetFreshIDs):
    """Assign fresh IDs for a replace operation, reusing IDs from the base schema by field name.

    For each field in the new schema, if a field with the same full name exists in the
    base schema, its ID is reused; otherwise a fresh ID is allocated starting from
    last_column_id + 1.
    """

    def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) -> None:
        # Records every old-ID -> new-ID assignment made during the visit.
        self.old_id_to_new_id: dict[int, int] = {}
        # Old IDs that map to an ID reused from the base schema.
        self._old_id_to_base_id = old_id_to_base_id
        # Fresh IDs are drawn sequentially, starting just above the last column ID.
        self.next_id_func = itertools.count(starting_id + 1).__next__

    def _get_and_increment(self, current_id: int) -> int:
        # Prefer the base schema's ID for same-named fields; otherwise draw a fresh one.
        try:
            assigned = self._old_id_to_base_id[current_id]
        except KeyError:
            assigned = self.next_id_func()
        self.old_id_to_new_id[current_id] = assigned
        return assigned


def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema, last_column_id: int) -> tuple[Schema, int]:
    """Assign fresh IDs to a schema for a replace operation, reusing IDs from the base schema.

    For each field in the new schema, if a field with the same full path name exists
    in the base schema, its ID is reused. New fields get IDs starting from
    last_column_id + 1.

    Args:
        schema: The new schema to assign IDs to.
        base_schema: The existing table's schema (IDs are reused from here by name).
        last_column_id: The current table's last_column_id (new IDs start above this).

    Returns:
        A tuple of (fresh_schema, new_last_column_id).
    """
    ids_in_base = index_by_name(base_schema)
    names_in_new = index_name_by_id(schema)

    # Old schema ID -> base schema ID, for every field whose full name exists in both.
    reuse_map: dict[int, int] = {
        old_id: ids_in_base[name] for old_id, name in names_in_new.items() if name in ids_in_base
    }

    refreshed_schema = pre_order_visit(schema, _SetFreshIDsForReplace(reuse_map, last_column_id))
    # last_column_id never decreases, even when the new schema drops high-ID fields.
    return refreshed_schema, max(refreshed_schema.highest_field_id, last_column_id)


# Implementation copied from Apache Iceberg repo.
def make_compatible_name(name: str) -> str:
"""Make a field name compatible with Avro specification.
Expand Down
Loading
Loading