// Patch summary: introduces PositionOutputStream::StoredLength() and switches
// PuffinWriter::Finish() to use it for the final file size.
//
// Files touched by this diff:
//   * src/iceberg/file_io.h
//       Adds virtual StoredLength() to PositionOutputStream. The default body
//       simply forwards to Position(); the accompanying doc comment says the two
//       may differ for encrypting or other non-length-preserving streams.
//   * src/iceberg/arrow/arrow_io.cc
//       ArrowPositionOutputStream overrides StoredLength(): while output_ is not
//       closed it returns Position(), otherwise it returns the new member
//       closed_position_ (initialised to 0). Close() now queries Position()
//       before calling output_->Close() and assigns the result to
//       closed_position_ afterwards.
//   * src/iceberg/puffin/puffin_writer.cc
//       Finish() still calls Flush() then Close(), but now fills file_size_ from
//       StoredLength() instead of Position().
//   * src/iceberg/test/arrow_io_test.cc
//       New test OutputStoredLengthAfterClose: writes 3 bytes, closes, then
//       expects Position() to hold an error whose message contains "closed" and
//       StoredLength() to hold the value 3.
//   * src/iceberg/test/std_io.h
//       StdPositionOutputStream overrides StoredLength(): returns Position()
//       while file_ is open; otherwise asks std::filesystem::file_size() for
//       location_, returning IOError when the error_code is set and
//       detail::ToInt64FileSize(size, location_) otherwise.
//
// NOTE(review): in ArrowPositionOutputStream::Close() the Position() lookup is
//   placed before output_->Close(). Presumably ICEBERG_ASSIGN_OR_RAISE returns
//   early on error; if so, a failing Position() would skip output_->Close()
//   entirely -- confirm that leaving the stream open on that path is intended.
// NOTE(review): closed_position_ is only assigned after output_->Close()
//   succeeds. If Close() can fail while output_->closed() still reports true,
//   StoredLength() would return the initial 0 rather than an error -- TODO
//   confirm against the Arrow stream contract; a std::optional member would make
//   the "never recorded" state explicit.
// NOTE(review): StdPositionOutputStream::StoredLength() reports the stream
//   position while open but the on-disk size after close; verify the two agree
//   for callers that compare them.
// NOTE(review): this copy of the patch has its line breaks collapsed and appears
//   to have lost template argument lists (e.g. "Result StoredLength()",
//   "std::span data", "static_cast(position)", "std::array data"). It will not
//   apply as-is; restore it from the original change before use.
diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc index 6b159ca89..4c795badf 100644 --- a/src/iceberg/arrow/arrow_io.cc +++ b/src/iceberg/arrow/arrow_io.cc @@ -378,6 +378,13 @@ class ArrowPositionOutputStream : public PositionOutputStream { return position; } + Result StoredLength() const override { + if (!output_->closed()) { + return Position(); + } + return closed_position_; + } + Status Write(std::span data) override { ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(data.size())); ICEBERG_ARROW_RETURN_NOT_OK(output_->Write(data.data(), size)); @@ -393,12 +400,15 @@ class ArrowPositionOutputStream : public PositionOutputStream { if (output_->closed()) { return {}; } + ICEBERG_ASSIGN_OR_RAISE(auto position, Position()); ICEBERG_ARROW_RETURN_NOT_OK(output_->Close()); + closed_position_ = position; return {}; } private: std::shared_ptr<::arrow::io::OutputStream> output_; + int64_t closed_position_ = 0; }; class ArrowInputFile : public InputFile { diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index 1f91fb0c1..ba6f0129a 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -66,6 +66,12 @@ class ICEBERG_EXPORT PositionOutputStream { /// \brief Return the current write position. virtual Result Position() const = 0; + /// \brief Return the current stored length of the output. + /// + /// This can differ from the current position for encrypting streams, and for other + /// non-length-preserving streams. + virtual Result StoredLength() const { return Position(); } + /// \brief Write all bytes in data at the current position. 
virtual Status Write(std::span data) = 0; diff --git a/src/iceberg/puffin/puffin_writer.cc b/src/iceberg/puffin/puffin_writer.cc index db749117f..9c173ab52 100644 --- a/src/iceberg/puffin/puffin_writer.cc +++ b/src/iceberg/puffin/puffin_writer.cc @@ -155,7 +155,7 @@ Status PuffinWriter::Finish() { footer_written_ = true; ICEBERG_RETURN_UNEXPECTED(stream_->Flush()); ICEBERG_RETURN_UNEXPECTED(stream_->Close()); - ICEBERG_ASSIGN_OR_RAISE(file_size_, stream_->Position()); + ICEBERG_ASSIGN_OR_RAISE(file_size_, stream_->StoredLength()); finished_ = true; return {}; } diff --git a/src/iceberg/test/arrow_io_test.cc b/src/iceberg/test/arrow_io_test.cc index 4ac83469c..a30e2da93 100644 --- a/src/iceberg/test/arrow_io_test.cc +++ b/src/iceberg/test/arrow_io_test.cc @@ -410,6 +410,21 @@ TEST_F(LocalFileIOTest, StdReadKeepsPositionAvailableAtEof) { EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3))); } +TEST(ArrowFileIOTest, OutputStoredLengthAfterClose) { + auto file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + ICEBERG_UNWRAP_OR_FAIL(auto output_file, file_io->NewOutputFile("output")); + ICEBERG_UNWRAP_OR_FAIL(auto output, output_file->Create()); + + std::array data = {std::byte{'a'}, std::byte{'b'}, std::byte{'c'}}; + ASSERT_THAT(output->Write(data), IsOk()); + ASSERT_THAT(output->Close(), IsOk()); + + auto position = output->Position(); + ASSERT_FALSE(position.has_value()); + EXPECT_THAT(position.error().message, ::testing::HasSubstr("closed")); + EXPECT_THAT(output->StoredLength(), HasValue(::testing::Eq(3))); +} + TEST_F(LocalFileIOTest, ResolvesForeignSchemeToUnderlyingPath) { ASSERT_THAT(file_io_->WriteFile(temp_filepath_, "hello world"), IsOk()); diff --git a/src/iceberg/test/std_io.h b/src/iceberg/test/std_io.h index 3866bddf0..725fc7ba5 100644 --- a/src/iceberg/test/std_io.h +++ b/src/iceberg/test/std_io.h @@ -170,6 +170,18 @@ class StdPositionOutputStream : public PositionOutputStream { return static_cast(position); } + Result StoredLength() const 
override { + if (file_.is_open()) { + return Position(); + } + std::error_code ec; + auto size = std::filesystem::file_size(location_, ec); + if (ec) { + return IOError("Failed to get file size for {}: {}", location_, ec.message()); + } + return detail::ToInt64FileSize(size, location_); + } + Status Write(std::span data) override { if (data.empty()) { return {};