ParquetProperties.java
@@ -39,6 +39,7 @@
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

/**
@@ -135,6 +136,7 @@ public static WriterVersion fromString(String name) {
private final Map<String, String> extraMetaData;
private final ColumnProperty<Boolean> statistics;
private final ColumnProperty<Boolean> sizeStatistics;
private final ColumnProperty<CompressionCodecName> compressionCodec;

private ParquetProperties(Builder builder) {
this.pageSizeThreshold = builder.pageSize;
@@ -167,6 +169,7 @@ private ParquetProperties(Builder builder) {
this.extraMetaData = builder.extraMetaData;
this.statistics = builder.statistics.build();
this.sizeStatistics = builder.sizeStatistics.build();
this.compressionCodec = builder.compressionCodec.build();
}

public static Builder builder() {
@@ -348,6 +351,14 @@ public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
return numBloomFilterCandidates.getValue(column);
}

public CompressionCodecName getCompressionCodec(ColumnDescriptor column) {
return compressionCodec.getValue(column);
}

public CompressionCodecName getDefaultCompressionCodec() {
return compressionCodec.getDefaultValue();
}

public Map<String, String> getExtraMetaData() {
return extraMetaData;
}
@@ -419,6 +430,7 @@ public static class Builder {
private Map<String, String> extraMetaData = new HashMap<>();
private final ColumnProperty.Builder<Boolean> statistics;
private final ColumnProperty.Builder<Boolean> sizeStatistics;
private final ColumnProperty.Builder<CompressionCodecName> compressionCodec;

private Builder() {
enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
@@ -436,6 +448,8 @@ private Builder() {
ColumnProperty.<Integer>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER);
statistics = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_STATISTICS_ENABLED);
sizeStatistics = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_SIZE_STATISTICS_ENABLED);
compressionCodec =
ColumnProperty.<CompressionCodecName>builder().withDefaultValue(CompressionCodecName.UNCOMPRESSED);
}

private Builder(ParquetProperties toCopy) {
@@ -460,6 +474,7 @@ private Builder(ParquetProperties toCopy) {
this.extraMetaData = toCopy.extraMetaData;
this.statistics = ColumnProperty.builder(toCopy.statistics);
this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
this.compressionCodec = ColumnProperty.builder(toCopy.compressionCodec);
}

/**
@@ -756,6 +771,32 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) {
return this;
}

/**
* Set the compression codec for the columns not specified by
* {@link #withCompressionCodec(String, CompressionCodecName)}.
*
* @param codecName the compression codec to use by default
* @return this builder for method chaining.
*/
public Builder withCompressionCodec(CompressionCodecName codecName) {
this.compressionCodec.withDefaultValue(
Objects.requireNonNull(codecName, "compressionCodecName cannot be null"));
return this;
}

/**
* Set the compression codec for the specified column.
*
* @param columnPath the path of the column (dot-string)
* @param codecName the compression codec to use for the column
* @return this builder for method chaining.
*/
public Builder withCompressionCodec(String columnPath, CompressionCodecName codecName) {
this.compressionCodec.withValue(
columnPath, Objects.requireNonNull(codecName, "compressionCodecName cannot be null"));
return this;
}

public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
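For reference, a minimal sketch of how the new builder API could be used; the column path `user.id` and the codec choices are illustrative assumptions, not part of this change:

```java
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class PerColumnCodecSketch {
  public static void main(String[] args) {
    // ZSTD becomes the fallback for every column; "user.id" alone uses SNAPPY.
    ParquetProperties props = ParquetProperties.builder()
        .withCompressionCodec(CompressionCodecName.ZSTD)
        .withCompressionCodec("user.id", CompressionCodecName.SNAPPY)
        .build();

    // Columns without an explicit entry resolve to the default.
    System.out.println(props.getDefaultCompressionCodec()); // prints ZSTD
  }
}
```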
ColumnChunkPageWriteStore.java
@@ -44,6 +44,7 @@
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
@@ -672,6 +673,83 @@ public ColumnChunkPageWriteStore(
}
}

/**
* Construct a page write store with per-column compression support.
* Each column's compression codec is resolved from {@code props} via
* {@link ParquetProperties#getCompressionCodec(ColumnDescriptor)}.
*
* @param codecFactory factory to create compressors for each codec
* @param props properties containing per-column compression configuration
* @param schema the message schema
* @param allocator byte buffer allocator
* @param columnIndexTruncateLength truncate length for column indexes
* @param pageWriteChecksumEnabled whether to write page checksums
* @param fileEncryptor file encryptor (null if not encrypted)
* @param rowGroupOrdinal row group ordinal
*/
public ColumnChunkPageWriteStore(
CompressionCodecFactory codecFactory,
ParquetProperties props,
MessageType schema,
ByteBufferAllocator allocator,
int columnIndexTruncateLength,
boolean pageWriteChecksumEnabled,
InternalFileEncryptor fileEncryptor,
int rowGroupOrdinal) {
this.schema = schema;
if (null == fileEncryptor) {
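// Unencrypted file: one page writer per column, each wrapping that column's compressor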
for (ColumnDescriptor path : schema.getColumns()) {
BytesInputCompressor compressor = codecFactory.getCompressor(props.getCompressionCodec(path));
writers.put(
path,
new ColumnChunkPageWriter(
path,
compressor,
allocator,
columnIndexTruncateLength,
pageWriteChecksumEnabled,
null,
null,
null,
-1,
-1));
}
return;
}

// Encrypted file
int columnOrdinal = -1;
byte[] fileAAD = fileEncryptor.getFileAAD();
for (ColumnDescriptor path : schema.getColumns()) {
columnOrdinal++;
BlockCipher.Encryptor headerBlockEncryptor = null;
BlockCipher.Encryptor pageBlockEncryptor = null;
ColumnPath columnPath = ColumnPath.get(path.getPath());

BytesInputCompressor compressor = codecFactory.getCompressor(props.getCompressionCodec(path));

InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal);
if (columnSetup.isEncrypted()) {
headerBlockEncryptor = columnSetup.getMetaDataEncryptor();
pageBlockEncryptor = columnSetup.getDataEncryptor();
}

writers.put(
path,
new ColumnChunkPageWriter(
path,
compressor,
allocator,
columnIndexTruncateLength,
pageWriteChecksumEnabled,
headerBlockEncryptor,
pageBlockEncryptor,
fileAAD,
rowGroupOrdinal,
columnOrdinal));
}
}

@Override
public PageWriter getPageWriter(ColumnDescriptor path) {
return writers.get(path);
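As a rough wiring sketch (not taken from this PR), the new constructor can be instantiated without encryption as follows; `codecFactory`, `props`, and `schema` stand in for values built elsewhere, and the numeric arguments are placeholders:

```java
// Assumes: codecFactory (e.g. a CodecFactory from parquet-hadoop), props built
// with per-column codecs as above, and schema (a MessageType).
ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
    codecFactory,                            // CompressionCodecFactory
    props,                                   // carries the per-column codec map
    schema,
    new org.apache.parquet.bytes.HeapByteBufferAllocator(),
    64,                                      // columnIndexTruncateLength (placeholder)
    true,                                    // pageWriteChecksumEnabled
    null,                                    // fileEncryptor: unencrypted
    0);                                      // rowGroupOrdinal
```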
InternalParquetRecordWriter.java
@@ -28,6 +28,7 @@
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.crypto.InternalFileEncryptor;
import org.apache.parquet.hadoop.api.WriteSupport;
@@ -52,6 +53,7 @@ class InternalParquetRecordWriter<T> {
private final int rowGroupRecordCountThreshold;
private long nextRowGroupSize;
private final BytesInputCompressor compressor;
private final CompressionCodecFactory codecFactory;
private final boolean validating;
private final ParquetProperties props;

@@ -77,7 +79,9 @@ class InternalParquetRecordWriter<T> {
* @param extraMetaData extra meta data to write in the footer of the file
* @param rowGroupSize the size of a block in the file (this will be approximate)
* @param compressor the codec used to compress
* @deprecated Use {@link #InternalParquetRecordWriter(ParquetFileWriter, WriteSupport, MessageType, Map, long, CompressionCodecFactory, boolean, ParquetProperties)} for per-column compression support
*/
@Deprecated
public InternalParquetRecordWriter(
ParquetFileWriter parquetFileWriter,
WriteSupport<T> writeSupport,
@@ -95,6 +99,41 @@ public InternalParquetRecordWriter(
this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
this.nextRowGroupSize = rowGroupSizeThreshold;
this.compressor = compressor;
this.codecFactory = null;
this.validating = validating;
this.props = props;
this.fileEncryptor = parquetFileWriter.getEncryptor();
this.rowGroupOrdinal = 0;
initStore();
recordCountForNextMemCheck = props.getMinRowCountForPageSizeCheck();
}

/**
* @param parquetFileWriter the file to write to
* @param writeSupport the class to convert incoming records
* @param schema the schema of the records
* @param extraMetaData extra meta data to write in the footer of the file
* @param rowGroupSize the size of a block in the file (this will be approximate)
* @param codecFactory the codec factory for per-column compression
*/
public InternalParquetRecordWriter(
ParquetFileWriter parquetFileWriter,
WriteSupport<T> writeSupport,
MessageType schema,
Map<String, String> extraMetaData,
long rowGroupSize,
CompressionCodecFactory codecFactory,
boolean validating,
ParquetProperties props) {
this.parquetFileWriter = parquetFileWriter;
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
this.schema = schema;
this.extraMetaData = extraMetaData;
this.rowGroupSizeThreshold = rowGroupSize;
this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
this.nextRowGroupSize = rowGroupSizeThreshold;
this.compressor = null;
this.codecFactory = codecFactory;
this.validating = validating;
this.props = props;
this.fileEncryptor = parquetFileWriter.getEncryptor();
@@ -108,14 +147,27 @@ public ParquetMetadata getFooter() {
}

private void initStore() {
ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
compressor,
schema,
props.getAllocator(),
props.getColumnIndexTruncateLength(),
props.getPageWriteChecksumEnabled(),
fileEncryptor,
rowGroupOrdinal);
ColumnChunkPageWriteStore columnChunkPageWriteStore;
if (codecFactory != null) {
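// Per-column path: the store resolves each column's codec from props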
columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
codecFactory,
props,
schema,
props.getAllocator(),
props.getColumnIndexTruncateLength(),
props.getPageWriteChecksumEnabled(),
fileEncryptor,
rowGroupOrdinal);
} else {
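// Legacy path: a single compressor shared by every column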
columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
compressor,
schema,
props.getAllocator(),
props.getColumnIndexTruncateLength(),
props.getPageWriteChecksumEnabled(),
fileEncryptor,
rowGroupOrdinal);
}
pageStore = columnChunkPageWriteStore;
bloomFilterWriteStore = columnChunkPageWriteStore;

ParquetOutputFormat.java
@@ -546,6 +546,10 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
STATISTICS_ENABLED,
key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED),
propsBuilder::withStatisticsEnabled)
.withColumnConfig(
COMPRESSION,
key -> CompressionCodecName.fromConf(conf.get(key)),
propsBuilder::withCompressionCodec)
.parseConfig(conf);

ParquetProperties props = propsBuilder.build();
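For context, a sketch of how these settings might be supplied through a Hadoop Configuration; the `#column.path` key suffix follows the convention ColumnConfigParser uses for other per-column options, and the column path here is hypothetical:

```java
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// File-wide default (ParquetOutputFormat.COMPRESSION = "parquet.compression"):
conf.set("parquet.compression", "ZSTD");
// Per-column override, following the "<key>#<column.path>" convention;
// "user.id" is an illustrative column path:
conf.set("parquet.compression#user.id", "SNAPPY");
```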
ParquetRecordWriter.java
@@ -201,15 +201,11 @@ public ParquetRecordWriter(
MemoryManager memoryManager,
Configuration conf) {
this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
// Ensure the default compression codec from ParquetOutputFormat is set in props
ParquetProperties propsWithCodec =
ParquetProperties.copy(props).withCompressionCodec(codec).build();
internalWriter = new InternalParquetRecordWriter<T>(
w,
writeSupport,
schema,
extraMetaData,
blockSize,
codecFactory.getCompressor(codec),
validating,
props);
w, writeSupport, schema, extraMetaData, blockSize, codecFactory, validating, propsWithCodec);
this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null");
memoryManager.addWriter(internalWriter, blockSize);
}
ParquetWriter.java
@@ -395,7 +395,6 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
fileWriter.start();

this.codecFactory = codecFactory;
CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName);

final Map<String, String> extraMetadata;
if (encodingProps.getExtraMetaData() == null
@@ -418,7 +417,7 @@ }
}

this.writer = new InternalParquetRecordWriter<T>(
fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, codecFactory, validating, encodingProps);
}

public void write(T object) throws IOException {
@@ -559,13 +558,29 @@ public SELF withWriteMode(ParquetFileWriter.Mode mode) {

/**
* Set the {@link CompressionCodecName compression codec} used by the
* constructed writer.
* constructed writer. This sets the default compression codec for all columns.
* Per-column overrides can be set with {@link #withCompressionCodec(String, CompressionCodecName)}.
*
* @param codecName a {@code CompressionCodecName}
* @return this builder for method chaining.
*/
public SELF withCompressionCodec(CompressionCodecName codecName) {
this.codecName = codecName;
encodingPropsBuilder.withCompressionCodec(codecName);
return self();
}

/**
* Set the {@link CompressionCodecName compression codec} for a specific column.
* Columns not explicitly configured will use the default codec set by
* {@link #withCompressionCodec(CompressionCodecName)}.
*
* @param columnPath the path of the column (dot-string)
* @param codecName the compression codec to use for the column
* @return this builder for method chaining.
*/
public SELF withCompressionCodec(String columnPath, CompressionCodecName codecName) {
encodingPropsBuilder.withCompressionCodec(columnPath, codecName);
return self();
}

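Putting the builder additions together, a minimal sketch using the example writer from parquet-hadoop; the output path, schema, and column names are illustrative assumptions:

```java
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class PerColumnWriterSketch {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message user { required binary id (UTF8); required int64 score; }");

    try (ParquetWriter<Group> writer =
        ExampleParquetWriter.builder(new Path("/tmp/users.parquet"))
            .withType(schema)
            .withCompressionCodec(CompressionCodecName.ZSTD)         // default for all columns
            .withCompressionCodec("id", CompressionCodecName.SNAPPY) // override for column "id"
            .build()) {
      // write Group records here
    }
  }
}
```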