From 342827f7920f569b5e588d53e12259b0d56cff73 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Tue, 24 Feb 2026 19:03:53 +0000 Subject: [PATCH 1/8] GH-1035: Support custom_metadata on RecordBatch --- .../vector/ipc/message/ArrowRecordBatch.java | 83 +++++++++++++++- .../vector/ipc/message/MessageSerializer.java | 97 ++++++++++++++++++- .../vector/ipc/MessageSerializerTest.java | 40 ++++++++ 3 files changed, 214 insertions(+), 6 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index bc6bfa8c86..5d328b378b 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -19,7 +19,9 @@ import com.google.flatbuffers.FlatBufferBuilder; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.arrow.flatbuf.RecordBatch; import org.apache.arrow.memory.ArrowBuf; @@ -52,6 +54,8 @@ public class ArrowRecordBatch implements ArrowMessage { private final List variadicBufferCounts; + private final Map customMetadata; + private boolean closed = false; public ArrowRecordBatch(int length, List nodes, List buffers) { @@ -66,6 +70,30 @@ public ArrowRecordBatch( this(length, nodes, buffers, bodyCompression, null, true); } + /** + * Construct a record batch from nodes with custom metadata. + * + * @param length how many rows in this batch + * @param nodes field level info + * @param buffers will be retained until this recordBatch is closed + * @param customMetadata custom metadata for this record batch + */ + public ArrowRecordBatch( + int length, + List nodes, + List buffers, + Map customMetadata) { + this( + length, + nodes, + buffers, + NoCompressionCodec.DEFAULT_BODY_COMPRESSION, + null, + true, + true, + customMetadata); + } + /** * Construct a record batch from nodes. * @@ -152,6 +180,39 @@ public ArrowRecordBatch( List variadicBufferCounts, boolean alignBuffers, boolean retainBuffers) { + this( + length, + nodes, + buffers, + bodyCompression, + variadicBufferCounts, + alignBuffers, + retainBuffers, + null); + } + + /** + * Construct a record batch from nodes. + * + * @param length how many rows in this batch + * @param nodes field level info + * @param buffers will be retained until this recordBatch is closed + * @param bodyCompression compression info. + * @param variadicBufferCounts the number of buffers in each variadic section. + * @param alignBuffers Whether to align buffers to an 8 byte boundary. + * @param retainBuffers Whether to retain() each source buffer in the constructor. If false, the + * caller is responsible for retaining the buffers beforehand. + * @param customMetadata custom metadata for this record batch. + */ + public ArrowRecordBatch( + int length, + List nodes, + List buffers, + ArrowBodyCompression bodyCompression, + List variadicBufferCounts, + boolean alignBuffers, + boolean retainBuffers, + Map customMetadata) { super(); this.length = length; this.nodes = nodes; @@ -159,6 +220,10 @@ public ArrowRecordBatch( Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); this.bodyCompression = bodyCompression; this.variadicBufferCounts = variadicBufferCounts; + this.customMetadata = + customMetadata == null + ? Collections.emptyMap() + : Collections.unmodifiableMap(new HashMap<>(customMetadata)); List arrowBuffers = new ArrayList<>(buffers.size()); long offset = 0; for (ArrowBuf arrowBuf : buffers) { @@ -188,13 +253,18 @@ private ArrowRecordBatch( List nodes, List buffers, ArrowBodyCompression bodyCompression, - List variadicBufferCounts) { + List variadicBufferCounts, + Map customMetadata) { this.length = length; this.nodes = nodes; this.buffers = buffers; Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); this.bodyCompression = bodyCompression; this.variadicBufferCounts = variadicBufferCounts; + this.customMetadata = + customMetadata == null + ? Collections.emptyMap() + : Collections.unmodifiableMap(new HashMap<>(customMetadata)); this.closed = false; List arrowBuffers = new ArrayList<>(); long offset = 0; @@ -218,6 +288,15 @@ public ArrowBodyCompression getBodyCompression() { return bodyCompression; } + /** + * Get the custom metadata for this record batch. + * + * @return the custom metadata as an unmodifiable map + */ + public Map getCustomMetadata() { + return customMetadata; + } + /** * Get the nodes in this record batch. * @@ -268,7 +347,7 @@ public ArrowRecordBatch cloneWithTransfer(final BufferAllocator allocator) { .collect(Collectors.toList()); close(); return new ArrowRecordBatch( - false, length, nodes, newBufs, bodyCompression, variadicBufferCounts); + false, length, nodes, newBufs, bodyCompression, variadicBufferCounts, customMetadata); } /** diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 36f6ea449b..c7575ee19c 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -22,10 +22,13 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.arrow.flatbuf.Buffer; import org.apache.arrow.flatbuf.DictionaryBatch; import org.apache.arrow.flatbuf.FieldNode; +import org.apache.arrow.flatbuf.KeyValue; import org.apache.arrow.flatbuf.Message; import org.apache.arrow.flatbuf.MessageHeader; import org.apache.arrow.flatbuf.MetadataVersion; @@ -325,8 +328,17 @@ public static ByteBuffer serializeMetadata(ArrowMessage message) { public static ByteBuffer serializeMetadata(ArrowMessage message, IpcOption writeOption) { FlatBufferBuilder builder = new FlatBufferBuilder(); int batchOffset = message.writeTo(builder); + Map customMetadata = null; + if (message instanceof ArrowRecordBatch) { + customMetadata = ((ArrowRecordBatch) message).getCustomMetadata(); + } return serializeMessage( - builder, message.getMessageType(), batchOffset, message.computeBodyLength(), writeOption); + builder, + message.getMessageType(), + batchOffset, + message.computeBodyLength(), + writeOption, + customMetadata); } /** @@ -340,7 +352,18 @@ public static ByteBuffer serializeMetadata(ArrowMessage message, IpcOption write public static ArrowRecordBatch deserializeRecordBatch( Message recordBatchMessage, ArrowBuf bodyBuffer) throws IOException { RecordBatch recordBatchFB = (RecordBatch) recordBatchMessage.header(new RecordBatch()); - return deserializeRecordBatch(recordBatchFB, bodyBuffer); + // Extract custom metadata from the Message + Map customMetadata = null; + if (recordBatchMessage.customMetadataLength() > 0) { + customMetadata = new HashMap<>(); + for (int i = 0; i < recordBatchMessage.customMetadataLength(); i++) { + KeyValue kv = recordBatchMessage.customMetadata(i); + if (kv != null && kv.key() != null && kv.value() != null) { + customMetadata.put(kv.key(), kv.value()); + } + } + } + return deserializeRecordBatch(recordBatchFB, bodyBuffer, customMetadata); } /** @@ -395,10 +418,22 @@ public static ArrowRecordBatch deserializeRecordBatch( RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch()); + // Extract custom metadata from the Message + Map customMetadata = null; + if (messageFB.customMetadataLength() > 0) { + customMetadata = new HashMap<>(); + for (int i = 0; i < messageFB.customMetadataLength(); i++) { + KeyValue kv = messageFB.customMetadata(i); + if (kv != null && kv.key() != null && kv.value() != null) { + customMetadata.put(kv.key(), kv.value()); + } + } + } + // Now read the body final ArrowBuf body = buffer.slice(block.getMetadataLength(), totalLen - block.getMetadataLength()); - return deserializeRecordBatch(recordBatchFB, body); + return deserializeRecordBatch(recordBatchFB, body, customMetadata); } /** @@ -411,6 +446,22 @@ public static ArrowRecordBatch deserializeRecordBatch( */ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, ArrowBuf body) throws IOException { + return deserializeRecordBatch(recordBatchFB, body, null); + } + + /** + * Deserializes an ArrowRecordBatch given the Flatbuffer metadata, in-memory body, and custom + * metadata. + * + * @param recordBatchFB Deserialized FlatBuffer record batch + * @param body Read body of the record batch + * @param customMetadata Custom metadata from the Message + * @return ArrowRecordBatch from metadata and in-memory body + * @throws IOException on error + */ + public static ArrowRecordBatch deserializeRecordBatch( + RecordBatch recordBatchFB, ArrowBuf body, Map customMetadata) + throws IOException { // Now read the body int nodesLength = recordBatchFB.nodesLength(); List nodes = new ArrayList<>(); @@ -452,7 +503,9 @@ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, buffers, bodyCompression, variadicBufferCounts, - /*alignBuffers*/ true); + /*alignBuffers*/ true, + /*retainBuffers*/ true, + customMetadata); body.getReferenceManager().release(); return arrowRecordBatch; } @@ -676,11 +729,47 @@ public static ByteBuffer serializeMessage( int headerOffset, long bodyLength, IpcOption writeOption) { + return serializeMessage(builder, headerType, headerOffset, bodyLength, writeOption, null); + } + + /** + * Serializes an Arrow message with metadata and custom metadata into a ByteBuffer. + * + * @param builder to write the flatbuf to + * @param headerType the type of the header + * @param headerOffset the offset in the buffer where the header starts + * @param bodyLength the length of the body + * @param writeOption IPC write options + * @param customMetadata custom metadata to attach to the message + * @return the corresponding ByteBuffer + */ + public static ByteBuffer serializeMessage( + FlatBufferBuilder builder, + byte headerType, + int headerOffset, + long bodyLength, + IpcOption writeOption, + Map customMetadata) { + int customMetadataOffset = 0; + if (customMetadata != null && !customMetadata.isEmpty()) { + int[] metadataOffsets = new int[customMetadata.size()]; + int i = 0; + for (Map.Entry entry : customMetadata.entrySet()) { + int keyOffset = builder.createString(entry.getKey()); + int valueOffset = builder.createString(entry.getValue()); + metadataOffsets[i++] = KeyValue.createKeyValue(builder, keyOffset, valueOffset); + } + customMetadataOffset = Message.createCustomMetadataVector(builder, metadataOffsets); + } + Message.startMessage(builder); Message.addHeaderType(builder, headerType); Message.addHeader(builder, headerOffset); Message.addVersion(builder, writeOption.metadataVersion.toFlatbufID()); Message.addBodyLength(builder, bodyLength); + if (customMetadataOffset != 0) { + Message.addCustomMetadata(builder, customMetadataOffset); + } builder.finish(Message.endMessage(builder)); return builder.dataBuffer(); } diff --git a/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java b/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java index b529ca645a..6ce48bc51b 100644 --- a/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java +++ b/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java @@ -31,7 +31,9 @@ import java.nio.channels.Channels; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -242,4 +244,42 @@ public static void verifyBatch(ArrowRecordBatch batch, byte[] validity, byte[] v assertArrayEquals(validity, MessageSerializerTest.array(buffers.get(0))); assertArrayEquals(values, MessageSerializerTest.array(buffers.get(1))); } + + @Test + public void testRecordBatchCustomMetadata() throws Exception { + byte[] validity = new byte[] {(byte) 255, 0}; + byte[] values = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; + + BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + ArrowBuf validityb = buf(alloc, validity); + ArrowBuf valuesb = buf(alloc, values); + + Map customMetadata = new HashMap<>(); + customMetadata.put("key1", "value1"); + customMetadata.put("key2", "value2"); + + ArrowRecordBatch batch = + new ArrowRecordBatch( + 16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb), customMetadata); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch); + + ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + ReadChannel channel = new ReadChannel(Channels.newChannel(in)); + ArrowMessage deserialized = MessageSerializer.deserializeMessageBatch(channel, alloc); + + assertEquals(ArrowRecordBatch.class, deserialized.getClass()); + ArrowRecordBatch deserializedBatch = (ArrowRecordBatch) deserialized; + verifyBatch(deserializedBatch, validity, values); + + Map deserializedMetadata = deserializedBatch.getCustomMetadata(); + assertEquals(customMetadata, deserializedMetadata); + + validityb.close(); + valuesb.close(); + batch.close(); + deserialized.close(); + alloc.close(); + } } From cd5397dad4a4007391dfc02df6a43fcb476d5453 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Wed, 25 Feb 2026 01:09:59 +0000 Subject: [PATCH 2/8] Add custom_metadata to ArrowMessage --- .../apache/arrow/vector/ipc/message/ArrowMessage.java | 11 +++++++++++ .../arrow/vector/ipc/message/ArrowRecordBatch.java | 1 + .../arrow/vector/ipc/message/MessageSerializer.java | 6 +----- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java index 6f8e893405..06a507fa9c 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java @@ -16,6 +16,8 @@ */ package org.apache.arrow.vector.ipc.message; +import java.util.Map; + /** Interface for Arrow IPC messages (https://arrow.apache.org/docs/format/IPC.html). */ public interface ArrowMessage extends FBSerializable, AutoCloseable { @@ -26,6 +28,15 @@ public interface ArrowMessage extends FBSerializable, AutoCloseable { /** Returns the flatbuffer enum value indicating the type of the message. */ byte getMessageType(); + /** + * Returns custom metadata for this message, or null if none. + * + * @return custom metadata map, or null if no custom metadata is present + */ + default Map getCustomMetadata() { + return null; + } + /** * Visitor interface for implementations of {@link ArrowMessage}. * diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 5d328b378b..82d7f33758 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -293,6 +293,7 @@ public ArrowBodyCompression getBodyCompression() { * * @return the custom metadata as an unmodifiable map */ + @Override public Map getCustomMetadata() { return customMetadata; } diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index c7575ee19c..14166ce32f 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -328,17 +328,13 @@ public static ByteBuffer serializeMetadata(ArrowMessage message) { public static ByteBuffer serializeMetadata(ArrowMessage message, IpcOption writeOption) { FlatBufferBuilder builder = new FlatBufferBuilder(); int batchOffset = message.writeTo(builder); - Map customMetadata = null; - if (message instanceof ArrowRecordBatch) { - customMetadata = ((ArrowRecordBatch) message).getCustomMetadata(); - } return serializeMessage( builder, message.getMessageType(), batchOffset, message.computeBodyLength(), writeOption, - customMetadata); + message.getCustomMetadata()); } /** From 2d9e83095c6f650b14baecfa003f500b6b0b389a Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Wed, 25 Feb 2026 01:23:36 +0000 Subject: [PATCH 3/8] Add null metadata key/values as empty strings --- .../arrow/vector/ipc/message/MessageSerializer.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 14166ce32f..99d11d29b9 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -354,9 +354,9 @@ public static ArrowRecordBatch deserializeRecordBatch( customMetadata = new HashMap<>(); for (int i = 0; i < recordBatchMessage.customMetadataLength(); i++) { KeyValue kv = recordBatchMessage.customMetadata(i); - if (kv != null && kv.key() != null && kv.value() != null) { - customMetadata.put(kv.key(), kv.value()); - } + String key = kv.key(); + String value = kv.value(); + customMetadata.put(key == null ? "" : key, value == null ? "" : value); } } return deserializeRecordBatch(recordBatchFB, bodyBuffer, customMetadata); @@ -420,9 +420,9 @@ public static ArrowRecordBatch deserializeRecordBatch( customMetadata = new HashMap<>(); for (int i = 0; i < messageFB.customMetadataLength(); i++) { KeyValue kv = messageFB.customMetadata(i); - if (kv != null && kv.key() != null && kv.value() != null) { - customMetadata.put(kv.key(), kv.value()); - } + String key = kv.key(); + String value = kv.value(); + customMetadata.put(key == null ? "" : key, value == null ? "" : value); } } From e6ba9f0f8cda70c4813d4e8a12673c9127100f70 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Thu, 26 Feb 2026 20:31:28 +0000 Subject: [PATCH 4/8] Add custom_metadata to String representation of ArrowRecordBatch --- .../org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 82d7f33758..9b67668b42 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -426,6 +426,7 @@ public String toString() { if (variadicBufferCounts != null && !variadicBufferCounts.isEmpty()) { variadicBufCount = variadicBufferCounts.size(); } + String meta = customMetadata.isEmpty() ? "" : "(metadata: " + customMetadata + ")"; return "ArrowRecordBatch [length=" + length + ", nodes=" @@ -438,7 +439,8 @@ public String toString() { + buffersLayout + ", closed=" + closed - + "]"; + + "]" + + meta; } /** Computes the size of the serialized body for this recordBatch. */ From 2cb1d5e23e895b035984cd7122b8ff2dddeb3fa5 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Thu, 26 Feb 2026 22:22:10 +0000 Subject: [PATCH 5/8] Refactor repeated code --- .../vector/ipc/message/MessageSerializer.java | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 99d11d29b9..eff98a3b29 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -348,18 +348,7 @@ public static ByteBuffer serializeMetadata(ArrowMessage message, IpcOption write public static ArrowRecordBatch deserializeRecordBatch( Message recordBatchMessage, ArrowBuf bodyBuffer) throws IOException { RecordBatch recordBatchFB = (RecordBatch) recordBatchMessage.header(new RecordBatch()); - // Extract custom metadata from the Message - Map customMetadata = null; - if (recordBatchMessage.customMetadataLength() > 0) { - customMetadata = new HashMap<>(); - for (int i = 0; i < recordBatchMessage.customMetadataLength(); i++) { - KeyValue kv = recordBatchMessage.customMetadata(i); - String key = kv.key(); - String value = kv.value(); - customMetadata.put(key == null ? "" : key, value == null ? "" : value); - } - } - return deserializeRecordBatch(recordBatchFB, bodyBuffer, customMetadata); + return deserializeRecordBatch(recordBatchFB, bodyBuffer, getCustomMetadata(recordBatchMessage)); } /** @@ -414,22 +403,10 @@ public static ArrowRecordBatch deserializeRecordBatch( RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch()); - // Extract custom metadata from the Message - Map customMetadata = null; - if (messageFB.customMetadataLength() > 0) { - customMetadata = new HashMap<>(); - for (int i = 0; i < messageFB.customMetadataLength(); i++) { - KeyValue kv = messageFB.customMetadata(i); - String key = kv.key(); - String value = kv.value(); - customMetadata.put(key == null ? "" : key, value == null ? "" : value); - } - } - // Now read the body final ArrowBuf body = buffer.slice(block.getMetadataLength(), totalLen - block.getMetadataLength()); - return deserializeRecordBatch(recordBatchFB, body, customMetadata); + return deserializeRecordBatch(recordBatchFB, body, getCustomMetadata(messageFB)); } /** @@ -839,4 +816,18 @@ public static ArrowBuf readMessageBody(ReadChannel in, long bodyLength, BufferAl } return bodyBuffer; } + + private static Map getCustomMetadata(Message message) { + if (message.customMetadataLength() == 0) { + return null; + } + Map customMetadata = new HashMap<>(); + for (int i = 0; i < message.customMetadataLength(); i++) { + KeyValue kv = message.customMetadata(i); + String key = kv.key(); + String value = kv.value(); + customMetadata.put(key == null ? "" : key, value == null ? "" : value); + } + return customMetadata; + } } From 9d9777eb9feeb2f9c0a49b9e7c77c4ff788df2d2 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Thu, 26 Feb 2026 23:07:23 +0000 Subject: [PATCH 6/8] Add ArrowRecordBatch constructor with customMetadata and without retainBuffers to replicate existing API --- .../vector/ipc/message/ArrowRecordBatch.java | 30 +++++++++++++++++++ .../vector/ipc/message/MessageSerializer.java | 1 - 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 9b67668b42..ce363b4ba9 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -160,6 +160,36 @@ public ArrowRecordBatch( true); } + /** + * Construct a record batch from nodes. + * + * @param length how many rows in this batch + * @param nodes field level info + * @param buffers will be retained until this recordBatch is closed + * @param bodyCompression compression info. + * @param variadicBufferCounts the number of buffers in each variadic section. + * @param alignBuffers Whether to align buffers to an 8 byte boundary. + * @param customMetadata custom metadata for this record batch. + */ + public ArrowRecordBatch( + int length, + List nodes, + List buffers, + ArrowBodyCompression bodyCompression, + List variadicBufferCounts, + boolean alignBuffers, + Map customMetadata) { + this( + length, + nodes, + buffers, + bodyCompression, + variadicBufferCounts, + alignBuffers, /*retainBuffers*/ + true, + customMetadata); + } + /** * Construct a record batch from nodes. * diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index eff98a3b29..779669d470 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -477,7 +477,6 @@ public static ArrowRecordBatch deserializeRecordBatch( bodyCompression, variadicBufferCounts, /*alignBuffers*/ true, - /*retainBuffers*/ true, customMetadata); body.getReferenceManager().release(); return arrowRecordBatch; From 62a689be1439ee8d74d0b5412f642490af033a0b Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Fri, 27 Feb 2026 01:02:55 +0000 Subject: [PATCH 7/8] Refactor message serialization for readability --- .../vector/ipc/message/MessageSerializer.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index 779669d470..a779f95193 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -419,6 +419,7 @@ public static ArrowRecordBatch deserializeRecordBatch( */ public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB, ArrowBuf body) throws IOException { + // RecordBatch is not encapsulated in a Message, so there is no custom metadata return deserializeRecordBatch(recordBatchFB, body, null); } @@ -722,17 +723,7 @@ public static ByteBuffer serializeMessage( long bodyLength, IpcOption writeOption, Map customMetadata) { - int customMetadataOffset = 0; - if (customMetadata != null && !customMetadata.isEmpty()) { - int[] metadataOffsets = new int[customMetadata.size()]; - int i = 0; - for (Map.Entry entry : customMetadata.entrySet()) { - int keyOffset = builder.createString(entry.getKey()); - int valueOffset = builder.createString(entry.getValue()); - metadataOffsets[i++] = KeyValue.createKeyValue(builder, keyOffset, valueOffset); - } - customMetadataOffset = Message.createCustomMetadataVector(builder, metadataOffsets); - } + int customMetadataOffset = getCustomMetadataOffset(builder, customMetadata); Message.startMessage(builder); Message.addHeaderType(builder, headerType); @@ -829,4 +820,20 @@ private static Map getCustomMetadata(Message message) { } return customMetadata; } + + private static int getCustomMetadataOffset( + FlatBufferBuilder builder, Map customMetadata) { + int customMetadataOffset = 0; + if (customMetadata != null && !customMetadata.isEmpty()) { + int[] metadataOffsets = new int[customMetadata.size()]; + int i = 0; + for (Map.Entry entry : customMetadata.entrySet()) { + int keyOffset = builder.createString(entry.getKey()); + int valueOffset = builder.createString(entry.getValue()); + metadataOffsets[i++] = KeyValue.createKeyValue(builder, keyOffset, valueOffset); + } + customMetadataOffset = Message.createCustomMetadataVector(builder, metadataOffsets); + } + return customMetadataOffset; + } } From 22f5abdf0fcc340033ff7cc742f343e85f5582ee Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Fri, 27 Feb 2026 01:13:10 +0000 Subject: [PATCH 8/8] Refactor method name --- .../apache/arrow/vector/ipc/message/MessageSerializer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java index a779f95193..0268797ca3 100644 --- a/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java +++ b/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java @@ -348,7 +348,8 @@ public static ByteBuffer serializeMetadata(ArrowMessage message, IpcOption write public static ArrowRecordBatch deserializeRecordBatch( Message recordBatchMessage, ArrowBuf bodyBuffer) throws IOException { RecordBatch recordBatchFB = (RecordBatch) recordBatchMessage.header(new RecordBatch()); - return deserializeRecordBatch(recordBatchFB, bodyBuffer, getCustomMetadata(recordBatchMessage)); + return deserializeRecordBatch( + recordBatchFB, bodyBuffer, deserializeCustomMetadata(recordBatchMessage)); } /** @@ -406,7 +407,7 @@ public static ArrowRecordBatch deserializeRecordBatch( // Now read the body final ArrowBuf body = buffer.slice(block.getMetadataLength(), totalLen - block.getMetadataLength()); - return deserializeRecordBatch(recordBatchFB, body, getCustomMetadata(messageFB)); + return deserializeRecordBatch(recordBatchFB, body, deserializeCustomMetadata(messageFB)); } /** @@ -807,7 +808,7 @@ public static ArrowBuf readMessageBody(ReadChannel in, long bodyLength, BufferAl return bodyBuffer; } - private static Map getCustomMetadata(Message message) { + private static Map deserializeCustomMetadata(Message message) { if (message.customMetadataLength() == 0) { return null; }