-
Notifications
You must be signed in to change notification settings - Fork 4k
GH-48846: [C++] Read message metadata and body in one go in IPC file reader #48975
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
da24cfc
a0e7d11
e1058fc
d78c383
ff6a9e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -552,9 +552,15 @@ class TestIpcRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*>, | |
| ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length, | ||
| &body_length, options_)); | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message, | ||
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message1, | ||
| ReadMessage(0, metadata_length, mmap_.get())); | ||
| ASSERT_EQ(expected_version, message->metadata_version()); | ||
| ASSERT_EQ(expected_version, message1->metadata_version()); | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(auto message2, | ||
| ReadMessage(0, metadata_length, body_length, mmap_.get())); | ||
| ASSERT_EQ(expected_version, message2->metadata_version()); | ||
|
|
||
| ASSERT_TRUE(message1->Equals(*message2)); | ||
| } | ||
| }; | ||
|
|
||
|
|
@@ -613,6 +619,27 @@ TEST(TestReadMessage, CorruptedSmallInput) { | |
| ASSERT_EQ(nullptr, message); | ||
| } | ||
|
|
||
| TEST(TestReadMessage, ReadBodyWithLength) { | ||
| // Test the optimized ReadMessage(offset, meta_len, body_len, file) overload: write one record batch to a buffer-backed stream at offset 0, read it back with a single ReadMessage call over a BufferReader, then check the reported body length and Verify() | ||
| std::shared_ptr<RecordBatch> batch; | ||
| ASSERT_OK(MakeIntRecordBatch(&batch)); | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(0)); | ||
| int32_t metadata_length; | ||
| int64_t body_length; | ||
| ASSERT_OK(WriteRecordBatch(*batch, 0, stream.get(), &metadata_length, &body_length, | ||
| IpcWriteOptions::Defaults())); | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish()); | ||
| io::BufferReader reader(buffer); | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(auto message, | ||
| ReadMessage(0, metadata_length, body_length, &reader)); | ||
|
|
||
| ASSERT_EQ(body_length, message->body_length()); | ||
| ASSERT_TRUE(message->Verify()); | ||
| } | ||
|
|
||
| TEST(TestMetadata, GetMetadataVersion) { | ||
| ASSERT_EQ(MetadataVersion::V1, ipc::internal::GetMetadataVersion( | ||
| flatbuf::MetadataVersion::MetadataVersion_V1)); | ||
|
|
@@ -1094,7 +1121,7 @@ TEST_F(RecursionLimits, ReadLimit) { | |
| &schema)); | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message, | ||
| ReadMessage(0, metadata_length, mmap_.get())); | ||
| ReadMessage(0, metadata_length, body_length, mmap_.get())); | ||
|
|
||
| io::BufferReader reader(message->body()); | ||
|
|
||
|
|
@@ -1119,7 +1146,7 @@ TEST_F(RecursionLimits, StressLimit) { | |
| &schema)); | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message, | ||
| ReadMessage(0, metadata_length, mmap_.get())); | ||
| ReadMessage(0, metadata_length, body_length, mmap_.get())); | ||
|
|
||
| DictionaryMemo empty_memo; | ||
|
|
||
|
|
@@ -3018,25 +3045,56 @@ void GetReadRecordBatchReadRanges( | |
|
|
||
| auto read_ranges = tracked->get_read_ranges(); | ||
|
|
||
| // there are 3 read IOs before reading body: | ||
| // 1) read magic and footer length IO | ||
| // 2) read footer IO | ||
| // 3) read record batch metadata IO | ||
| EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size()); | ||
|
Comment on lines
-3021
to
-3025
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, can we keep a less strict assertion to check that we can at least access the first two elements without segfaulting? // there are at least 2 read IOs before reading body:
// 1) read magic and footer length IO
// 2) read footer IO
EXPECT_GE(read_ranges.size(), 2);
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done |
||
| const int32_t magic_size = static_cast<int>(strlen(ipc::internal::kArrowMagicBytes)); | ||
| // read magic and footer length IO | ||
| auto file_end_size = magic_size + sizeof(int32_t); | ||
| auto footer_length_offset = buffer->size() - file_end_size; | ||
| auto footer_length = bit_util::FromLittleEndian( | ||
| util::SafeLoadAs<int32_t>(buffer->data() + footer_length_offset)); | ||
|
|
||
| // there are at least 2 read IOs before reading body: | ||
| // 1) read magic and footer length IO | ||
| // 2) footer IO | ||
| EXPECT_GE(read_ranges.size(), 2); | ||
|
|
||
| // read magic and footer length IO | ||
| EXPECT_EQ(read_ranges[0].length, file_end_size); | ||
| // read footer IO | ||
| EXPECT_EQ(read_ranges[1].length, footer_length); | ||
| // read record batch metadata. The exact size is tricky to determine but it doesn't | ||
| // matter for this test and it should be smaller than the footer. | ||
| EXPECT_LE(read_ranges[2].length, footer_length); | ||
| for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) { | ||
| EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]); | ||
|
|
||
| if (included_fields.empty()) { | ||
| // When no fields are explicitly included, the reader optimizes by | ||
| // reading metadata and the entire body in a single IO. | ||
| // Thus, there are exactly 3 read IOs in total: | ||
| // 1) magic and footer length | ||
| // 2) footer | ||
| // 3) record batch metadata + body | ||
| EXPECT_EQ(read_ranges.size(), 3); | ||
|
|
||
| int64_t total_body = 0; | ||
| for (auto len : expected_body_read_lengths) total_body += len; | ||
|
|
||
| // In the optimized path (included_fields is empty), the 3rd read operation | ||
| // fetches both the message metadata (flatbuffer) and the entire message body | ||
| // in one contiguous block. Therefore, its length must at least exceed the | ||
| // total body length by the size of the metadata. | ||
| EXPECT_GT(read_ranges[2].length, total_body); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you add a small comment explaining why? You can also take inspiration from the other code path and add:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done |
||
| EXPECT_LE(read_ranges[2].length, total_body + footer_length); | ||
| } else { | ||
| // When fields are filtered, we see 3 initial reads followed by N body reads | ||
| // (one for each field/buffer range): | ||
| // 1) magic and footer length | ||
| // 2) footer | ||
| // 3) record batch metadata | ||
| // 4) individual body buffer reads | ||
| EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size()); | ||
|
|
||
| // read record batch metadata. The exact size is tricky to determine but it doesn't | ||
| // matter for this test and it should be smaller than the footer. | ||
| EXPECT_LE(read_ranges[2].length, footer_length); | ||
| for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) { | ||
| EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -3186,7 +3244,9 @@ class PreBufferingTest : public ::testing::TestWithParam<bool> { | |
| metadata_reads++; | ||
| } | ||
| } | ||
| ASSERT_EQ(metadata_reads, reader_->num_record_batches() - num_indices_pre_buffered); | ||
| // With ReadMessage optimization, non-prebuffered reads verify metadata and body | ||
| // in a single large read, so we no longer see small metadata-only reads here. | ||
| ASSERT_EQ(metadata_reads, 0); | ||
| ASSERT_EQ(data_reads, reader_->num_record_batches()); | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.