@@ -199,7 +199,8 @@ impl HotTierManager {
199199 . await ?;
200200
201201 let mut stream_hot_tier: StreamHotTier = serde_json:: from_slice ( & bytes) ?;
202- stream_hot_tier. oldest_date_time_entry = self . get_oldest_date_time_entry ( stream) . await ?;
202+ stream_hot_tier. oldest_date_time_entry =
203+ self . get_oldest_date_time_entry ( stream, tenant_id) . await ?;
203204
204205 Ok ( stream_hot_tier)
205206 }
@@ -450,15 +451,15 @@ impl HotTierManager {
450451 self . put_hot_tier ( stream, & mut stream_hot_tier, tenant_id)
451452 . await ?;
452453 file_processed = true ;
453- let path = self . get_stream_path_for_date ( stream, & date) ;
454+ let path = self . get_stream_path_for_date ( stream, & date, tenant_id ) ;
454455 let mut hot_tier_manifest = HotTierManager :: get_hot_tier_manifest_from_path ( path) . await ?;
455456 hot_tier_manifest. files . push ( parquet_file. clone ( ) ) ;
456457 hot_tier_manifest
457458 . files
458459 . sort_by_key ( |file| file. file_path . clone ( ) ) ;
459460 // write the manifest file to the hot tier directory
460461 let manifest_path = self
461- . get_stream_path_for_date ( stream, & date)
462+ . get_stream_path_for_date ( stream, & date, tenant_id )
462463 . join ( "hottier.manifest.json" ) ;
463464 fs:: create_dir_all ( manifest_path. parent ( ) . unwrap ( ) ) . await ?;
464465 fs:: write ( manifest_path, serde_json:: to_vec ( & hot_tier_manifest) ?) . await ?;
@@ -467,9 +468,18 @@ impl HotTierManager {
467468 }
468469
469470 ///fetch the list of dates available in the hot tier directory for the stream and sort them
470- pub async fn fetch_hot_tier_dates ( & self , stream : & str ) -> Result < Vec < NaiveDate > , HotTierError > {
471+ pub async fn fetch_hot_tier_dates (
472+ & self ,
473+ stream : & str ,
474+ tenant_id : & Option < String > ,
475+ ) -> Result < Vec < NaiveDate > , HotTierError > {
471476 let mut date_list = Vec :: new ( ) ;
472- let path = self . hot_tier_path . join ( stream) ;
477+ let path = if let Some ( tenant) = tenant_id. as_ref ( ) {
478+ self . hot_tier_path . join ( tenant) . join ( stream)
479+ } else {
480+ self . hot_tier_path . join ( stream)
481+ } ;
482+ // Tenant-scoped streams live under <hot_tier_path>/<tenant>/<stream>; all others directly under <hot_tier_path>/<stream>.
473483 if !path. exists ( ) {
474484 return Ok ( date_list) ;
475485 }
@@ -524,37 +534,47 @@ impl HotTierManager {
524534 }
525535
526536 /// get hot tier path for the stream and date
527- pub fn get_stream_path_for_date ( & self , stream : & str , date : & NaiveDate ) -> PathBuf {
528- self . hot_tier_path . join ( stream) . join ( format ! ( "date={date}" ) )
537+ pub fn get_stream_path_for_date (
538+ & self ,
539+ stream : & str ,
540+ date : & NaiveDate ,
541+ tenant_id : & Option < String > ,
542+ ) -> PathBuf {
543+ if let Some ( tenant) = tenant_id. as_ref ( ) {
544+ self . hot_tier_path
545+ . join ( tenant)
546+ . join ( stream)
547+ . join ( format ! ( "date={date}" ) )
548+ } else {
549+ self . hot_tier_path . join ( stream) . join ( format ! ( "date={date}" ) )
550+ }
529551 }
530552
531553 /// Returns the list of manifest files present in hot tier directory for the stream
532554 pub async fn get_hot_tier_manifest_files (
533555 & self ,
534- stream : & str ,
535556 manifest_files : & mut Vec < File > ,
536557 ) -> Result < Vec < File > , HotTierError > {
537- // Fetch the list of hot tier parquet files for the given stream.
538- let mut hot_tier_files = self . get_hot_tier_parquet_files ( stream) . await ?;
539-
540- // Retain only the files in `hot_tier_files` that also exist in `manifest_files`.
541- hot_tier_files. retain ( |file| {
542- manifest_files
543- . iter ( )
544- . any ( |manifest_file| manifest_file. file_path . eq ( & file. file_path ) )
545- } ) ;
558+ // Check which query-relevant files exist locally in the hot tier directory.
559+ let mut hot_tier_files = Vec :: new ( ) ;
560+ let mut remaining = Vec :: with_capacity ( manifest_files. len ( ) ) ;
561+
562+ for file in manifest_files. drain ( ..) {
563+ let hot_tier_path = self . hot_tier_path . join ( & file. file_path ) ;
564+ if let Ok ( meta) = fs:: metadata ( & hot_tier_path) . await
565+ && meta. len ( ) == file. file_size
566+ {
567+ hot_tier_files. push ( file) ;
568+ continue ;
569+ }
546570
547- // Sort `hot_tier_files` in descending order by file path.
548- hot_tier_files . sort_unstable_by ( |a , b| b . file_path . cmp ( & a . file_path ) ) ;
571+ remaining . push ( file ) ;
572+ }
549573
550- // Update `manifest_files` to exclude files that are present in the filtered `hot_tier_files`.
551- manifest_files. retain ( |manifest_file| {
552- hot_tier_files
553- . iter ( )
554- . all ( |file| !file. file_path . eq ( & manifest_file. file_path ) )
555- } ) ;
574+ * manifest_files = remaining;
556575
557- // Sort `manifest_files` in descending order by file path.
576+ // Sort both lists in descending order by file path.
577+ hot_tier_files. sort_unstable_by ( |a, b| b. file_path . cmp ( & a. file_path ) ) ;
558578 manifest_files. sort_unstable_by ( |a, b| b. file_path . cmp ( & a. file_path ) ) ;
559579
560580 Ok ( hot_tier_files)
@@ -564,16 +584,17 @@ impl HotTierManager {
564584 pub async fn get_hot_tier_parquet_files (
565585 & self ,
566586 stream : & str ,
587+ tenant_id : & Option < String > ,
567588 ) -> Result < Vec < File > , HotTierError > {
568589 // Fetch list of dates for the given stream
569- let date_list = self . fetch_hot_tier_dates ( stream) . await ?;
590+ let date_list = self . fetch_hot_tier_dates ( stream, tenant_id ) . await ?;
570591
571592 // Create an unordered iter of futures to async collect files
572593 let mut tasks = FuturesUnordered :: new ( ) ;
573594
574595 // For each date, fetch the manifest and extract parquet files
575596 for date in date_list {
576- let path = self . get_stream_path_for_date ( stream, & date) ;
597+ let path = self . get_stream_path_for_date ( stream, & date, tenant_id ) ;
577598 tasks. push ( async move {
578599 HotTierManager :: get_hot_tier_manifest_from_path ( path)
579600 . await
@@ -621,9 +642,9 @@ impl HotTierManager {
621642 tenant_id : & Option < String > ,
622643 ) -> Result < bool , HotTierError > {
623644 let mut delete_successful = false ;
624- let dates = self . fetch_hot_tier_dates ( stream) . await ?;
645+ let dates = self . fetch_hot_tier_dates ( stream, tenant_id ) . await ?;
625646 ' loop_dates: for date in dates {
626- let path = self . get_stream_path_for_date ( stream, & date) ;
647+ let path = self . get_stream_path_for_date ( stream, & date, tenant_id ) ;
627648 if !path. exists ( ) {
628649 continue ;
629650 }
@@ -712,14 +733,15 @@ impl HotTierManager {
712733 pub async fn get_oldest_date_time_entry (
713734 & self ,
714735 stream : & str ,
736+ tenant_id : & Option < String > ,
715737 ) -> Result < Option < String > , HotTierError > {
716- let date_list = self . fetch_hot_tier_dates ( stream) . await ?;
738+ let date_list = self . fetch_hot_tier_dates ( stream, tenant_id ) . await ?;
717739 if date_list. is_empty ( ) {
718740 return Ok ( None ) ;
719741 }
720742
721743 for date in date_list {
722- let path = self . get_stream_path_for_date ( stream, & date) ;
744+ let path = self . get_stream_path_for_date ( stream, & date, tenant_id ) ;
723745 let hours_dir = ReadDirStream :: new ( fs:: read_dir ( & path) . await ?) ;
724746 let mut hours: Vec < DirEntry > = hours_dir. try_collect ( ) . await ?;
725747 hours. retain ( |entry| {
0 commit comments