Commits (36 total; changes shown from 11)
c49fd56  Use Pinch to decode parquet metadata (Mar 4, 2026)
faef937  WIP Implement Parquet reading using streamly (sharmrj, Mar 8, 2026)
2f95aa8  WIP: PArquet Refactor (sharmrj, Mar 15, 2026)
8dfea3c  Refactored the streaming parquet parser to return a stream of Columns… (sharmrj, Mar 16, 2026)
14f0399  Implemented a streaming parquet parser (sharmrj, Mar 20, 2026)
b29814a  copied over the tests for the parquet parser to test the unstable parser (sharmrj, Mar 20, 2026)
e0e5a70  Updated the pinch dependency constraints (sharmrj, Mar 20, 2026)
e0f25c9  Ran fourmolu on the changed files (sharmrj, Mar 20, 2026)
da0ecc1  ran fourmolu on `DataFrame.IO.Unstable.Parquet.Utils (sharmrj, Mar 20, 2026)
622a261  Ran fourmolu on the new test file (sharmrj, Mar 20, 2026)
4c2e2ce  Fixed some hlint issues (sharmrj, Mar 20, 2026)
6abbe5c  Fixed an issue where the parquet parser was using ~2x the amount of m… (sharmrj, Apr 4, 2026)
ba5ff6a  Changed Parquet Zstd decompression to no longer stream (sharmrj, Apr 4, 2026)
61aa7d3  Use `FileBufferedOrSeekable` for the `RandomAccess` instance for `Loc… (sharmrj, Apr 4, 2026)
461769f  WIP: Streaming Parquet Reader (sharmrj, Apr 19, 2026)
f0e3f9e  Merge remote-tracking branch 'refs/remotes/upstream/main' into stream… (sharmrj, Apr 19, 2026)
0206cfe  WIP: Streaming Parquet Implementation (sharmrj, Apr 19, 2026)
9361f5a  Cleaned up RandomAccess.hs (sharmrj, Apr 19, 2026)
f349ef1  Implemented the remainder of the parquet parser; replaced functions t… (sharmrj, Apr 19, 2026)
fe60a50  Formatting (sharmrj, Apr 19, 2026)
5095e68  Removed an unused pragma (sharmrj, Apr 19, 2026)
bdc2219  Removed shadowed variable names; removed unused imports; added the La… (sharmrj, Apr 19, 2026)
1b21195  fourmolu (sharmrj, Apr 19, 2026)
e01ffc1  Fixed some compiler warnings (sharmrj, Apr 19, 2026)
3b47a88  Move Unstable module to the main parquet folder; Remove Unstable Module (sharmrj, Apr 20, 2026)
d4759b5  Fixed hlint errors (sharmrj, Apr 20, 2026)
b700ec6  Updated examples.cabal with the new parquet IO files (sharmrj, Apr 20, 2026)
1f0fe12  Removed a duplicate module in examples.cabal (sharmrj, Apr 20, 2026)
61c7500  Add `pinch` to the `build-depends` list in `examples.cabal` (sharmrj, Apr 20, 2026)
076602f  Merge branch 'main' into streaming-parquet (sharmrj, Apr 30, 2026)
79b7248  Added a newline to the end of Parquet.hs (sharmrj, Apr 30, 2026)
25b1ff0  Removed the newline at the end of Parquet.hs (sharmrj, Apr 30, 2026)
6e36269  In the unboxed case we no longer box an unboxed value and then immedi… (sharmrj, Apr 30, 2026)
2e84484  Removed an unused language pragma in DataFrame.IO.Parquet.Utils (sharmrj, Apr 30, 2026)
7c1034e  Slightly modified a couple of comments (sharmrj, Apr 30, 2026)
43c1f02  Merge remote-tracking branch 'refs/remotes/origin/streaming-parquet' … (sharmrj, Apr 30, 2026)
7 changes: 7 additions & 0 deletions dataframe.cabal
@@ -83,6 +83,11 @@ library
    DataFrame.IO.CSV,
    DataFrame.IO.JSON,
    DataFrame.IO.Unstable.CSV,
    DataFrame.IO.Unstable.Parquet.Utils,
    DataFrame.IO.Unstable.Parquet.Thrift,
    DataFrame.IO.Unstable.Parquet.PageParser,
    DataFrame.IO.Unstable.Parquet,
    DataFrame.IO.Utils.RandomAccess,
    DataFrame.IO.Parquet,
    DataFrame.IO.Parquet.Binary,
    DataFrame.IO.Parquet.Dictionary,
@@ -148,6 +153,8 @@ library
    http-conduit >= 2.3 && < 3,
    streamly-core,
    streamly-bytestring,
    pinch >= 0.5.1.0 && <= 0.5.2.0,
    streamly-core >= 0.3.0,

  hs-source-dirs: src
  c-sources: cbits/process_csv.c
4 changes: 4 additions & 0 deletions src/DataFrame.hs
@@ -218,6 +218,7 @@ module DataFrame (
    module CSV,
    module UnstableCSV,
    module Parquet,
    module UnstableParquet,

    -- * Type conversion
    module Typing,
@@ -272,6 +273,9 @@ import DataFrame.IO.Unstable.CSV as UnstableCSV (
    readCsvUnstable,
    readTsvUnstable,
 )
import DataFrame.IO.Unstable.Parquet as UnstableParquet (
    readParquetUnstable,
 )
import DataFrame.Internal.Column as Column (
    Column,
    fromList,
46 changes: 25 additions & 21 deletions src/DataFrame/IO/Parquet/Page.hs
@@ -33,6 +33,29 @@ isDictionaryPage page = case pageTypeHeader (pageHeader page) of
    DictionaryPageHeader{..} -> True
    _ -> False

decompressData :: CompressionCodec -> BS.ByteString -> IO BS.ByteString
decompressData codec compressed = case codec of
adithyaov (Mar 29, 2026):
The result of decompressData is used to produce a stream of Page (readPage). This decompression is strict in nature. I'm not sure if we can do a lazy, on-demand, decompression.

sharmrj (Contributor, Author):
Decompression happens per page. So in the step function of the columnChunk Unfold, we decompress each page in its entirety before we start processing it. I think there might possibly be some value in the future to reading each page lazily, but I would prefer to keep it strict for now.

    ZSTD -> do
        result <- Zstd.decompress
        drainZstd result compressed []
      where
        drainZstd (Zstd.Consume f) input acc = do
            result <- f input
            drainZstd result BS.empty acc
        drainZstd (Zstd.Produce chunk next) _ acc = do
            result <- next
            drainZstd result BS.empty (chunk : acc)
        drainZstd (Zstd.Done final) _ acc =
            pure $ BS.concat (reverse (final : acc))
Reviewer:
bytestring might have something similar to fromListRevN or fromChunksRev. If not, it should be easy to write our own. We can avoid a list traversal and pre-allocate the resulting array, avoiding any unnecessary copies.

sharmrj (Contributor, Author):
I looked into this at some depth, and I think the best way to approach this is to not use streaming ZSTD, since we know the uncompressed size up front from the PageHeader. So instead I use the function from Codec.Compression.Zstd.Base that directly calls the zstd decompress function:

    (ZSTD _) -> createAndTrim uncompressedSize $ \dstPtr ->
      let (srcFP, offset, compressedSize) = toForeignPtr compressed
      in withForeignPtr srcFP $ \srcPtr -> do
        result <- Zstd.decompress
                    dstPtr
                    uncompressedSize
                    (srcPtr `plusPtr` offset)
                    compressedSize
        case result of
          Left e -> error $ "ZSTD error: " <> e
          Right actualSize -> return actualSize

Wdyt @mchav?

        drainZstd (Zstd.Error msg msg2) _ _ =
            error ("ZSTD error: " ++ msg ++ " " ++ msg2)
    SNAPPY -> case Snappy.decompress compressed of
        Left e -> error (show e)
        Right res -> pure res
    UNCOMPRESSED -> pure compressed
    GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
    other -> error ("Unsupported compression type: " ++ show other)

readPage :: CompressionCodec -> BS.ByteString -> IO (Maybe Page, BS.ByteString)
Reviewer:
Not related and out of context: This looks like an Unfold to me.
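To make that remark concrete, here is a minimal sketch (not part of this PR) of how readPage's shape maps onto an Unfold, assuming streamly's Streamly.Data.Unfold.unfoldrM and the module's existing imports; the name pageUnfold is hypothetical:

    import qualified Streamly.Data.Unfold as Unfold

    -- readPage threads the leftover bytes through each call and signals
    -- exhaustion with Nothing, which is exactly the shape unfoldrM wants.
    pageUnfold :: CompressionCodec -> Unfold IO BS.ByteString Page
    pageUnfold codec = Unfold.unfoldrM $ \bytes -> do
        (mPage, rest) <- readPage codec bytes
        pure $ fmap (\page -> (page, rest)) mPage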

readPage c columnBytes =
    if BS.null columnBytes
@@ -42,27 +65,8 @@ readPage c columnBytes =

            let compressed = BS.take (fromIntegral $ compressedPageSize hdr) rem

            fullData <- case c of
                ZSTD -> do
                    result <- Zstd.decompress
                    drainZstd result compressed []
                  where
                    drainZstd (Zstd.Consume f) input acc = do
                        result <- f input
                        drainZstd result BS.empty acc
                    drainZstd (Zstd.Produce chunk next) _ acc = do
                        result <- next
                        drainZstd result BS.empty (chunk : acc)
                    drainZstd (Zstd.Done final) _ acc =
                        pure $ BS.concat (reverse (final : acc))
                    drainZstd (Zstd.Error msg msg2) _ _ =
                        error ("ZSTD error: " ++ msg ++ " " ++ msg2)
                SNAPPY -> case Snappy.decompress compressed of
                    Left e -> error (show e)
                    Right res -> pure res
                UNCOMPRESSED -> pure compressed
                GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
                other -> error ("Unsupported compression type: " ++ show other)
            fullData <- decompressData c compressed

            pure
                ( Just $ Page hdr fullData
                , BS.drop (fromIntegral $ compressedPageSize hdr) rem
2 changes: 1 addition & 1 deletion src/DataFrame/IO/Parquet/Types.hs
@@ -16,7 +16,7 @@ data ParquetType
    | PBYTE_ARRAY
    | PFIXED_LEN_BYTE_ARRAY
    | PARQUET_TYPE_UNKNOWN
    deriving (Show, Eq)
    deriving (Show, Eq, Enum)

parquetTypeFromInt :: Int32 -> ParquetType
parquetTypeFromInt 0 = PBOOLEAN
188 changes: 188 additions & 0 deletions src/DataFrame/IO/Unstable/Parquet.hs
@@ -0,0 +1,188 @@
{-# LANGUAGE ExplicitForAll #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedRecordDot #-}

module DataFrame.IO.Unstable.Parquet (readParquetUnstable) where

import Control.Monad.IO.Class (MonadIO (..))
import Data.Bits (Bits (shiftL), (.|.))
import qualified Data.ByteString as BS
import Data.Functor ((<&>))
import Data.List (foldl', transpose)
import qualified Data.Map as Map
Contributor:
I think we can use Data.Map.Strict by default, there is no need to be lazy here.

sharmrj (Contributor, Author):
Agreed.

sharmrj (Contributor, Author):
As it turns out, the data declaration in DataFrame.Internal.DataFrame uses the Data.Map import. While I agree with you that we should probably make it Data.Map.Strict, I'm afraid that change has a footprint a bit too large for me to feel comfortable throwing it in here. We should follow up on it as its own task.
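For reference, a minimal self-contained sketch (not from this PR) of the laziness difference under discussion:

    import qualified Data.Map as LazyMap
    import qualified Data.Map.Strict as StrictMap

    -- Data.Map is lazy in its values, so an unevaluated thunk is fine:
    lazySize :: Int
    lazySize = LazyMap.size (LazyMap.fromList [(1 :: Int, undefined :: Int)]) -- 1

    -- Data.Map.Strict forces values on construction, so the same map
    -- throws as soon as it is demanded:
    strictSize :: Int
    strictSize = StrictMap.size (StrictMap.fromList [(1 :: Int, undefined :: Int)])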

import Data.Maybe (fromJust, fromMaybe, isNothing)
import Data.Text (Text)
import qualified Data.Vector as Vector
import DataFrame.IO.Parquet.Dictionary (readDictVals)
import DataFrame.IO.Parquet.Page (decompressData)
import DataFrame.IO.Parquet.Types (DictVals)
import DataFrame.IO.Unstable.Parquet.PageParser (parsePage)
import DataFrame.IO.Unstable.Parquet.Thrift (
    ColumnChunk (..),
    ColumnMetaData (..),
    CompressionCodec (..),
    DictionaryPageHeader (..),
    FileMetadata (..),
    PageHeader (..),
    RowGroup (..),
    SchemaElement (..),
    pinchCompressionToParquetCompression,
    pinchThriftTypeToParquetType,
    unField,
 )
import DataFrame.IO.Unstable.Parquet.Utils (
    ColumnDescription,
    PageDescription (PageDescription),
    foldColumns,
    generateColumnDescriptions,
 )
import DataFrame.IO.Utils.RandomAccess (
    RandomAccess (..),
    Range (Range),
    ReaderIO (runReaderIO),
 )
import DataFrame.Internal.Column (Column)
import DataFrame.Internal.DataFrame (DataFrame (..))
import Pinch (decodeWithLeftovers)
import qualified Pinch
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Unfold (Unfold)
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified System.IO as IO

readParquetUnstable :: FilePath -> IO DataFrame
readParquetUnstable filepath = IO.withFile filepath IO.ReadMode $ \handle -> do
    runReaderIO parseParquet handle

parseParquet :: (RandomAccess r, MonadIO r) => r DataFrame
parseParquet = do
    metadata <- parseFileMetadata
    let vectorLength = fromIntegral . unField $ metadata.num_rows :: Int
        columnStreams = parseColumns metadata
    columnList <- mapM (foldColumns vectorLength) columnStreams
    let columns = Vector.fromListN (length columnList) columnList
        columnNames :: [Text]
        columnNames =
            map (unField . name)
                . filter
                    ( \se ->
                        (isNothing $ unField $ num_children se)
                            || unField se.num_children == Just 0
                    )
                $ unField metadata.schema
        columnIndices = Map.fromList $ zip columnNames [0 ..]
        dataframeDimensions = (vectorLength, length columnStreams)
    return $ DataFrame columns columnIndices dataframeDimensions Map.empty

GitHub Actions / hlint warning on line 71 in src/DataFrame/IO/Unstable/Parquet.hs: Suggestion in parseParquet: Move brackets to avoid $. Found: (isNothing $ unField $ num_children se) || unField se.num_children == Just 0. Perhaps: isNothing (unField $ num_children se) || unField se.num_children == Just 0.

parseFileMetadata ::
    (RandomAccess r) => r FileMetadata
parseFileMetadata = do
    -- The last 8 bytes of a parquet file are a 4-byte little-endian
    -- metadata length followed by the 4-byte magic "PAR1".
    footerOffset <- readSuffix 8
    let size = getMetadataSize footerOffset
    rawMetadata <- readSuffix (size + 8) <&> BS.take size
    case Pinch.decode Pinch.compactProtocol rawMetadata of
        Left e -> error $ show e
        Right metadata -> return metadata
Reviewer:
1. You can use maybe.
2. Use of error will make the control flow harder to reason with and manage later.
  where
    getMetadataSize footer =
        let sizes :: [Int]
            sizes = map (fromIntegral . BS.index footer) [0 .. 3]
         in foldl' (.|.) 0 $ zipWith shiftL sizes [0, 8 .. 24]
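A worked example of getMetadataSize with hypothetical footer bytes; the length prefix is little-endian, so each successive byte is shifted 8 bits further left:

    -- footer bytes 0x10 0x02 0x00 0x00 (hypothetical)
    -- 0x10 .|. (0x02 `shiftL` 8) .|. (0x00 `shiftL` 16) .|. (0x00 `shiftL` 24)
    --   = 16 + 512
    --   = 528 bytes of metadata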

parseColumns :: (RandomAccess r, MonadIO r) => FileMetadata -> [Stream r Column]
Reviewer:
I don't like this: [Stream r ColumnChunk]. That said, I'm not in a position to suggest a better alternative. Could you help me understand how this fits in the bigger picture? Each element in this list corresponds to a column?

Reviewer:
Update: I think I see where this is used. You can return a vector directly here. Vector (Stream Column) is easier to reason with over [Stream Column]. FYI, Data.Vector == Streamly.Data.Array (Boxed & Unboxed).

parseColumns metadata =
    let columnDescriptions = generateColumnDescriptions $ unField $ schema metadata
        colChunks = columnChunks metadata
        _numColumns = length colChunks
        _numDescs = length columnDescriptions
     in if _numColumns /= _numDescs
            then
                error $
                    "Column count mismatch: got "
                        <> show _numColumns
                        <> " columns but the schema implied "
                        <> show _numDescs
                        <> " columns"
            else zipWith parse colChunks columnDescriptions
  where
    columnChunks :: (RandomAccess r) => FileMetadata -> [Stream r ColumnChunk]
    columnChunks =
        map Stream.fromList
            . transpose
            . map (unField . rg_columns)
            . unField
            . row_groups

parse ::
    (RandomAccess r, MonadIO r) =>
    Stream r ColumnChunk -> ColumnDescription -> Stream r Column
parse columnChunkStream description = Stream.unfoldEach (parseColumnChunk description) columnChunkStream
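The transpose in columnChunks is what regroups the per-row-group chunks into per-column lists, so each element of the result does correspond to one column. A small illustration with hypothetical chunk labels:

    -- Two row groups, each with one chunk per column (a and b):
    -- >>> transpose [["a1", "b1"], ["a2", "b2"]]
    -- [["a1","a2"],["b1","b2"]]
    -- Each inner list then becomes one Stream r ColumnChunk via Stream.fromList.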

data ColumnChunkState
    = ColumnChunkState
    { remainingBytes :: !BS.ByteString
    , codec :: !CompressionCodec
    , dictionary :: !(Maybe DictVals)
    , parquetType :: !Int
    }

parseColumnChunk ::
    (RandomAccess r, MonadIO r) => ColumnDescription -> Unfold r ColumnChunk Column
parseColumnChunk description = Unfold.Unfold step inject
  where
    inject :: (RandomAccess r) => ColumnChunk -> r ColumnChunkState
    inject columnChunk = do
        let columnMetadata = fromJust $ unField $ cc_meta_data columnChunk
            dataOffset = unField $ cmd_data_page_offset columnMetadata
            dictOffset = fromMaybe dataOffset (unField $ cmd_dictionary_page_offset columnMetadata)
            startOffset = min dataOffset dictOffset
            compressedSize = unField $ cmd_total_compressed_size columnMetadata
            chunkCodec = unField $ cmd_codec columnMetadata
            parquetType = fromEnum $ pinchThriftTypeToParquetType (unField $ cmd_type columnMetadata)
            range = Range (fromIntegral startOffset) (fromIntegral compressedSize)

        rawBytes <- readBytes range
        return $ ColumnChunkState rawBytes chunkCodec Nothing parquetType
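A hypothetical layout to make the offset arithmetic in inject concrete (the numbers are made up):

    -- Column chunk with a dictionary page at byte 100 and its first data
    -- page at byte 400, 900 compressed bytes in total. Reading must start
    -- at the dictionary page, which precedes the data pages when present:
    --   startOffset = min 400 100     -- 100
    --   range       = Range 100 900   -- one read spans dict + data pages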

    step ::
        (RandomAccess r, MonadIO r) =>
        ColumnChunkState -> r (Unfold.Step ColumnChunkState Column)
    step (ColumnChunkState remaining chunkCodec dict parquetType) = do
        if BS.null remaining
Reviewer:
You could use guards here.

    step (ColumnChunkState remaining _ _ _) | BS.null remaining = pure Unfold.Stop

            then return Unfold.Stop
            else case parsePageHeader remaining of
                Left e -> error $ show e
Reviewer:
error is convenient to use while bootstrapping but is eventually found undesirable later. You can maybe add a TODO here to fix this later?

                Right (remainder, header) -> do
                    let compressedPageSize = fromIntegral $ unField $ ph_compressed_page_size header
                        (pageData, rest) = BS.splitAt compressedPageSize remainder
                    uncompressedData <-
                        liftIO $
                            decompressData (pinchCompressionToParquetCompression chunkCodec) pageData

                    case unField $ ph_dictionary_page_header header of
                        Just dictHeader -> do
                            {-
                            The dictionary page must be placed at the first position of the column chunk
                            if it is partly or completely dictionary encoded. At most one dictionary page
                            can be placed in a column chunk.
                            This allows us to maintain the parsed DictVals for the chunk and pass it along
                            to subsequent data pages.
                            https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L698C1-L712C2
                            -}
                            let numValues = fromIntegral $ unField $ diph_num_values dictHeader
                                newDict = readDictVals (toEnum parquetType) uncompressedData (Just numValues)
                            step (ColumnChunkState rest chunkCodec (Just newDict) parquetType)
                        Nothing -> do
                            -- It's a data page. Yield it.
                            column <-
                                parsePage
                                    description
                                    (PageDescription uncompressedData header chunkCodec dict parquetType)
                            return $ Unfold.Yield column (ColumnChunkState rest chunkCodec dict parquetType)

parsePageHeader :: BS.ByteString -> Either String (BS.ByteString, PageHeader)
parsePageHeader bytes = case decodeWithLeftovers Pinch.compactProtocol bytes of
    Left e -> Left e
    Right header -> Right header
78 changes: 78 additions & 0 deletions src/DataFrame/IO/Unstable/Parquet/PageParser.hs
@@ -0,0 +1,78 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}

module DataFrame.IO.Unstable.Parquet.PageParser (parsePage) where

import Control.Monad.IO.Class (MonadIO (liftIO))
import DataFrame.IO.Parquet (applyLogicalType, decodePageData)
import DataFrame.IO.Parquet.Levels (readLevelsV1, readLevelsV2)
import DataFrame.IO.Parquet.Types (parquetTypeFromInt)
import DataFrame.IO.Unstable.Parquet.Thrift
import DataFrame.IO.Unstable.Parquet.Utils (
    ColumnDescription (..),
    PageDescription (..),
 )
import DataFrame.IO.Utils.RandomAccess (RandomAccess)
import DataFrame.Internal.Column (Column)

parsePage ::
    (RandomAccess r, MonadIO r) => ColumnDescription -> PageDescription -> r Column
parsePage description (PageDescription pageBytes header _ dictValsM pType') = do
    let maxDef = fromIntegral $ maxDefinitionLevel description
        maxRep = fromIntegral $ maxRepetitionLevel description
        -- We do not have type lengths threaded effectively for Fixed Len yet; assume Nothing for now
        -- unless handled correctly.
        logicalType = pinchLogicalTypeToLogicalType <$> colLogicalType description
        maybeTypeLen = Nothing
        pType = parquetTypeFromInt . fromIntegral $ pType'

    liftIO $ case unField (ph_data_page_header header) of
        Just dph -> do
            let n = fromIntegral $ unField (dph_num_values dph)
                enc = parquetEncodingFromPinch (unField (dph_encoding dph))
                (defLvls, repLvls, afterLvls) = readLevelsV1 n maxDef maxRep pageBytes
                nPresent = length (filter (== maxDef) defLvls)
            decodePageData
                dictValsM
                (maxDef, maxRep)
                pType
                maybeTypeLen
                enc
                defLvls
                repLvls
                nPresent
                afterLvls
                "v1"
        Nothing -> case unField (ph_data_page_header_v2 header) of
            Just dph2 -> do
                let n = fromIntegral $ unField (dph2_num_values dph2)
                    enc = parquetEncodingFromPinch (unField (dph2_encoding dph2))
                    (defLvls, repLvls, afterLvls) =
                        readLevelsV2
                            n
                            maxDef
                            maxRep
                            (unField $ dph2_definition_levels_byte_length dph2)
                            (unField $ dph2_repetition_levels_byte_length dph2)
                            pageBytes
                    nPresent
                        | unField (dph2_num_nulls dph2) > 0 =
                            fromIntegral (unField (dph2_num_values dph2) - unField (dph2_num_nulls dph2))
                        | otherwise = length (filter (== maxDef) defLvls)
                column <-
                    decodePageData
                        dictValsM
                        (maxDef, maxRep)
                        pType
                        maybeTypeLen
                        enc
                        defLvls
                        repLvls
                        nPresent
                        afterLvls
                        "v2"
                case logicalType of
                    Nothing -> return column
                    Just lt -> return $ applyLogicalType lt column
            Nothing -> error "Page header is neither v1 nor v2 data page"
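A quick check of the two nPresent paths above, with hypothetical numbers:

    -- v2 page with num_values = 10 and num_nulls = 3: the header carries the
    -- null count directly, so nPresent = 10 - 3 = 7 non-null values are
    -- decoded without scanning the definition levels. A v1 page has no null
    -- count, so nPresent falls back to length (filter (== maxDef) defLvls).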