diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index f29214b458..6769831cb5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -39,6 +39,7 @@
 import org.apache.parquet.column.values.factory.ValuesWriterFactory;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
 
 /**
@@ -135,6 +136,7 @@ public static WriterVersion fromString(String name) {
     private final Map<String, String> extraMetaData;
     private final ColumnProperty<Boolean> statistics;
     private final ColumnProperty<Boolean> sizeStatistics;
+    private final ColumnProperty<CompressionCodecName> compressionCodec;
 
     private ParquetProperties(Builder builder) {
         this.pageSizeThreshold = builder.pageSize;
@@ -167,6 +169,7 @@ private ParquetProperties(Builder builder) {
         this.extraMetaData = builder.extraMetaData;
         this.statistics = builder.statistics.build();
         this.sizeStatistics = builder.sizeStatistics.build();
+        this.compressionCodec = builder.compressionCodec.build();
     }
 
     public static Builder builder() {
@@ -348,6 +351,14 @@ public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
         return numBloomFilterCandidates.getValue(column);
     }
 
+    public CompressionCodecName getCompressionCodec(ColumnDescriptor column) {
+        return compressionCodec.getValue(column);
+    }
+
+    public CompressionCodecName getDefaultCompressionCodec() {
+        return compressionCodec.getDefaultValue();
+    }
+
     public Map<String, String> getExtraMetaData() {
         return extraMetaData;
     }
@@ -419,6 +430,7 @@ public static class Builder {
         private Map<String, String> extraMetaData = new HashMap<>();
         private final ColumnProperty.Builder<Boolean> statistics;
         private final ColumnProperty.Builder<Boolean> sizeStatistics;
+        private final ColumnProperty.Builder<CompressionCodecName> compressionCodec;
 
         private Builder() {
             enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
@@ -436,6 +448,8 @@ private Builder() {
                     ColumnProperty.<Integer>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER);
             statistics = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_STATISTICS_ENABLED);
             sizeStatistics = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_SIZE_STATISTICS_ENABLED);
+            compressionCodec =
+                    ColumnProperty.<CompressionCodecName>builder().withDefaultValue(CompressionCodecName.UNCOMPRESSED);
         }
 
         private Builder(ParquetProperties toCopy) {
@@ -460,6 +474,7 @@ private Builder(ParquetProperties toCopy) {
             this.extraMetaData = toCopy.extraMetaData;
             this.statistics = ColumnProperty.builder(toCopy.statistics);
             this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
+            this.compressionCodec = ColumnProperty.builder(toCopy.compressionCodec);
         }
 
         /**
@@ -756,6 +771,32 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) {
             return this;
         }
 
+        /**
+         * Set the compression codec for the columns not specified by
+         * {@link #withCompressionCodec(String, CompressionCodecName)}.
+         *
+         * @param codecName the compression codec to use by default
+         * @return this builder for method chaining.
+         */
+        public Builder withCompressionCodec(CompressionCodecName codecName) {
+            this.compressionCodec.withDefaultValue(
+                    Objects.requireNonNull(codecName, "compressionCodecName cannot be null"));
+            return this;
+        }
+
+        /**
+         * Set the compression codec for the specified column.
+         *
+         * @param columnPath the path of the column (dot-string)
+         * @param codecName the compression codec to use for the column
+         * @return this builder for method chaining.
+         */
+        public Builder withCompressionCodec(String columnPath, CompressionCodecName codecName) {
+            this.compressionCodec.withValue(
+                    columnPath, Objects.requireNonNull(codecName, "compressionCodecName cannot be null"));
+            return this;
+        }
+
         public ParquetProperties build() {
             ParquetProperties properties = new ParquetProperties(this);
             // we pass a constructed but uninitialized factory to ParquetProperties above as currently
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index d9e6ea0990..6b92f89df2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -44,6 +44,7 @@
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
 import org.apache.parquet.crypto.AesCipher;
 import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
@@ -672,6 +673,83 @@ public ColumnChunkPageWriteStore(
         }
     }
 
+    /**
+     * Construct a page write store with per-column compression support.
+     * Each column's compression codec is resolved from {@code props} via
+     * {@link ParquetProperties#getCompressionCodec(ColumnDescriptor)}.
+     *
+     * @param codecFactory factory to create compressors for each codec
+     * @param props properties containing per-column compression configuration
+     * @param schema the message schema
+     * @param allocator byte buffer allocator
+     * @param columnIndexTruncateLength truncate length for column indexes
+     * @param pageWriteChecksumEnabled whether to write page checksums
+     * @param fileEncryptor file encryptor (null if not encrypted)
+     * @param rowGroupOrdinal row group ordinal
+     */
+    public ColumnChunkPageWriteStore(
+            CompressionCodecFactory codecFactory,
+            ParquetProperties props,
+            MessageType schema,
+            ByteBufferAllocator allocator,
+            int columnIndexTruncateLength,
+            boolean pageWriteChecksumEnabled,
+            InternalFileEncryptor fileEncryptor,
+            int rowGroupOrdinal) {
+        this.schema = schema;
+        if (null == fileEncryptor) {
+            for (ColumnDescriptor path : schema.getColumns()) {
+                BytesInputCompressor compressor = codecFactory.getCompressor(props.getCompressionCodec(path));
+                writers.put(
+                        path,
+                        new ColumnChunkPageWriter(
+                                path,
+                                compressor,
+                                allocator,
+                                columnIndexTruncateLength,
+                                pageWriteChecksumEnabled,
+                                null,
+                                null,
+                                null,
+                                -1,
+                                -1));
+            }
+            return;
+        }
+
+        // Encrypted file
+        int columnOrdinal = -1;
+        byte[] fileAAD = fileEncryptor.getFileAAD();
+        for (ColumnDescriptor path : schema.getColumns()) {
+            columnOrdinal++;
+            BlockCipher.Encryptor headerBlockEncryptor = null;
+            BlockCipher.Encryptor pageBlockEncryptor = null;
+            ColumnPath columnPath = ColumnPath.get(path.getPath());
+
+            BytesInputCompressor compressor = codecFactory.getCompressor(props.getCompressionCodec(path));
+
+            InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal);
+            if (columnSetup.isEncrypted()) {
+                headerBlockEncryptor = columnSetup.getMetaDataEncryptor();
+                pageBlockEncryptor = columnSetup.getDataEncryptor();
+            }
+
+            writers.put(
+                    path,
+                    new ColumnChunkPageWriter(
+                            path,
+                            compressor,
+                            allocator,
+                            columnIndexTruncateLength,
+                            pageWriteChecksumEnabled,
+                            headerBlockEncryptor,
+                            pageBlockEncryptor,
+                            fileAAD,
+                            rowGroupOrdinal,
+                            columnOrdinal));
+        }
+    }
+
     @Override
     public PageWriter getPageWriter(ColumnDescriptor path) {
         return writers.get(path);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 41b068d01a..5a42f1ba43 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -28,6 +28,7 @@
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
 import org.apache.parquet.crypto.InternalFileEncryptor;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -52,6 +53,7 @@ class InternalParquetRecordWriter<T> {
     private final int rowGroupRecordCountThreshold;
     private long nextRowGroupSize;
     private final BytesInputCompressor compressor;
+    private final CompressionCodecFactory codecFactory;
     private final boolean validating;
     private final ParquetProperties props;
 
@@ -77,7 +79,9 @@ class InternalParquetRecordWriter<T> {
      * @param extraMetaData extra meta data to write in the footer of the file
      * @param rowGroupSize the size of a block in the file (this will be approximate)
      * @param compressor the codec used to compress
+     * @deprecated Use {@link #InternalParquetRecordWriter(ParquetFileWriter, WriteSupport, MessageType, Map, long, CompressionCodecFactory, boolean, ParquetProperties)} for per-column compression support
      */
+    @Deprecated
     public InternalParquetRecordWriter(
             ParquetFileWriter parquetFileWriter,
             WriteSupport<T> writeSupport,
@@ -95,6 +99,41 @@ public InternalParquetRecordWriter(
         this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
         this.nextRowGroupSize = rowGroupSizeThreshold;
         this.compressor = compressor;
+        this.codecFactory = null;
+        this.validating = validating;
+        this.props = props;
+        this.fileEncryptor = parquetFileWriter.getEncryptor();
+        this.rowGroupOrdinal = 0;
+        initStore();
+        recordCountForNextMemCheck = props.getMinRowCountForPageSizeCheck();
+    }
+
+    /**
+     * @param parquetFileWriter the file to write to
+     * @param writeSupport the class to convert incoming records
+     * @param schema the schema of the records
+     * @param extraMetaData extra meta data to write in the footer of the file
+     * @param rowGroupSize the size of a block in the file (this will be approximate)
+     * @param codecFactory the codec factory for per-column compression
+     */
+    public InternalParquetRecordWriter(
+            ParquetFileWriter parquetFileWriter,
+            WriteSupport<T> writeSupport,
+            MessageType schema,
+            Map<String, String> extraMetaData,
+            long rowGroupSize,
+            CompressionCodecFactory codecFactory,
+            boolean validating,
+            ParquetProperties props) {
+        this.parquetFileWriter = parquetFileWriter;
+        this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
+        this.schema = schema;
+        this.extraMetaData = extraMetaData;
+        this.rowGroupSizeThreshold = rowGroupSize;
+        this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
+        this.nextRowGroupSize = rowGroupSizeThreshold;
+        this.compressor = null;
+        this.codecFactory = codecFactory;
         this.validating = validating;
         this.props = props;
         this.fileEncryptor = parquetFileWriter.getEncryptor();
@@ -108,14 +147,27 @@ public ParquetMetadata getFooter() {
     }
 
     private void initStore() {
-        ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
-                compressor,
-                schema,
-                props.getAllocator(),
-                props.getColumnIndexTruncateLength(),
-                props.getPageWriteChecksumEnabled(),
-                fileEncryptor,
-                rowGroupOrdinal);
+        ColumnChunkPageWriteStore columnChunkPageWriteStore;
+        if (codecFactory != null) {
+            columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
+                    codecFactory,
+                    props,
+                    schema,
+                    props.getAllocator(),
+                    props.getColumnIndexTruncateLength(),
+                    props.getPageWriteChecksumEnabled(),
+                    fileEncryptor,
+                    rowGroupOrdinal);
+        } else {
+            columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
+                    compressor,
+                    schema,
+                    props.getAllocator(),
+                    props.getColumnIndexTruncateLength(),
+                    props.getPageWriteChecksumEnabled(),
+                    fileEncryptor,
+                    rowGroupOrdinal);
+        }
         pageStore = columnChunkPageWriteStore;
         bloomFilterWriteStore = columnChunkPageWriteStore;
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 868ae634c1..2fc65ac78b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -546,6 +546,10 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
                         STATISTICS_ENABLED,
                         key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED),
                         propsBuilder::withStatisticsEnabled)
+                .withColumnConfig(
+                        COMPRESSION,
+                        key -> CompressionCodecName.fromConf(conf.get(key)),
+                        propsBuilder::withCompressionCodec)
                 .parseConfig(conf);
 
         ParquetProperties props = propsBuilder.build();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
index 51528b10be..ca267f25e7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
@@ -201,15 +201,11 @@ public ParquetRecordWriter(
             MemoryManager memoryManager,
             Configuration conf) {
         this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
+        // Ensure the default compression codec from ParquetOutputFormat is set in props
+        ParquetProperties propsWithCodec =
+                ParquetProperties.copy(props).withCompressionCodec(codec).build();
         internalWriter = new InternalParquetRecordWriter<T>(
-                w,
-                writeSupport,
-                schema,
-                extraMetaData,
-                blockSize,
-                codecFactory.getCompressor(codec),
-                validating,
-                props);
+                w, writeSupport, schema, extraMetaData, blockSize, codecFactory, validating, propsWithCodec);
         this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null");
         memoryManager.addWriter(internalWriter, blockSize);
     }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 8eb5f7f17b..424d3af69a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -395,7 +395,6 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
         fileWriter.start();
 
         this.codecFactory = codecFactory;
-        CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName);
 
         final Map<String, String> extraMetadata;
         if (encodingProps.getExtraMetaData() == null
@@ -418,7 +417,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
         }
 
         this.writer = new InternalParquetRecordWriter<T>(
-                fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
+                fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, codecFactory, validating, encodingProps);
     }
 
     public void write(T object) throws IOException {
@@ -559,13 +558,29 @@ public SELF withWriteMode(ParquetFileWriter.Mode mode) {
 
         /**
          * Set the {@link CompressionCodecName compression codec} used by the
-         * constructed writer.
+         * constructed writer. This sets the default compression codec for all columns.
+         * Per-column overrides can be set with {@link #withCompressionCodec(String, CompressionCodecName)}.
          *
          * @param codecName a {@code CompressionCodecName}
          * @return this builder for method chaining.
          */
         public SELF withCompressionCodec(CompressionCodecName codecName) {
             this.codecName = codecName;
+            encodingPropsBuilder.withCompressionCodec(codecName);
+            return self();
+        }
+
+        /**
+         * Set the {@link CompressionCodecName compression codec} for a specific column.
+         * Columns not explicitly configured will use the default codec set by
+         * {@link #withCompressionCodec(CompressionCodecName)}.
+         *
+         * @param columnPath the path of the column (dot-string)
+         * @param codecName the compression codec to use for the column
+         * @return this builder for method chaining.
+         */
+        public SELF withCompressionCodec(String columnPath, CompressionCodecName codecName) {
+            encodingPropsBuilder.withCompressionCodec(columnPath, codecName);
             return self();
         }
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index a7888b58d8..36b472af3a 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -27,6 +27,8 @@
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
 import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
@@ -858,4 +860,181 @@ public void testNoFlushAfterException() throws Exception {
         FileSystem fs = file.getFileSystem(conf);
         assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0);
     }
+
+    @Test
+    public void testPerColumnCompression() throws Exception {
+        MessageType schema = Types.buildMessage()
+                .required(BINARY)
+                .as(stringType())
+                .named("name")
+                .required(INT32)
+                .named("id")
+                .required(BINARY)
+                .named("embeddings")
+                .named("test_schema");
+
+        File file = temp.newFile();
+        temp.delete();
+        Path path = new Path(file.getAbsolutePath());
+
+        SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+
+        // Write with per-column compression: SNAPPY default, embeddings UNCOMPRESSED
+        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+                .withAllocator(allocator)
+                .withType(schema)
+                .withCompressionCodec(SNAPPY)
+                .withCompressionCodec("embeddings", UNCOMPRESSED)
+                .build()) {
+            for (int i = 0; i < 1000; i++) {
+                writer.write(factory.newGroup()
+                        .append("name", "name_" + i)
+                        .append("id", i)
+                        .append("embeddings", "embedding_data_" + i));
+            }
+        }
+
+        // Verify data integrity via round-trip read
+        try (ParquetReader<Group> reader =
+                ParquetReader.builder(new GroupReadSupport(), path).build()) {
+            for (int i = 0; i < 1000; i++) {
+                Group group = reader.read();
+                assertEquals("name_" + i, group.getString("name", 0));
+                assertEquals(i, group.getInteger("id", 0));
+                assertEquals("embedding_data_" + i, group.getString("embeddings", 0));
+            }
+            assertNull(reader.read());
+        }
+
+        // Verify per-column codec in footer metadata
+        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+            for (BlockMetaData block : reader.getFooter().getBlocks()) {
+                for (ColumnChunkMetaData column : block.getColumns()) {
+                    if (column.getPath().toDotString().equals("embeddings")) {
+                        assertEquals("embeddings column should be UNCOMPRESSED", UNCOMPRESSED, column.getCodec());
+                    } else {
+                        assertEquals(
+                                column.getPath().toDotString() + " column should be SNAPPY", SNAPPY, column.getCodec());
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testPerColumnCompressionMultipleCodecs() throws Exception {
+        MessageType schema = Types.buildMessage()
+                .required(BINARY)
+                .as(stringType())
+                .named("col_a")
+                .required(BINARY)
+                .as(stringType())
+                .named("col_b")
+                .required(BINARY)
+                .as(stringType())
+                .named("col_c")
+                .named("test_schema");
+
+        File file = temp.newFile();
+        temp.delete();
+        Path path = new Path(file.getAbsolutePath());
+
+        SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+
+        // Write with three different codecs
+        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+                .withAllocator(allocator)
+                .withType(schema)
+                .withCompressionCodec(GZIP)
+                .withCompressionCodec("col_a", SNAPPY)
+                .withCompressionCodec("col_c", UNCOMPRESSED)
+                .build()) {
+            for (int i = 0; i < 100; i++) {
+                writer.write(factory.newGroup()
+                        .append("col_a", "a_" + i)
+                        .append("col_b", "b_" + i)
+                        .append("col_c", "c_" + i));
+            }
+        }
+
+        // Verify data integrity
+        try (ParquetReader<Group> reader =
+                ParquetReader.builder(new GroupReadSupport(), path).build()) {
+            for (int i = 0; i < 100; i++) {
+                Group group = reader.read();
+                assertEquals("a_" + i, group.getString("col_a", 0));
+                assertEquals("b_" + i, group.getString("col_b", 0));
+                assertEquals("c_" + i, group.getString("col_c", 0));
+            }
+            assertNull(reader.read());
+        }
+
+        // Verify per-column codecs
+        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+            for (BlockMetaData block : reader.getFooter().getBlocks()) {
+                for (ColumnChunkMetaData column : block.getColumns()) {
+                    switch (column.getPath().toDotString()) {
+                        case "col_a":
+                            assertEquals(SNAPPY, column.getCodec());
+                            break;
+                        case "col_b":
+                            assertEquals(GZIP, column.getCodec());
+                            break;
+                        case "col_c":
+                            assertEquals(UNCOMPRESSED, column.getCodec());
+                            break;
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testDefaultCompressionOnlyBackwardsCompatible() throws Exception {
+        MessageType schema = Types.buildMessage()
+                .required(BINARY)
+                .as(stringType())
+                .named("name")
+                .required(INT32)
+                .named("id")
+                .named("test_schema");
+
+        File file = temp.newFile();
+        temp.delete();
+        Path path = new Path(file.getAbsolutePath());
+
+        SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+
+        // Write with only the default codec (no per-column overrides)
+        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+                .withAllocator(allocator)
+                .withType(schema)
+                .withCompressionCodec(SNAPPY)
+                .build()) {
+            for (int i = 0; i < 100; i++) {
+                writer.write(factory.newGroup().append("name", "test_" + i).append("id", i));
+            }
+        }
+
+        // Verify all columns use the default codec
+        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+            for (BlockMetaData block : reader.getFooter().getBlocks()) {
+                for (ColumnChunkMetaData column : block.getColumns()) {
+                    assertEquals(
+                            column.getPath().toDotString() + " should use default SNAPPY", SNAPPY, column.getCodec());
+                }
+            }
+        }
+
+        // Verify data integrity
+        try (ParquetReader<Group> reader =
+                ParquetReader.builder(new GroupReadSupport(), path).build()) {
+            for (int i = 0; i < 100; i++) {
+                Group group = reader.read();
+                assertEquals("test_" + i, group.getString("name", 0));
+                assertEquals(i, group.getInteger("id", 0));
+            }
+            assertNull(reader.read());
+        }
+    }
 }
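
For reference, the per-column compression support introduced by this patch is driven entirely through the existing writer builder. The sketch below mirrors TestParquetWriter#testPerColumnCompression; the output path, schema, and column names are illustrative only and are not part of the patch.

// Illustrative sketch: SNAPPY as the file-wide default, with the "embeddings"
// column left uncompressed via the new per-column override.
MessageType schema = Types.buildMessage()
        .required(BINARY).as(stringType()).named("name")
        .required(BINARY).named("embeddings")
        .named("example");

Path path = new Path("/tmp/example.parquet"); // hypothetical output location

try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
        .withType(schema)
        .withCompressionCodec(SNAPPY)                      // default codec for all columns
        .withCompressionCodec("embeddings", UNCOMPRESSED)  // per-column override
        .build()) {
    writer.write(new SimpleGroupFactory(schema).newGroup()
            .append("name", "name_0")
            .append("embeddings", "embedding_data_0"));
}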