Multi-source convert(): intermediate parquet paths collide across sources, causing write/read race (ArrowInvalid / FileNotFoundError) #442

@kenibrewer

Summary

When cytotable.convert() is called with a source_path that contains multiple per-source subdirectories (e.g. analyses/1/analysis/, analyses/2/analysis/, analyses/3/analysis/, each holding a full set of CellProfiler CSVs), the intermediate parquet filenames produced by _source_pageset_to_parquet collide across sources. The destination path is derived only from the table stem and the immediate parent directory name (always analysis/), so every site writes to the identical path, e.g.:

<dest>/nuclei/analysis/Nuclei-1-1000.parquet   ← written by site 1, 2, and 3 simultaneously

This causes two distinct failure modes depending on thread concurrency:

  • max_threads >= 2 (race): _prepend_column_name (the downstream reader) reads a file that a concurrent _source_pageset_to_parquet writer has just opened/truncated but not yet flushed, yielding:

    pyarrow.lib.ArrowInvalid: Could not open Parquet input source
    '.../nuclei/analysis/Nuclei-1-1000.parquet':
    Parquet file size is 4 bytes, smaller than the minimum file footer (8 bytes)
    
  • max_threads=1 (serialized, still broken): Serialization removes the partial-write race, but not the collision. Every source's AppFuture still resolves to the same path string, so the file at that path holds only the most recent writer's data, and a downstream reader that visits the path on behalf of another source can find the file already gone. In the run below the error surfaces in _concat_source_group (convert.py line 819), yielding:

    FileNotFoundError: .../cytoplasm/analysis/Cytoplasm-1-1000.parquet
    

The single-source case (source_path pointing to exactly one analysis/ directory) works correctly because there is no collision.

Affected versions

  • CytoTable 0.0.15 — container community.wave.seqera.io/library/pip_cytotable:e5e76f6f7c7bea96
  • CytoTable 1.2.0 — container community.wave.seqera.io/library/pip_cytotable:2c29a8bfc594d5ea (verified below)

Where the bug lives

All references are to cytotable/convert.py on main.

_source_pageset_to_parquet (lines 386–542) — the writer

# lines 430-432: destination path uses only table stem + parent dir name ("analysis")
source_dest_path = (
    f"{dest_path}/{str(AnyPath(source_group_name).stem).lower()}/"
    f"{str(source['source_path'].parent.name).lower()}"     # always "analysis"
)
# line 496: filename includes pageset range but NOT any per-source unique identifier
result_filepath = f"{result_filepath_base}-{pageset[0]}-{pageset[1]}.parquet"

For the Nuclei source group, analyses/1/analysis/Nuclei.csv and analyses/2/analysis/Nuclei.csv both produce the identical path <dest>/nuclei/analysis/Nuclei-1-1000.parquet.
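The collision can be reproduced without CytoTable at all. The sketch below re-derives the intermediate path for three sources, with `pathlib.PurePosixPath` standing in for CytoTable's `AnyPath`. It assumes, based on the observed paths, that `result_filepath_base` is `source_dest_path` joined with the source file's stem; `intermediate_path` is a hypothetical helper, not CytoTable code.

```python
from pathlib import PurePosixPath


def intermediate_path(dest_path: str, source_group_name: str, source_path: str,
                      pageset: tuple[int, int]) -> str:
    """Hypothetical re-derivation of the intermediate parquet path.

    Mirrors the excerpt from _source_pageset_to_parquet: only the table stem
    and the immediate parent directory name feed into the path.
    """
    src = PurePosixPath(source_path)
    source_dest_path = (
        f"{dest_path}/{PurePosixPath(source_group_name).stem.lower()}/"
        f"{src.parent.name.lower()}"  # always "analysis" in this layout
    )
    # Assumption: the base is the destination dir plus the source file stem.
    result_filepath_base = f"{source_dest_path}/{src.stem}"
    return f"{result_filepath_base}-{pageset[0]}-{pageset[1]}.parquet"


sources = [f"/data/analyses/{site}/analysis/Nuclei.csv" for site in ("1", "2", "3")]
paths = {
    intermediate_path("/data/multi.parquet", "Nuclei.csv", s, (1, 1000))
    for s in sources
}
print(paths)  # {'/data/multi.parquet/nuclei/analysis/Nuclei-1-1000.parquet'}
```

Three sources, one distinct path: the site directory (`1`, `2`, `3`) never reaches the output.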

_prepend_column_name (lines 545–685) — the reader that races the writer

# line 600: reads the parquet written above
table = parquet.read_table(
    source=table_path, memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING
)

table_path is the path string resolved from the AppFuture returned by _source_pageset_to_parquet. Because every source resolves to the same path string, nothing guarantees that the file on disk when the read executes was written by this source's writer, or that the write has finished.

_concat_source_group (lines 688–839) — second reader that also fails

# line 819: reads same colliding path
writer.write_table(
    parquet.read_table(
        table,
        schema=writer_schema,
        memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
    )
)

Minimal reproduction (verified on CytoTable 1.2.0)

1. Generate synthetic data

mkdir -p /tmp/cytotable_repro
python3 - <<'PY'
import csv, os, random
random.seed(42)
SITES = [
    {"dir": "1", "plate": "Plate1", "well": "A01", "site": 1},
    {"dir": "2", "plate": "Plate1", "well": "A01", "site": 2},
    {"dir": "3", "plate": "Plate1", "well": "A02", "site": 1},
]
N_IMAGES, N_OBJECTS = 5, 200
rval = lambda: round(random.gauss(50, 15), 6)

def write_image_csv(path, plate, well, site):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["ImageNumber","Metadata_Plate","Metadata_Well","Metadata_Site",
                    "Image_FileName_DAPI","Image_FileName_GFP","Image_FileName_Mito",
                    "Image_FileName_Ph_golgi","Image_FileName_Syto14",
                    "Count_Cells","Count_Nuclei","Count_Cytoplasm",
                    "Intensity_MeanIntensity_DAPI","Intensity_MeanIntensity_GFP"])
        for i in range(1, N_IMAGES+1):
            w.writerow([i, plate, well, site,
                        f"f{i:02d}-ch1.tiff", f"f{i:02d}-ch2.tiff",
                        f"f{i:02d}-ch3.tiff", f"f{i:02d}-ch4.tiff",
                        f"f{i:02d}-ch5.tiff",
                        N_OBJECTS, N_OBJECTS, N_OBJECTS, rval(), rval()])

def write_object_csv(path, tbl, extra):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    fcols = ([f"{tbl}_AreaShape_{m}" for m in ["Area","Compactness","Eccentricity","EquivalentDiameter",
        "Extent","FormFactor","MajorAxisLength","MaxFeretDiameter","MaximumRadius","MeanRadius",
        "MedianRadius","MinFeretDiameter","MinorAxisLength","Orientation","Perimeter","Solidity"]]
        + [f"{tbl}_Intensity_{m}_DAPI" for m in ["IntegratedIntensity","MeanIntensity","MaxIntensity",
        "MinIntensity","StdIntensity","LowerQuartileIntensity","UpperQuartileIntensity",
        "MADIntensity","MassDisplacement","MedianIntensity"]]
        + [f"{tbl}_Texture_{m}_DAPI_3_00_256" for m in ["AngularSecondMoment","Contrast","Correlation",
        "DifferenceEntropy","DifferenceVariance","Entropy","IMC1","IMC2","InfoMeas1","InfoMeas2",
        "InverseDifferenceMoment","SumAverage","SumEntropy","SumVariance","Variance"]])
    with open(path, "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["ImageNumber","ObjectNumber"] + extra + fcols)
        n = 0
        for i in range(1, N_IMAGES+1):
            for _ in range(N_OBJECTS):
                n += 1
                w.writerow([i, n] + [n]*len(extra) + [rval() for _ in fcols])

for s in SITES:
    d = f"/tmp/cytotable_repro/analyses/{s['dir']}/analysis"
    write_image_csv(f"{d}/Image.csv", s["plate"], s["well"], s["site"])
    write_object_csv(f"{d}/Cells.csv", "Cells", ["Parent_Nuclei"])
    write_object_csv(f"{d}/Nuclei.csv", "Nuclei", [])
    write_object_csv(f"{d}/Cytoplasm.csv", "Cytoplasm", ["Parent_Cells","Parent_Nuclei"])
    print(f"Generated site {s['dir']}")
PY

2. Multi-source — fails (ArrowInvalid, max_threads=2)

docker run --rm -v /tmp/cytotable_repro:/data \
  community.wave.seqera.io/library/pip_cytotable:2c29a8bfc594d5ea \
  python3 -c "
import os, sys, traceback
from importlib.metadata import version
from cytotable import convert
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
print('CytoTable:', version('cytotable'))
try:
    convert(source_path='/data/analyses', source_datatype='csv',
            dest_path='/data/multi_t2.parquet', dest_datatype='parquet',
            preset='cellprofiler_csv',
            parsl_config=Config(executors=[ThreadPoolExecutor(max_threads=2)]))
    print('SUCCESS')
except Exception as e:
    print('FAILURE:', type(e).__name__, str(e)[:200])
    traceback.print_exc()
"

Observed output:

CytoTable: 1.2.0
FAILURE: ArrowInvalid Could not open Parquet input source
'/data/multi_t2.parquet/nuclei/analysis/Nuclei-1-1000.parquet':
Parquet file size is 4 bytes, smaller than the minimum file footer (8 bytes)
...
  File "cytotable/convert.py", line 600, in _prepend_column_name
    table = parquet.read_table(
pyarrow.lib.ArrowInvalid: Could not open Parquet input source
'.../nuclei/analysis/Nuclei-1-1000.parquet':
Parquet file size is 4 bytes, smaller than the minimum file footer (8 bytes)
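The 4-byte size is consistent with the reader opening the file after a concurrent writer had emitted only the leading `PAR1` magic. Parquet files begin with that 4-byte magic and end with a 4-byte little-endian footer length followed by a second `PAR1`. A stdlib-only check along these lines can classify a stray intermediate file left behind by a failed run. `parquet_framing_status` is a hypothetical diagnostic helper, not part of CytoTable or pyarrow.

```python
import os
import struct
import tempfile

PARQUET_MAGIC = b"PAR1"


def parquet_framing_status(path: str) -> str:
    """Classify a file by its Parquet framing without parsing any metadata.

    A complete Parquet file starts with b"PAR1" and ends with a 4-byte
    little-endian footer length followed by a second b"PAR1".
    """
    size = os.path.getsize(path)
    if size == 0:
        return "empty"
    with open(path, "rb") as f:
        if f.read(4) != PARQUET_MAGIC:
            return "not parquet (missing leading magic)"
        if size < 12:
            # Leading magic plus at most a few bytes: no footer has been written.
            return f"incomplete ({size} bytes, no footer yet)"
        f.seek(-8, os.SEEK_END)
        footer_len_bytes, tail = f.read(4), f.read(4)
    if tail != PARQUET_MAGIC:
        return "incomplete (missing trailing magic)"
    (footer_len,) = struct.unpack("<I", footer_len_bytes)
    if footer_len + 12 > size:
        return "corrupt (footer length exceeds file size)"
    return "framed"


# Reproduce what the reader saw in the failing run: a 4-byte file.
with tempfile.TemporaryDirectory() as tmp:
    partial = os.path.join(tmp, "Nuclei-1-1000.parquet")
    with open(partial, "wb") as f:
        f.write(PARQUET_MAGIC)
    status = parquet_framing_status(partial)
print(status)  # incomplete (4 bytes, no footer yet)
```

"framed" only means the outer framing is intact; it does not prove the footer metadata parses.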

3. Multi-source — fails (FileNotFoundError, max_threads=1)

docker run --rm -v /tmp/cytotable_repro:/data \
  community.wave.seqera.io/library/pip_cytotable:2c29a8bfc594d5ea \
  python3 -c "
import os, sys, traceback
from importlib.metadata import version
from cytotable import convert
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
print('CytoTable:', version('cytotable'))
try:
    convert(source_path='/data/analyses', source_datatype='csv',
            dest_path='/data/multi_t1.parquet', dest_datatype='parquet',
            preset='cellprofiler_csv',
            parsl_config=Config(executors=[ThreadPoolExecutor(max_threads=1)]))
    print('SUCCESS')
except Exception as e:
    print('FAILURE:', type(e).__name__, str(e)[:200])
    traceback.print_exc()
"

Observed output:

CytoTable: 1.2.0
FAILURE: FileNotFoundError /data/multi_t1.parquet/cytoplasm/analysis/Cytoplasm-1-1000.parquet
...
  File "cytotable/convert.py", line 819, in _concat_source_group
    parquet.read_table(
FileNotFoundError: /data/multi_t1.parquet/cytoplasm/analysis/Cytoplasm-1-1000.parquet

The shift from ArrowInvalid (max_threads=2) to FileNotFoundError (max_threads=1) is the key evidence: forcing serialization does not fix the bug, because the root cause is the path collision itself rather than thread interleaving. A _source_pageset_to_parquet task for one source writes Nuclei-1-1000.parquet, a later task for a different source overwrites the same path, and the original source's _prepend_column_name or _concat_source_group task then finds the wrong content, or no file at all.
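A stdlib-only toy model makes the serialized failure concrete. It assumes, without confirming this against the CytoTable source, that the concat step removes each intermediate file after appending it. Under that assumption, handing the step the same colliding path once per source raises FileNotFoundError on the second visit even with no concurrency at all. `write_intermediate` and `concat_and_cleanup` are hypothetical stand-ins for _source_pageset_to_parquet and _concat_source_group.

```python
import os
import tempfile


def write_intermediate(dest_dir: str, site: str) -> str:
    """Stand-in for the writer: every site resolves to the same path."""
    path = os.path.join(dest_dir, "cytoplasm", "analysis", "Cytoplasm-1-1000.parquet")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        f.write(f"rows from site {site}\n")  # later sites overwrite earlier ones
    return path


def concat_and_cleanup(paths: list[str]) -> list[str]:
    """Stand-in for the concat step, assuming read-then-delete of each input."""
    combined = []
    for path in paths:
        with open(path) as f:  # the second visit of a duplicate path fails here
            combined.append(f.read())
        os.remove(path)
    return combined


with tempfile.TemporaryDirectory() as dest:
    # Strictly sequential, like max_threads=1.
    paths = [write_intermediate(dest, site) for site in ("1", "2", "3")]
    try:
        concat_and_cleanup(paths)
        outcome = "success"
    except FileNotFoundError:
        outcome = "FileNotFoundError"
print(len(set(paths)), outcome)  # 1 FileNotFoundError
```

Even before the error, the rows from sites 1 and 2 are already lost, because the single file only ever holds the last writer's content.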

4. Single-source control — succeeds (max_threads=2, concat only)

docker run --rm -v /tmp/cytotable_repro:/data \
  community.wave.seqera.io/library/pip_cytotable:2c29a8bfc594d5ea \
  python3 -c "
import os
from importlib.metadata import version
from cytotable import convert
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
print('CytoTable:', version('cytotable'))
convert(source_path='/data/analyses/1/analysis', source_datatype='csv',
        dest_path='/data/single.parquet', dest_datatype='parquet',
        preset='cellprofiler_csv', join=False,
        parsl_config=Config(executors=[ThreadPoolExecutor(max_threads=2)]))
print('SUCCESS')
"

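Observed output: SUCCESS. The multi-source failures above are raised in _prepend_column_name and _concat_source_group, which are not join steps, so join=False here should not mask them; this points to the bug being specific to the multi-source / cross-source-concat code path.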

Suggested fix direction

The intermediate parquet path in _source_pageset_to_parquet (lines 430–432) must be made unique per source. The simplest fix is to include a per-source discriminator in the path — for example a hash of the full source path, or the index of the source in the sorted source list — so that concurrent writers for different sources never touch the same file:

# current (collides across sources with the same parent dir name):
source_dest_path = f"{dest_path}/{table_stem}/{source['source_path'].parent.name}"

# fix: include a per-source component, e.g. the grandparent dir name
# (only unique while those names are unique across all sources):
source_dest_path = (
    f"{dest_path}/{table_stem}/"
    f"{source['source_path'].parent.parent.name}/"   # e.g. "1", "2", "3"
    f"{source['source_path'].parent.name}"
)
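The grandparent name is only unique while the per-source directories have distinct names; two plates that both contain 1/analysis/ would collide again. The hash option mentioned above avoids that by deriving the discriminator from the full source path. A minimal sketch, where `source_discriminator`, `unique_source_dest_path`, and the 8-character digest length are illustrative choices rather than CytoTable code:

```python
import hashlib
from pathlib import PurePosixPath


def source_discriminator(source_path: str, length: int = 8) -> str:
    """Short, stable per-source token derived from the full source path."""
    return hashlib.sha256(str(source_path).encode("utf-8")).hexdigest()[:length]


def unique_source_dest_path(dest_path: str, table_stem: str, source_path: str) -> str:
    """Hypothetical replacement for the source_dest_path expression."""
    src = PurePosixPath(source_path)
    return (
        f"{dest_path}/{table_stem}/"
        f"{src.parent.name}-{source_discriminator(source_path)}"
    )


sources = [
    "/data/plate_a/1/analysis/Nuclei.csv",
    "/data/plate_b/1/analysis/Nuclei.csv",  # same grandparent name as plate_a
    "/data/plate_a/2/analysis/Nuclei.csv",
]
grandparent_names = {PurePosixPath(s).parent.parent.name for s in sources}
hashed_paths = {unique_source_dest_path("/out", "nuclei", s) for s in sources}
print(len(grandparent_names), len(hashed_paths))  # 2 3
```

Truncating the digest keeps paths readable at a negligible collision risk; the source-index option is shorter still but depends on the source list being sorted the same way on every run.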

Alternatively, the _concat_source_group call graph could be restructured so that each per-source intermediate result lands in a truly unique scratch location, with the concat step aggregating them into the canonical path.
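For the restructuring alternative, uniqueness can come from the filesystem rather than from path naming: Python's `tempfile.mkdtemp` creates a fresh directory and never reuses an existing name. A sketch, assuming a hypothetical `scratch_dir_for_source` helper that each per-source writer would call before writing its pageset files, with the concat step later reading from these directories into the canonical output:

```python
import os
import tempfile


def scratch_dir_for_source(dest_path: str, table_stem: str, source_index: int) -> str:
    """Create a per-source scratch directory that cannot collide with another."""
    parent = os.path.join(dest_path, table_stem)
    os.makedirs(parent, exist_ok=True)
    # mkdtemp picks an unused name and creates the directory itself.
    return tempfile.mkdtemp(prefix=f"source-{source_index}-", dir=parent)


with tempfile.TemporaryDirectory() as dest:
    dirs = [scratch_dir_for_source(dest, "nuclei", i) for i in range(3)]
    all_exist = all(os.path.isdir(d) for d in dirs)
print(len(set(dirs)), all_exist)  # 3 True
```

Unlike a naming scheme, this stays collision-free even if two sources would otherwise derive the same name, at the cost of non-deterministic directory names.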

Context

Encountered while implementing per-plate CytoTable invocation in nf-core/cellpainting (issue nf-core/cellpainting#40), where CytoTable is called with a list of per-site analysis directories for a single plate.
