Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -363,9 +364,13 @@ Result<std::unique_ptr<Message>> ReadMessage(std::shared_ptr<Buffer> metadata,
}
}

Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
// Common helper for the two ReadMessage overloads that take a file + offset.
// When body_length is provided, metadata and body are read in a single IO.
// When body_length is absent, metadata is read first, then the body is read
// separately.
static Result<std::unique_ptr<Message>> ReadMessageInternal(
int64_t offset, int32_t metadata_length, std::optional<int64_t> body_length,
io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) {
std::unique_ptr<Message> result;
auto listener = std::make_shared<AssignMessageDecoderListener>(&result);
MessageDecoder decoder(listener);
Expand All @@ -375,15 +380,18 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
decoder.next_required_size());
}

// TODO(GH-48846): we should take a body_length just like ReadMessageAsync
// and read metadata + body in one go.
ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
// When body_length is known, read metadata + body in one IO call.
// Otherwise, read only metadata first.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> metadata,
file->ReadAt(offset, metadata_length + body_length.value_or(0)));

if (metadata->size() < metadata_length) {
return Status::Invalid("Expected to read ", metadata_length,
" metadata bytes at offset ", offset, " but got ",
metadata->size());
}
ARROW_RETURN_NOT_OK(decoder.Consume(metadata));

ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0, metadata_length)));

switch (decoder.state()) {
case MessageDecoder::State::INITIAL:
Expand All @@ -398,14 +406,23 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
case MessageDecoder::State::BODY: {
std::shared_ptr<Buffer> body;
if (fields_loader) {
// Selective field loading: allocate a body buffer and read only the
// requested field ranges into it.
ARROW_ASSIGN_OR_RAISE(
body, AllocateBuffer(decoder.next_required_size(), default_memory_pool()));
RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, fields_loader,
metadata, decoder.next_required_size(), body));
SliceBuffer(metadata, 0, metadata_length),
decoder.next_required_size(), body));
} else if (body_length.has_value()) {
// Body was already read as part of the combined IO; just slice it out.
body = SliceBuffer(metadata, metadata_length,
std::min(*body_length, metadata->size() - metadata_length));
} else {
// Body length was unknown; do a separate IO to read the body.
ARROW_ASSIGN_OR_RAISE(
body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
}

if (body->size() < decoder.next_required_size()) {
return Status::IOError("Expected to be able to read ",
decoder.next_required_size(),
Expand All @@ -421,6 +438,21 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
}
}

Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
                                             io::RandomAccessFile* file,
                                             const FieldsLoaderFunction& fields_loader) {
  // The body length is unknown for this overload, so ReadMessageInternal reads
  // the metadata first and then issues a separate IO for the body (if any).
  return ReadMessageInternal(offset, metadata_length, std::nullopt, file, fields_loader);
}

Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
                                             const int32_t metadata_length,
                                             const int64_t body_length,
                                             io::RandomAccessFile* file) {
  // The body length is known up front, so ReadMessageInternal can fetch the
  // metadata and the body in a single combined IO call. No selective field
  // loading here: pass an empty (default-constructed) loader.
  const FieldsLoaderFunction no_fields_loader{};
  return ReadMessageInternal(offset, metadata_length, body_length, file,
                             no_fields_loader);
}

Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t metadata_length,
int64_t body_length,
io::RandomAccessFile* file,
Expand Down
24 changes: 22 additions & 2 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ class ARROW_EXPORT MessageReader {
// org::apache::arrow::flatbuf::RecordBatch*)
using FieldsLoaderFunction = std::function<Status(const void*, io::RandomAccessFile*)>;

/// \brief Read encapsulated RPC message from position in file
/// \brief Read encapsulated IPC message from position in file
///
/// Read a length-prefixed message flatbuffer starting at the indicated file
/// offset. If the message has a body with non-zero length, it will also be
Expand All @@ -469,7 +469,27 @@ Result<std::unique_ptr<Message>> ReadMessage(
const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader = {});

/// \brief Read encapsulated RPC message from cached buffers
/// \brief Read encapsulated IPC message from position in file
///
/// Read a length-prefixed message flatbuffer starting at the indicated file
/// offset. Since the body length is known up front, the metadata and the body
/// may be fetched in a single IO call.
///
/// The metadata_length includes at least the length prefix and the flatbuffer.
///
/// \param[in] offset the position in the file where the message starts. The
/// first 4 bytes after the offset are the message length
/// \param[in] metadata_length the number of bytes of flatbuffer metadata to
/// read, including the length prefix
/// \param[in] body_length the number of bytes for the message body
/// \param[in] file the seekable file interface to read from
/// \return the message read
ARROW_EXPORT
Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
                                             const int32_t metadata_length,
                                             const int64_t body_length,
                                             io::RandomAccessFile* file);

/// \brief Read encapsulated IPC message from cached buffers
///
/// The buffers should contain an entire message. Partial reads are not handled.
///
Expand Down
90 changes: 75 additions & 15 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,15 @@ class TestIpcRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*>,
ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length,
&body_length, options_));

ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message1,
ReadMessage(0, metadata_length, mmap_.get()));
ASSERT_EQ(expected_version, message->metadata_version());
ASSERT_EQ(expected_version, message1->metadata_version());

ASSERT_OK_AND_ASSIGN(auto message2,
ReadMessage(0, metadata_length, body_length, mmap_.get()));
ASSERT_EQ(expected_version, message2->metadata_version());

ASSERT_TRUE(message1->Equals(*message2));
}
};

Expand Down Expand Up @@ -613,6 +619,27 @@ TEST(TestReadMessage, CorruptedSmallInput) {
ASSERT_EQ(nullptr, message);
}

TEST(TestReadMessage, ReadBodyWithLength) {
  // Test the optimized ReadMessage(offset, meta_len, body_len, file) overload
  // Write a single record batch into an in-memory stream so that the exact
  // metadata and body lengths are known and can be passed to ReadMessage.
  std::shared_ptr<RecordBatch> batch;
  ASSERT_OK(MakeIntRecordBatch(&batch));

  ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(0));
  int32_t metadata_length;
  int64_t body_length;
  ASSERT_OK(WriteRecordBatch(*batch, 0, stream.get(), &metadata_length, &body_length,
                             IpcWriteOptions::Defaults()));

  ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish());
  io::BufferReader reader(buffer);

  // Read the message back through the overload that takes body_length, which
  // fetches metadata + body in a single IO.
  ASSERT_OK_AND_ASSIGN(auto message,
                       ReadMessage(0, metadata_length, body_length, &reader));

  // The decoded message must report the body length we wrote and pass
  // flatbuffer verification.
  ASSERT_EQ(body_length, message->body_length());
  ASSERT_TRUE(message->Verify());
}

TEST(TestMetadata, GetMetadataVersion) {
ASSERT_EQ(MetadataVersion::V1, ipc::internal::GetMetadataVersion(
flatbuf::MetadataVersion::MetadataVersion_V1));
Expand Down Expand Up @@ -1094,7 +1121,7 @@ TEST_F(RecursionLimits, ReadLimit) {
&schema));

ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
ReadMessage(0, metadata_length, mmap_.get()));
ReadMessage(0, metadata_length, body_length, mmap_.get()));

io::BufferReader reader(message->body());

Expand All @@ -1119,7 +1146,7 @@ TEST_F(RecursionLimits, StressLimit) {
&schema));

ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
ReadMessage(0, metadata_length, mmap_.get()));
ReadMessage(0, metadata_length, body_length, mmap_.get()));

DictionaryMemo empty_memo;

Expand Down Expand Up @@ -3018,25 +3045,56 @@ void GetReadRecordBatchReadRanges(

auto read_ranges = tracked->get_read_ranges();

// there are 3 read IOs before reading body:
// 1) read magic and footer length IO
// 2) read footer IO
// 3) read record batch metadata IO
EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
Comment on lines -3021 to -3025
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, can we keep a less strict assertion to check that we can at least access the two first elements without segfaulting?
Something like:

  // there are at least 2 read IOs before reading body:
  // 1) read magic and footer length IO
  // 2) read footer IO
  EXPECT_GE(read_ranges.size(), 2);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

const int32_t magic_size = static_cast<int>(strlen(ipc::internal::kArrowMagicBytes));
// read magic and footer length IO
auto file_end_size = magic_size + sizeof(int32_t);
auto footer_length_offset = buffer->size() - file_end_size;
auto footer_length = bit_util::FromLittleEndian(
util::SafeLoadAs<int32_t>(buffer->data() + footer_length_offset));

// there are at least 2 read IOs before reading body:
// 1) read magic and footer length IO
// 2) footer IO
EXPECT_GE(read_ranges.size(), 2);

// read magic and footer length IO
EXPECT_EQ(read_ranges[0].length, file_end_size);
// read footer IO
EXPECT_EQ(read_ranges[1].length, footer_length);
// read record batch metadata. The exact size is tricky to determine but it doesn't
// matter for this test and it should be smaller than the footer.
EXPECT_LE(read_ranges[2].length, footer_length);
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);

if (included_fields.empty()) {
// When no fields are explicitly included, the reader optimizes by
// reading metadata and the entire body in a single IO.
// Thus, there are exactly 3 read IOs in total:
// 1) magic and footer length
// 2) footer
// 3) record batch metadata + body
EXPECT_EQ(read_ranges.size(), 3);

int64_t total_body = 0;
for (auto len : expected_body_read_lengths) total_body += len;

// In the optimized path (included_fields is empty), the 3rd read operation
// fetches both the message metadata (flatbuffer) and the entire message body
// in one contiguous block. Therefore, its length must at least exceed the
// total body length by the size of the metadata.
EXPECT_GT(read_ranges[2].length, total_body);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a small comment explaining why? Also can take inspiration from the other code path and add:

  EXPECT_LE(read_ranges[2].length, total_body + footer_length);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

EXPECT_LE(read_ranges[2].length, total_body + footer_length);
} else {
// When fields are filtered, we see 3 initial reads followed by N body reads
// (one for each field/buffer range):
// 1) magic and footer length
// 2) footer
// 3) record batch metadata
// 4) individual body buffer reads
EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());

// read record batch metadata. The exact size is tricky to determine but it doesn't
// matter for this test and it should be smaller than the footer.
EXPECT_LE(read_ranges[2].length, footer_length);
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
}
}
}

Expand Down Expand Up @@ -3186,7 +3244,9 @@ class PreBufferingTest : public ::testing::TestWithParam<bool> {
metadata_reads++;
}
}
ASSERT_EQ(metadata_reads, reader_->num_record_batches() - num_indices_pre_buffered);
// With ReadMessage optimization, non-prebuffered reads verify metadata and body
// in a single large read, so we no longer see small metadata-only reads here.
ASSERT_EQ(metadata_reads, 0);
ASSERT_EQ(data_reads, reader_->num_record_batches());
}

Expand Down
12 changes: 9 additions & 3 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,9 +1236,15 @@ Result<std::unique_ptr<Message>> ReadMessageFromBlock(
const FileBlock& block, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
RETURN_NOT_OK(CheckAligned(block));
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
file, fields_loader));
return CheckBodyLength(std::move(message), block);
if (fields_loader) {
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
file, fields_loader));
return CheckBodyLength(std::move(message), block);
} else {
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
block.body_length, file));
return CheckBodyLength(std::move(message), block);
}
}

Future<std::shared_ptr<Message>> ReadMessageFromBlockAsync(
Expand Down
Loading