diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py
index 7db92818fe..b10ef9247c 100644
--- a/pyiceberg/avro/file.py
+++ b/pyiceberg/avro/file.py
@@ -22,6 +22,7 @@
 import io
 import json
 import os
+import threading
 from collections.abc import Callable
 from dataclasses import dataclass
 from enum import Enum
@@ -31,6 +32,8 @@
     TypeVar,
 )
 
+from cachetools import LRUCache
+
 from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
 from pyiceberg.avro.codecs.codec import Codec
 from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
@@ -68,6 +71,48 @@
 _SCHEMA_KEY = "avro.schema"
 
+
+# Cache for Avro-to-Iceberg schema conversion, keyed by raw schema JSON string.
+# Manifests of the same type share the same Avro schema, so this avoids
+# redundant JSON parsing and schema conversion on every manifest open.
+_schema_cache: LRUCache[str, Schema] = LRUCache(maxsize=32)
+_schema_cache_lock = threading.Lock()
+
+# Cache for resolved reader trees, keyed by object identity of (file_schema,
+# read_schema, read_types, read_enums). Reader objects are stateless — read()
+# takes a decoder and returns decoded data without mutating self, so sharing
+# cached readers across threads and calls is safe.
+_reader_cache: LRUCache[tuple[int, ...], Reader] = LRUCache(maxsize=32)
+_reader_cache_lock = threading.Lock()
+
+
+def _cached_avro_to_iceberg(avro_schema_string: str) -> Schema:
+    """Convert an Avro schema JSON string to an Iceberg Schema, with caching."""
+    with _schema_cache_lock:
+        if avro_schema_string in _schema_cache:
+            return _schema_cache[avro_schema_string]
+    schema = AvroSchemaConversion().avro_to_iceberg(json.loads(avro_schema_string))
+    with _schema_cache_lock:
+        _schema_cache[avro_schema_string] = schema
+    return schema
+
+
+def _cached_resolve_reader(
+    file_schema: Schema,
+    read_schema: Schema,
+    read_types: dict[int, Callable[..., StructProtocol]],
+    read_enums: dict[int, Callable[..., Enum]],
+) -> Reader:
+    """Resolve a reader tree for the given schema pair, with caching."""
+    key = (id(file_schema), id(read_schema), id(read_types), id(read_enums))
+    with _reader_cache_lock:
+        if key in _reader_cache:
+            return _reader_cache[key]
+    reader = resolve_reader(file_schema, read_schema, read_types, read_enums)
+    with _reader_cache_lock:
+        _reader_cache[key] = reader
+    return reader
+
 
 class AvroFileHeader(Record):
     @property
     def magic(self) -> bytes:
@@ -97,9 +142,7 @@ def compression_codec(self) -> type[Codec] | None:
 
     def get_schema(self) -> Schema:
         if _SCHEMA_KEY in self.meta:
-            avro_schema_string = self.meta[_SCHEMA_KEY]
-            avro_schema = json.loads(avro_schema_string)
-            return AvroSchemaConversion().avro_to_iceberg(avro_schema)
+            return _cached_avro_to_iceberg(self.meta[_SCHEMA_KEY])
         else:
             raise ValueError("No schema found in Avro file headers")
 
@@ -178,7 +221,7 @@ def __enter__(self) -> AvroFile[D]:
 
         if not self.read_schema:
            self.read_schema = self.schema
 
-        self.reader = resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)
+        self.reader = _cached_resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)
 
         return self
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 5797e1f050..603788f4db 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -66,6 +66,7 @@
     RecursiveDict,
     TableVersion,
 )
+from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config, merge_config
 from pyiceberg.utils.properties import property_as_bool
 from pyiceberg.view import View
@@ -90,6 +91,7 @@
 MANIFEST_LIST = "manifest list"
 PREVIOUS_METADATA = "previous metadata"
 METADATA = "metadata"
+DATA_FILE = "data"
 URI = "uri"
 LOCATION = "location"
 EXTERNAL_TABLE = "EXTERNAL_TABLE"
@@ -284,7 +286,7 @@ def list_catalogs() -> list[str]:
 
 
 def delete_files(io: FileIO, files_to_delete: set[str], file_type: str) -> None:
-    """Delete files.
+    """Delete files in parallel.
 
     Log warnings if failing to delete any file.
 
@@ -293,32 +295,41 @@ def delete_files(io: FileIO, files_to_delete: set[str], file_type: str) -> None:
         files_to_delete: A set of file paths to be deleted.
         file_type: The type of the file.
     """
-    for file in files_to_delete:
+
+    def _delete_file(file: str) -> None:
         try:
             io.delete(file)
         except OSError:
             logger.warning(f"Failed to delete {file_type} file {file}", exc_info=logger.isEnabledFor(logging.DEBUG))
 
+    executor = ExecutorFactory.get_or_create()
+    list(executor.map(_delete_file, files_to_delete))
+
 
 def delete_data_files(io: FileIO, manifests_to_delete: list[ManifestFile]) -> None:
     """Delete data files linked to given manifests.
 
+    Deduplicates manifests by path before reading entries, since the same manifest
+    appears across multiple snapshots' manifest lists. Deletes data files in parallel.
+
     Log warnings if failing to delete any file.
 
     Args:
         io: The FileIO used to delete the object.
         manifests_to_delete: A list of manifest contains paths of data files to be deleted.
     """
-    deleted_files: dict[str, bool] = {}
+    unique_manifests: dict[str, ManifestFile] = {}
     for manifest_file in manifests_to_delete:
+        unique_manifests.setdefault(manifest_file.manifest_path, manifest_file)
+
+    # Collect all unique data file paths
+    data_file_paths: set[str] = set()
+    for manifest_file in unique_manifests.values():
         for entry in manifest_file.fetch_manifest_entry(io, discard_deleted=False):
-            path = entry.data_file.file_path
-            if not deleted_files.get(path, False):
-                try:
-                    io.delete(path)
-                except OSError:
-                    logger.warning(f"Failed to delete data file {path}", exc_info=logger.isEnabledFor(logging.DEBUG))
-                deleted_files[path] = True
+            data_file_paths.add(entry.data_file.file_path)
+
+    # Delete in parallel
+    delete_files(io, data_file_paths, DATA_FILE)
 
 
 def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Catalog | None:
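
Note for reviewers: a minimal, standalone sketch of the check-then-compute-then-store locking pattern that _cached_avro_to_iceberg and _cached_resolve_reader follow, assuming only cachetools and the standard library. The names expensive_parse and cached_parse are hypothetical stand-ins for the conversion and reader-resolution work and are not part of this change.

import json
import threading

from cachetools import LRUCache

_cache: LRUCache[str, dict] = LRUCache(maxsize=32)
_cache_lock = threading.Lock()


def expensive_parse(raw: str) -> dict:
    """Hypothetical stand-in for the costly work being cached."""
    return json.loads(raw)


def cached_parse(raw: str) -> dict:
    # Fast path: serve a previously computed value while holding the lock.
    with _cache_lock:
        if raw in _cache:
            return _cache[raw]
    # Slow path: compute outside the lock so other callers are not blocked on the
    # expensive work; two threads racing on the same miss may both compute, and
    # the last write wins.
    value = expensive_parse(raw)
    with _cache_lock:
        _cache[raw] = value
    return value

The trade-off is the same one the new helpers make: accept occasional duplicate work on a cold concurrent miss in exchange for never holding the lock during schema conversion or reader resolution.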