Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions compression/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,10 @@ under the License.
<artifactId>zstd-jni</artifactId>
<version>1.5.7-6</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.10.7</version>
</dependency>
</dependencies>
</project>
1 change: 1 addition & 0 deletions compression/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
requires org.apache.arrow.memory.core;
requires org.apache.arrow.vector;
requires org.apache.commons.compress;
requires snappy.java;

// Also defined under META-INF/services to support non-modular applications
provides CompressionCodec.Factory with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
switch (codecType) {
case LZ4_FRAME:
return new Lz4CompressionCodec();
case SNAPPY:
return new SnappyCompressionCodec();
case ZSTD:
return new ZstdCompressionCodec();
default:
Expand All @@ -45,6 +47,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int com
switch (codecType) {
case LZ4_FRAME:
return new Lz4CompressionCodec();
case SNAPPY:
return new SnappyCompressionCodec();
case ZSTD:
return new ZstdCompressionCodec(compressionLevel);
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.compression;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.compression.AbstractCompressionCodec;
import org.apache.arrow.vector.compression.CompressionUtil;
import org.xerial.snappy.Snappy;

/**
 * Compression codec for the Snappy algorithm.
 *
 * <p>Buffers returned by {@link #doCompress} reserve {@link
 * CompressionUtil#SIZE_OF_UNCOMPRESSED_LENGTH} bytes at the front and place the raw Snappy block
 * immediately after that prefix; {@link #doDecompress} expects the same layout on input.
 */
public class SnappyCompressionCodec extends AbstractCompressionCodec {

  /**
   * Compresses the bytes in {@code [0, writerIndex)} of the given buffer with Snappy.
   *
   * <p>The input must be no larger than {@link Integer#MAX_VALUE} bytes because it is copied into
   * a heap byte array; larger inputs are rejected by a precondition check.
   *
   * @param allocator allocator used to create the returned buffer
   * @param uncompressedBuffer buffer holding the data to compress
   * @return a newly allocated buffer whose writer index covers the reserved length prefix plus the
   *     Snappy block; the prefix bytes themselves are not written by this method
   * @throws RuntimeException if Snappy fails to compress the data
   */
  @Override
  protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
    Preconditions.checkArgument(
        uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
        "The uncompressed buffer size exceeds the integer limit %s.",
        Integer.MAX_VALUE);

    final byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
    uncompressedBuffer.getBytes(/* index= */ 0, inBytes);

    final byte[] outBytes;
    try {
      outBytes = Snappy.compress(inBytes);
    } catch (Exception e) {
      throw new RuntimeException("Error compressing with Snappy", e);
    }

    // Leave room for the uncompressed-length prefix ahead of the Snappy block.
    final long totalLength = CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length;
    final ArrowBuf compressedBuffer = allocator.buffer(totalLength);
    compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes);
    compressedBuffer.writerIndex(totalLength);
    return compressedBuffer;
  }

  /**
   * Decompresses the Snappy block that follows the length prefix of the given buffer.
   *
   * <p>The size recorded in the Snappy preamble is compared with the length read from the buffer
   * prefix before any data is inflated, so an inconsistent payload is rejected without allocating
   * its full output. The size of the inflated data is verified again afterwards.
   *
   * @param allocator allocator used to create the returned buffer
   * @param compressedBuffer buffer holding the length prefix followed by the Snappy block
   * @return a newly allocated buffer containing the decompressed bytes
   * @throws RuntimeException if Snappy fails to decode the data or the decoded size differs from
   *     the length read from the prefix
   */
  @Override
  protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
    Preconditions.checkArgument(
        compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
        "The compressed buffer size exceeds the integer limit %s",
        Integer.MAX_VALUE);

    final long decompressedLength = readUncompressedLength(compressedBuffer);

    final int payloadLength =
        (int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
    final byte[] inBytes = new byte[payloadLength];
    compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes);

    // Fail fast: read the size from the Snappy preamble and compare it with the prefix length
    // before inflating anything.
    final int encodedLength;
    try {
      encodedLength = Snappy.uncompressedLength(inBytes);
    } catch (Exception e) {
      throw new RuntimeException("Error decompressing with Snappy", e);
    }
    requireMatchingLength(decompressedLength, encodedLength);

    final byte[] outBytes;
    try {
      outBytes = Snappy.uncompress(inBytes);
    } catch (Exception e) {
      throw new RuntimeException("Error decompressing with Snappy", e);
    }
    requireMatchingLength(decompressedLength, outBytes.length);

    final ArrowBuf decompressedBuffer = allocator.buffer(decompressedLength);
    decompressedBuffer.setBytes(/* index= */ 0, outBytes);
    decompressedBuffer.writerIndex(decompressedLength);
    return decompressedBuffer;
  }

  /** Returns the codec type constant identifying Snappy. */
  @Override
  public CompressionUtil.CodecType getCodecType() {
    return CompressionUtil.CodecType.SNAPPY;
  }

  /** Throws if the observed decompressed size differs from the size read from the prefix. */
  private static void requireMatchingLength(long expected, long actual) {
    if (actual != expected) {
      throw new RuntimeException(
          "Expected != actual decompressed length: " + expected + " != " + actual);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ static Collection<Arguments> codecs() {
CompressionCodec lz4Codec = new Lz4CompressionCodec();
params.add(Arguments.arguments(len, lz4Codec));

CompressionCodec snappyCodec = new SnappyCompressionCodec();
params.add(Arguments.arguments(len, snappyCodec));

CompressionCodec zstdCodec = new ZstdCompressionCodec();
params.add(Arguments.arguments(len, zstdCodec));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ public final class CompressionType {
private CompressionType() { }
public static final byte LZ4_FRAME = 0;
public static final byte ZSTD = 1;
public static final byte SNAPPY = 2;

public static final String[] names = { "LZ4_FRAME", "ZSTD", };
public static final String[] names = { "LZ4_FRAME", "ZSTD", "SNAPPY", };

public static String name(int e) { return names[e]; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public enum CodecType {

LZ4_FRAME(org.apache.arrow.flatbuf.CompressionType.LZ4_FRAME),

ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD);
ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD),

SNAPPY(org.apache.arrow.flatbuf.CompressionType.SNAPPY);

private final byte type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
case NO_COMPRESSION:
return NoCompressionCodec.INSTANCE;
case LZ4_FRAME:
case SNAPPY:
case ZSTD:
throw new IllegalArgumentException(
"Please add arrow-compression module to use CommonsCompressionFactory for "
Expand Down
Loading