@@ -19,6 +19,7 @@ use std::sync::Arc;
1919
2020use futures:: StreamExt ;
2121use futures:: stream:: BoxStream ;
22+ use tracing:: field:: Empty ;
2223
2324use crate :: delete_file_index:: DeleteFileIndex ;
2425use crate :: expr:: { Bind , BoundPredicate , Predicate } ;
@@ -53,14 +54,26 @@ pub(crate) struct ManifestEntryContext {
5354 pub bound_predicates : Option < Arc < BoundPredicates > > ,
5455 pub partition_spec_id : i32 ,
5556 pub snapshot_schema : SchemaRef ,
57+ pub ( crate ) span : tracing:: Span ,
5658}
5759
5860impl ManifestFileContext {
5961 /// Consumes this [`ManifestFileContext`], fetching its Manifest from FileIO and then
6062 /// streaming its constituent [`ManifestEntries`]
6163 pub ( crate ) async fn fetch_manifest_and_stream_entries (
6264 self ,
65+ parent_span : tracing:: Span ,
6366 ) -> Result < BoxStream < ' static , Result < ManifestEntryContext > > > {
67+ let manifest_span = tracing:: debug_span!(
68+ parent: parent_span,
69+ "iceberg.scan.plan.process_manifest" ,
70+ iceberg. scan. plan. manifest. file_path = self . manifest_file. manifest_path,
71+ iceberg. scan. plan. manifest. entries_count = Empty ,
72+ ) ;
73+
74+ let span = manifest_span. clone ( ) ;
75+ let _guard = manifest_span. enter ( ) ;
76+
6477 let ManifestFileContext {
6578 object_cache,
6679 manifest_file,
@@ -73,15 +86,30 @@ impl ManifestFileContext {
7386
7487 let manifest = object_cache. get_manifest ( & manifest_file) . await ?;
7588
89+ span. record (
90+ "iceberg.scan.plan.manifest.entries_count" ,
91+ manifest. entries ( ) . len ( ) ,
92+ ) ;
93+
7694 Ok ( async_stream:: stream! {
7795 for manifest_entry in manifest. entries( ) {
96+ let manifest_entry_span = tracing:: debug_span!(
97+ parent: span. clone( ) ,
98+ "iceberg.scan.plan.process_data_file" ,
99+ iceberg. scan. plan. data_file. file_path = manifest_entry. file_path( ) ,
100+ "iceberg.scan.plan.data_file.type" = Empty ,
101+ iceberg. scan. plan. data_file. skipped = Empty ,
102+ iceberg. scan. plan. data_file. skipped_reason = Empty ,
103+ ) ;
104+
78105 yield Ok ( ManifestEntryContext {
79106 manifest_entry: manifest_entry. clone( ) ,
80107 expression_evaluator_cache: expression_evaluator_cache. clone( ) ,
81108 field_ids: field_ids. clone( ) ,
82109 partition_spec_id: manifest_file. partition_spec_id,
83110 bound_predicates: bound_predicates. clone( ) ,
84111 snapshot_schema: snapshot_schema. clone( ) ,
112+ span: manifest_entry_span,
85113 } ) ;
86114 }
87115 }
@@ -144,7 +172,11 @@ pub(crate) struct PlanContext {
144172}
145173
146174impl PlanContext {
147- #[ tracing:: instrument( skip_all) ]
175+ #[ tracing:: instrument(
176+ skip_all,
177+ level = "debug" ,
178+ fields( iceberg. scan. plan. manifest_list. file_path = ?self . snapshot. manifest_list( ) ) ,
179+ ) ]
148180 pub ( crate ) async fn get_manifest_list ( & self ) -> Result < Arc < ManifestList > > {
149181 self . object_cache
150182 . as_ref ( )
@@ -175,14 +207,19 @@ impl PlanContext {
175207
176208 #[ tracing:: instrument(
177209 skip_all,
210+ level = "debug" ,
211+ name = "iceberg.scan.plan.process_manifest_list" ,
178212 fields(
179- manifest_list. len = manifest_list. entries( ) . len( ) ,
213+ iceberg . scan . plan . manifest_list. entries_count = manifest_list. entries( ) . len( ) ,
180214 )
181215 ) ]
182- pub ( crate ) fn build_manifest_file_context_iter (
216+ pub ( crate ) fn build_manifest_file_contexts (
183217 & self ,
184218 manifest_list : Arc < ManifestList > ,
185- ) -> impl Iterator < Item = Result < ManifestFileContext > > {
219+ ) -> (
220+ Vec < Result < ManifestFileContext > > ,
221+ Vec < Result < ManifestFileContext > > ,
222+ ) {
186223 let has_predicate = self . predicate . is_some ( ) ;
187224
188225 ( 0 ..manifest_list. entries ( ) . len ( ) )
@@ -198,24 +235,28 @@ impl PlanContext {
198235 . get ( manifest_file. partition_spec_id , predicate. clone ( ) )
199236 . eval ( & manifest_file) ?
200237 {
201- tracing:: trace!( file_path = manifest_file. manifest_path, "iceberg.scan.manifest_file.skipped" ) ;
202- metrics:: counter!( "iceberg.scan.manifest_file.skipped" , "reason" => "partition" ) . increment ( 1 ) ;
238+ tracing:: debug!(
239+ iceberg. scan. plan. manifest. file_path = manifest_file. manifest_path,
240+ iceberg. scan. plan. manifest. skip_reason = "partition" ,
241+ "iceberg.scan.plan.manifest_file.skipped"
242+ ) ;
243+ metrics:: counter!( "iceberg.scan.plan.manifest_file.skipped" , "reason" => "partition" ) . increment ( 1 ) ;
203244 return Ok ( None ) ; // Skip this file.
204245 }
205246 Some ( predicate)
206247 } else {
207248 None
208249 } ;
209250
210- tracing:: trace!( file_path = manifest_file. manifest_path, "iceberg.scan.manifest_file.included" ) ;
211- metrics:: counter!( "iceberg.scan.manifest_file.included" ) . increment ( 1 ) ;
251+ metrics:: counter!( "iceberg.scan.plan.manifest_file.included" ) . increment ( 1 ) ;
212252
213253 let context = self
214254 . create_manifest_file_context ( manifest_file, partition_bound_predicate) ?;
215255 Ok ( Some ( context) )
216256 } ) ( )
217257 . transpose ( )
218258 } )
259+ . partition ( |ctx| ctx. as_ref ( ) . map_or ( true , |ctx| ctx. is_delete ( ) ) )
219260 }
220261
221262 fn create_manifest_file_context (
0 commit comments