Skip to content

Commit 12a4138

Browse files
committed
Add integration test
1 parent 9501994 commit 12a4138

1 file changed

Lines changed: 194 additions & 2 deletions

File tree

quickwit/quickwit-indexing/src/mature_merge.rs

Lines changed: 194 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,8 @@ async fn merge_mature_single_index(
298298
data_dir_path: &std::path::Path,
299299
config: &MatureMergeConfig,
300300
node_id: NodeId,
301+
now: OffsetDateTime,
301302
) -> anyhow::Result<IndexMergeSummary> {
302-
let now = OffsetDateTime::now_utc();
303303
let index_id = index_metadata.index_config.index_id.clone();
304304
let operations = fetch_splits_and_plan(&index_metadata, metastore, now, config).await?;
305305
let num_merges_planned = operations.len();
@@ -502,6 +502,7 @@ pub async fn merge_mature_all_indexes(
502502
let node_id = node_id.clone();
503503
let semaphore = Arc::clone(&semaphore);
504504
async move {
505+
let now = OffsetDateTime::now_utc();
505506
merge_mature_single_index(
506507
index_metadata,
507508
metastore_ref,
@@ -510,6 +511,7 @@ pub async fn merge_mature_all_indexes(
510511
data_dir_path,
511512
config_ref,
512513
node_id,
514+
now,
513515
)
514516
.await
515517
}
@@ -527,12 +529,14 @@ mod tests {
527529
use std::sync::Arc;
528530

529531
use quickwit_common::temp_dir::TempDirectory;
532+
use quickwit_config::ConfigFormat;
530533
use quickwit_metastore::{
531534
IndexMetadata, IndexMetadataResponseExt, SplitMaturity, SplitMetadata,
535+
UpdateIndexRequestExt,
532536
};
533537
use quickwit_proto::metastore::{
534538
IndexMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient,
535-
MockMetastoreService,
539+
MockMetastoreService, UpdateIndexRequest,
536540
};
537541
use quickwit_proto::types::NodeId;
538542
use quickwit_storage::RamStorage;
@@ -644,6 +648,194 @@ mod tests {
644648
assert_eq!(published_after.len(), 1);
645649
assert_eq!(published_after[0].num_docs, 4);
646650
assert_eq!(published_after[0].maturity, SplitMaturity::Mature);
651+
assert_eq!(
652+
published_after[0].time_range,
653+
Some(1_631_072_713..=1_631_072_716)
654+
);
655+
656+
test_sandbox.assert_quit().await;
657+
Ok(())
658+
}
659+
660+
#[tokio::test]
661+
async fn test_merge_mature_single_index_schema_evolution() -> anyhow::Result<()> {
662+
let doc_mapping_v1_yaml = r#"
663+
field_mappings:
664+
- name: ts
665+
type: datetime
666+
input_formats: [unix_timestamp]
667+
fast: true
668+
- name: label
669+
type: text
670+
fast: true
671+
tokenizer: lowercase
672+
timestamp_field: ts
673+
"#;
674+
let test_sandbox =
675+
TestSandbox::create("test-index-schema-evo", doc_mapping_v1_yaml, "", &["label"])
676+
.await?;
677+
678+
// Index 3 docs with v1 mapping (lowercase tokenizer, no secondary timestamp).
679+
for i in 0..3u64 {
680+
test_sandbox
681+
.add_documents(std::iter::once(
682+
serde_json::json!({"label": format!("Doc{i}"), "ts": 1_631_072_713u64 + i}),
683+
))
684+
.await?;
685+
}
686+
687+
let metastore = test_sandbox.metastore();
688+
let index_uid = test_sandbox.index_uid();
689+
690+
let v1_splits: Vec<SplitMetadata> = metastore
691+
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
692+
.await?
693+
.collect_splits_metadata()
694+
.await?;
695+
assert_eq!(v1_splits.len(), 3);
696+
let v1_doc_mapping_uid = v1_splits[0].doc_mapping_uid;
697+
698+
// Update the index config: change tokenizer to `default` and add a secondary timestamp.
699+
let index_metadata_v1 = metastore
700+
.index_metadata(IndexMetadataRequest::for_index_id(
701+
index_uid.index_id.to_string(),
702+
))
703+
.await?
704+
.deserialize_index_metadata()?;
705+
let doc_mapping_v2 = ConfigFormat::Yaml.parse(
706+
r#"
707+
field_mappings:
708+
- name: ts
709+
type: datetime
710+
input_formats: [unix_timestamp]
711+
fast: true
712+
- name: label
713+
type: text
714+
fast: true
715+
tokenizer: default
716+
- name: ts2
717+
type: datetime
718+
input_formats: [unix_timestamp]
719+
fast: true
720+
timestamp_field: ts
721+
secondary_timestamp_field: ts2
722+
"#
723+
.as_bytes(),
724+
)?;
725+
let update_request = UpdateIndexRequest::try_from_updates(
726+
index_uid.clone(),
727+
&doc_mapping_v2,
728+
&index_metadata_v1.index_config.indexing_settings,
729+
&index_metadata_v1.index_config.ingest_settings,
730+
&index_metadata_v1.index_config.search_settings,
731+
&index_metadata_v1.index_config.retention_policy_opt,
732+
)?;
733+
metastore.update_index(update_request).await?;
734+
735+
// Index 3 more docs with v2 mapping (default tokenizer, secondary timestamp present).
736+
for i in 3..6u64 {
737+
test_sandbox
738+
.add_documents(std::iter::once(serde_json::json!({
739+
"label": format!("Doc{i}"),
740+
"ts": 1_631_072_713u64 + i,
741+
"ts2": 1_631_072_713u64 + i + 1000,
742+
})))
743+
.await?;
744+
}
745+
746+
let all_splits: Vec<SplitMetadata> = metastore
747+
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
748+
.await?
749+
.collect_splits_metadata()
750+
.await?;
751+
assert_eq!(all_splits.len(), 6);
752+
let v2_doc_mapping_uid = all_splits
753+
.iter()
754+
.find(|s| s.doc_mapping_uid != v1_doc_mapping_uid)
755+
.unwrap()
756+
.doc_mapping_uid;
757+
assert_eq!(
758+
all_splits
759+
.iter()
760+
.filter(|s| s.doc_mapping_uid == v1_doc_mapping_uid)
761+
.count(),
762+
3
763+
);
764+
assert_eq!(
765+
all_splits
766+
.iter()
767+
.filter(|s| s.doc_mapping_uid == v2_doc_mapping_uid)
768+
.count(),
769+
3
770+
);
771+
772+
let index_metadata_v2 = metastore
773+
.index_metadata(IndexMetadataRequest::for_index_id(
774+
index_uid.index_id.to_string(),
775+
))
776+
.await?
777+
.deserialize_index_metadata()?;
778+
let data_dir = TempDirectory::for_test();
779+
let semaphore = Arc::new(Semaphore::new(2));
780+
// Splits have the default 48h maturation period. Pass a `now` far enough in the future
781+
// so all splits (both v1 and v2) are mature at `now - MATURITY_BUFFER (6h)`.
782+
let now = OffsetDateTime::now_utc() + time::Duration::days(3);
783+
// Override min_merge_group_size to 2 so that 3-split groups qualify.
784+
let config = MatureMergeConfig {
785+
min_merge_group_size: 2,
786+
..MatureMergeConfig::default()
787+
};
788+
789+
let summary = merge_mature_single_index(
790+
index_metadata_v2,
791+
&metastore,
792+
&test_sandbox.storage_resolver(),
793+
semaphore,
794+
data_dir.path(),
795+
&config,
796+
test_sandbox.node_id(),
797+
now,
798+
)
799+
.await?;
800+
801+
// Both the v1 and v2 groups (3 splits each, different doc_mapping_uid) get merged.
802+
assert_eq!(summary.num_merges_planned, 2);
803+
assert_eq!(summary.outcome.num_published_merges, 2);
804+
assert_eq!(summary.outcome.num_replaced_splits, 6);
805+
806+
let published_after: Vec<SplitMetadata> = metastore
807+
.list_splits(ListSplitsRequest::try_from_list_splits_query(
808+
&ListSplitsQuery::for_index(index_uid).with_split_state(SplitState::Published),
809+
)?)
810+
.await?
811+
.collect_splits_metadata()
812+
.await?;
813+
assert_eq!(published_after.len(), 2);
814+
815+
// The merged v1 split preserves the original doc_mapping_uid, time range, and has no
816+
// secondary_time_range because the v1 schema had no secondary timestamp field.
817+
let merged_v1 = published_after
818+
.iter()
819+
.find(|s| s.doc_mapping_uid == v1_doc_mapping_uid)
820+
.expect("merged v1 split must exist");
821+
assert_eq!(merged_v1.num_docs, 3);
822+
assert_eq!(merged_v1.maturity, SplitMaturity::Mature);
823+
assert_eq!(merged_v1.time_range, Some(1_631_072_713..=1_631_072_715));
824+
assert_eq!(merged_v1.secondary_time_range, None);
825+
826+
// The merged v2 split has the updated doc_mapping_uid and a secondary_time_range
827+
// derived from the ts2 field.
828+
let merged_v2 = published_after
829+
.iter()
830+
.find(|s| s.doc_mapping_uid == v2_doc_mapping_uid)
831+
.expect("merged v2 split must exist");
832+
assert_eq!(merged_v2.num_docs, 3);
833+
assert_eq!(merged_v2.maturity, SplitMaturity::Mature);
834+
assert_eq!(merged_v2.time_range, Some(1_631_072_716..=1_631_072_718));
835+
assert_eq!(
836+
merged_v2.secondary_time_range,
837+
Some(1_631_073_716..=1_631_073_718)
838+
);
647839

648840
test_sandbox.assert_quit().await;
649841
Ok(())

0 commit comments

Comments
 (0)