Skip to content

Commit bfc6492

Browse files
fix: serialize pushed filters across workers, add StorageSplitOpener
1 parent d3ac8c0 commit bfc6492

6 files changed

Lines changed: 101 additions & 13 deletions

File tree

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ quickwit-metastore = { workspace = true }
2727
quickwit-proto = { workspace = true }
2828
quickwit-query = { workspace = true }
2929
quickwit-search = { workspace = true }
30+
quickwit-storage = { workspace = true }
3031

3132
datafusion = "52"
3233
datafusion-datasource = "52"

quickwit/quickwit-datafusion/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ pub mod worker;
99
pub use flight::{QuickwitFlightService, build_flight_service};
1010
pub use resolver::QuickwitWorkerResolver;
1111
pub use session::QuickwitSessionBuilder;
12-
pub use split_opener::{SplitIndexOpener, SplitRegistry};
12+
pub use split_opener::{SplitIndexOpener, SplitRegistry, StorageSplitOpener};
1313
pub use table_provider::{OpenerFactory, QuickwitTableProvider};
1414
pub use worker::build_worker_session_builder;

quickwit/quickwit-datafusion/src/split_opener.rs

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ use async_trait::async_trait;
66
use dashmap::DashMap;
77
use datafusion::common::Result;
88
use datafusion::error::DataFusionError;
9+
use quickwit_proto::search::SplitIdAndFooterOffsets;
10+
use quickwit_search::SearcherContext;
11+
use quickwit_storage::Storage;
912
use tantivy::Index;
1013
use tantivy_datafusion::IndexOpener;
1114

1215
/// Registry of opened tantivy indexes, keyed by split ID.
13-
///
14-
/// For integration tests this is populated before query execution.
15-
/// In production this would be replaced by `open_index_with_caches()`.
16+
/// Used for integration tests. Production uses [`StorageSplitOpener`].
1617
pub type SplitRegistry = DashMap<String, Index>;
1718

19+
// ── Test opener (DashMap-backed) ────────────────────────────────────
20+
1821
/// An [`IndexOpener`] backed by an in-memory [`SplitRegistry`].
1922
///
20-
/// Planning-time metadata (schema, segment sizes) is stored inline so
21-
/// that the opener can answer schema/partition queries without touching
22-
/// the registry. The actual [`open`](IndexOpener::open) call looks up
23-
/// the registry at execution time.
23+
/// For integration tests only. Production uses [`StorageSplitOpener`].
2424
#[derive(Clone)]
2525
pub struct SplitIndexOpener {
2626
split_id: String,
@@ -44,8 +44,6 @@ impl SplitIndexOpener {
4444
}
4545
}
4646

47-
/// Build an opener by extracting schema and segment sizes from an
48-
/// already-opened index, then inserting it into the registry.
4947
pub fn from_index(split_id: String, index: Index, registry: Arc<SplitRegistry>) -> Self {
5048
let tantivy_schema = index.schema();
5149
let segment_sizes = index
@@ -76,7 +74,6 @@ impl fmt::Debug for SplitIndexOpener {
7674
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7775
f.debug_struct("SplitIndexOpener")
7876
.field("split_id", &self.split_id)
79-
.field("segment_sizes", &self.segment_sizes)
8077
.finish()
8178
}
8279
}
@@ -111,3 +108,92 @@ impl IndexOpener for SplitIndexOpener {
111108
self
112109
}
113110
}
111+
112+
// ── Production opener (storage-backed) ──────────────────────────────
113+
114+
/// An [`IndexOpener`] that downloads and opens splits from object
115+
/// storage using Quickwit's caching infrastructure.
116+
///
117+
/// At execution time (on the worker), calls `open_index_with_caches()`
118+
/// to download the split bundle from S3/GCS/local storage, warm the
119+
/// footer cache + fast field cache, and return an opened tantivy `Index`.
120+
///
121+
/// Planning-time metadata (schema, segment sizes) is stored inline —
122+
/// no I/O during plan construction.
123+
#[derive(Clone)]
124+
pub struct StorageSplitOpener {
125+
split_id: String,
126+
tantivy_schema: tantivy::schema::Schema,
127+
segment_sizes: Vec<u32>,
128+
searcher_context: Arc<SearcherContext>,
129+
storage: Arc<dyn Storage>,
130+
footer_offsets: SplitIdAndFooterOffsets,
131+
}
132+
133+
impl StorageSplitOpener {
134+
pub fn new(
135+
split_id: String,
136+
tantivy_schema: tantivy::schema::Schema,
137+
segment_sizes: Vec<u32>,
138+
searcher_context: Arc<SearcherContext>,
139+
storage: Arc<dyn Storage>,
140+
split_footer_start: u64,
141+
split_footer_end: u64,
142+
) -> Self {
143+
let footer_offsets = SplitIdAndFooterOffsets {
144+
split_id: split_id.clone(),
145+
split_footer_start,
146+
split_footer_end,
147+
..Default::default()
148+
};
149+
Self {
150+
split_id,
151+
tantivy_schema,
152+
segment_sizes,
153+
searcher_context,
154+
storage,
155+
footer_offsets,
156+
}
157+
}
158+
}
159+
160+
impl fmt::Debug for StorageSplitOpener {
161+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162+
f.debug_struct("StorageSplitOpener")
163+
.field("split_id", &self.split_id)
164+
.finish()
165+
}
166+
}
167+
168+
#[async_trait]
169+
impl IndexOpener for StorageSplitOpener {
170+
async fn open(&self) -> Result<Index> {
171+
let (index, _hot_directory) = quickwit_search::leaf::open_index_with_caches(
172+
&self.searcher_context,
173+
self.storage.clone(),
174+
&self.footer_offsets,
175+
None, // tokenizer_manager — TODO: pass from doc mapper
176+
None, // ephemeral cache — TODO: pass from searcher context
177+
)
178+
.await
179+
.map_err(|e| DataFusionError::Execution(format!("open split {}: {e}", self.split_id)))?;
180+
181+
Ok(index)
182+
}
183+
184+
fn schema(&self) -> tantivy::schema::Schema {
185+
self.tantivy_schema.clone()
186+
}
187+
188+
fn segment_sizes(&self) -> Vec<u32> {
189+
self.segment_sizes.clone()
190+
}
191+
192+
fn identifier(&self) -> &str {
193+
&self.split_id
194+
}
195+
196+
fn as_any(&self) -> &dyn Any {
197+
self
198+
}
199+
}

quickwit/quickwit-search/src/leaf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ fn configure_storage_retries(
145145
/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`.
146146
/// - An ephemeral unbounded cache directory (whose lifetime is tied to the returned `Index` if no
147147
/// `ByteRangeCache` is provided).
148-
pub(crate) async fn open_index_with_caches(
148+
pub async fn open_index_with_caches(
149149
searcher_context: &SearcherContext,
150150
index_storage: Arc<dyn Storage>,
151151
split_and_footer_offsets: &SplitIdAndFooterOffsets,

quickwit/quickwit-search/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ mod collector;
2323
mod error;
2424
mod fetch_docs;
2525
mod find_trace_ids_collector;
26-
mod leaf;
26+
pub mod leaf;
2727
mod leaf_cache;
2828
mod list_fields;
2929
mod list_fields_cache;

0 commit comments

Comments
 (0)