1616// under the License.
1717
1818use std:: collections:: HashMap ;
19- use std:: ops:: Deref ;
20- use std:: sync:: { Arc , RwLock } ;
19+ use std:: sync:: Arc ;
2120
22- use futures:: StreamExt ;
23- use futures:: channel:: mpsc:: { Sender , channel} ;
24- use tokio:: sync:: Notify ;
25-
26- use crate :: runtime:: spawn;
2721use crate :: scan:: { DeleteFileContext , FileScanTaskDeleteFile } ;
2822use crate :: spec:: { DataContentType , DataFile , Struct } ;
2923
3024/// Index of delete files, used to look up the delete files that apply to a data file.
31- #[ derive( Debug , Clone ) ]
25+ #[ derive( Debug , Default ) ]
3226pub ( crate ) struct DeleteFileIndex {
33- state : Arc < RwLock < DeleteFileIndexState > > ,
34- }
35-
36- #[ derive( Debug ) ]
37- enum DeleteFileIndexState {
38- Populating ( Arc < Notify > ) ,
39- Populated ( PopulatedDeleteFileIndex ) ,
40- }
41-
42- #[ derive( Debug ) ]
43- struct PopulatedDeleteFileIndex {
 // Delete files whose partition has no fields and whose content type is not
 // `PositionDeletes`; these are the ones treated as global deletes.
4427 #[ allow( dead_code) ]
4528 global_deletes : Vec < Arc < DeleteFileContext > > ,
 // Delete files with content type `EqualityDeletes`, grouped under a `Struct` key
 // (presumably the delete file's partition value -- confirm against the insert site).
4629 eq_deletes_by_partition : HashMap < Struct , Vec < Arc < DeleteFileContext > > > ,
 // Delete files with content type `PositionDeletes`, grouped under the same kind of key.
4730 pos_deletes_by_partition : HashMap < Struct , Vec < Arc < DeleteFileContext > > > ,
48- // TODO: do we need this?
49- // pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
50-
5131 // TODO: Deletion Vector support
5232}
5333
54- impl DeleteFileIndex {
55- /// Creates a new `DeleteFileIndex` in the `Populating` state, along with the sender that populates it with delete files.
 ///
 /// A spawned task collects everything received on the channel into a `Vec`, builds a
 /// `PopulatedDeleteFileIndex` from it, swaps the shared state to `Populated`, and then
 /// calls `notify_waiters` on the shared `Notify`.
56- pub ( crate ) fn new ( ) -> ( DeleteFileIndex , Sender < DeleteFileContext > ) {
57- // TODO: what should the channel limit be? (currently 10)
58- let ( tx, rx) = channel ( 10 ) ;
59- let notify = Arc :: new ( Notify :: new ( ) ) ;
60- let state = Arc :: new ( RwLock :: new ( DeleteFileIndexState :: Populating (
61- notify. clone ( ) ,
62- ) ) ) ;
63- let delete_file_stream = rx. boxed ( ) ;
64-
65- spawn ( {
66- let state = state. clone ( ) ;
67- async move {
68- let delete_files = delete_file_stream. collect :: < Vec < _ > > ( ) . await ;
69-
70- let populated_delete_file_index = PopulatedDeleteFileIndex :: new ( delete_files) ;
71-
 // The write guard is confined to this inner scope so it is released before waiters are notified.
72- {
73- let mut guard = state. write ( ) . unwrap ( ) ;
74- * guard = DeleteFileIndexState :: Populated ( populated_delete_file_index) ;
75- }
76- notify. notify_waiters ( ) ;
77- }
78- } ) ;
79-
80- ( DeleteFileIndex { state } , tx)
81- }
82-
83- /// Gets all the delete files that apply to the specified data file.
 ///
 /// If the state is already `Populated`, the lookup is answered immediately. Otherwise this
 /// awaits the `Notify` stored in the `Populating` state and then reads the state again.
84- pub ( crate ) async fn get_deletes_for_data_file (
85- & self ,
86- data_file : & DataFile ,
87- seq_num : Option < i64 > ,
88- ) -> Vec < FileScanTaskDeleteFile > {
 // The read guard lives only inside this block, so no lock is held across the `.await` below.
89- let notifier = {
90- let guard = self . state . read ( ) . unwrap ( ) ;
91- match * guard {
92- DeleteFileIndexState :: Populating ( ref notifier) => notifier. clone ( ) ,
93- DeleteFileIndexState :: Populated ( ref index) => {
94- return index. get_deletes_for_data_file ( data_file, seq_num) ;
95- }
96- }
97- } ;
98-
 // NOTE(review): the `notified()` future is only created here, after the read guard was
 // dropped. If the populating task switches the state and calls `notify_waiters()` in that
 // window, and `notify_waiters()` stores no permit for later waiters (as tokio's `Notify`
 // documents -- confirm), this await would never complete. Creating the future first and
 // re-checking the state before awaiting would close the gap.
99- notifier. notified ( ) . await ;
100-
101- let guard = self . state . read ( ) . unwrap ( ) ;
102- match guard. deref ( ) {
103- DeleteFileIndexState :: Populated ( index) => {
104- index. get_deletes_for_data_file ( data_file, seq_num)
105- }
106- _ => unreachable ! ( "Cannot be any other state than loaded" ) ,
107- }
108- }
109- }
110-
111- impl PopulatedDeleteFileIndex {
112- /// Creates a new populated delete file index from a list of delete file contexts, which
113- /// allows for fast lookup when determining which delete files apply to a given data file.
114- ///
115- /// 1. The partition information is extracted from each delete file's manifest entry.
116- /// 2. If the partition is empty and the delete file is not a positional delete,
117- /// it is added to the `global_deletes` vector
118- /// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
119- fn new ( files : Vec < DeleteFileContext > ) -> PopulatedDeleteFileIndex {
120- let mut eq_deletes_by_partition: HashMap < Struct , Vec < Arc < DeleteFileContext > > > =
121- HashMap :: default ( ) ;
122- let mut pos_deletes_by_partition: HashMap < Struct , Vec < Arc < DeleteFileContext > > > =
123- HashMap :: default ( ) ;
124-
125- let mut global_deletes: Vec < Arc < DeleteFileContext > > = vec ! [ ] ;
126-
127- files. into_iter ( ) . for_each ( |ctx| {
34+ impl Extend < DeleteFileContext > for DeleteFileIndex {
35+ fn extend < T : IntoIterator < Item = DeleteFileContext > > ( & mut self , iter : T ) {
36+ // 1. The partition information is extracted from each delete file's manifest entry.
37+ // 2. If the partition is empty and the delete file is not a positional delete,
38+ // it is added to the `global_deletes` vector
39+ // 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
40+ for ctx in iter {
12841 let arc_ctx = Arc :: new ( ctx) ;
12942
13043 let partition = arc_ctx. manifest_entry . data_file ( ) . partition ( ) ;
13144
132- // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
45+ // The spec states that "Equality delete files stored with an unpartitioned spec
46+ // are applied as global deletes".
13347 if partition. fields ( ) . is_empty ( ) {
13448 // TODO: confirm we're good to skip here if we encounter a pos del
13549 if arc_ctx. manifest_entry . content_type ( ) != DataContentType :: PositionDeletes {
136- global_deletes. push ( arc_ctx) ;
137- return ;
50+ self . global_deletes . push ( arc_ctx) ;
51+ continue ;
13852 }
13953 }
14054
14155 let destination_map = match arc_ctx. manifest_entry . content_type ( ) {
142- DataContentType :: PositionDeletes => & mut pos_deletes_by_partition,
143- DataContentType :: EqualityDeletes => & mut eq_deletes_by_partition,
56+ DataContentType :: PositionDeletes => & mut self . pos_deletes_by_partition ,
57+ DataContentType :: EqualityDeletes => & mut self . eq_deletes_by_partition ,
14458 _ => unreachable ! ( ) ,
14559 } ;
14660
@@ -150,17 +64,13 @@ impl PopulatedDeleteFileIndex {
15064 entry. push ( arc_ctx. clone ( ) ) ;
15165 } )
15266 . or_insert ( vec ! [ arc_ctx. clone( ) ] ) ;
153- } ) ;
154-
155- PopulatedDeleteFileIndex {
156- global_deletes,
157- eq_deletes_by_partition,
158- pos_deletes_by_partition,
15967 }
16068 }
69+ }
16170
71+ impl DeleteFileIndex {
16272 /// Determine all the delete files that apply to the provided `DataFile`.
163- fn get_deletes_for_data_file (
73+ pub ( crate ) fn get_deletes_for_data_file (
16474 & self ,
16575 data_file : & DataFile ,
16676 seq_num : Option < i64 > ,
0 commit comments