@@ -285,9 +285,15 @@ def _gen_subtask_info(
285285 )
286286 for i , fetch_chunk in zip (build_fetch_index_to_chunks , fetch_chunks ):
287287 inp_chunks [i ] = fetch_chunk
288- copied_op = chunk .op .copy ()
289- copied_op ._key = chunk .op .key
290288 for out_chunk in chunk .op .outputs :
289+ # Note: `dtypes`, `index_value`, and `columns_value` are lazily
290+ # initialized, so we should initialize them here.
291+ if hasattr (out_chunk , "dtypes" ):
292+ out_chunk .dtypes
293+ if hasattr (out_chunk , "index_value" ):
294+ out_chunk .index_value
295+ if hasattr (out_chunk , "columns_value" ):
296+ out_chunk .columns_value
291297 processed .add (out_chunk )
292298 chunk_graph .add_node (out_chunk )
293299 if out_chunk in self ._final_result_chunks_set :
@@ -381,10 +387,9 @@ def _gen_map_reduce_info(
381387 # record analyzer map reduce id for mapper op
382388 # copied chunk exists because map chunk must have
383389 # been processed before shuffle proxy
384- copied_map_chunk = self ._chunk_to_copied [map_chunk ]
385- if not hasattr (copied_map_chunk , "extra_params" ): # pragma: no cover
386- copied_map_chunk .extra_params = dict ()
387- copied_map_chunk .extra_params ["analyzer_map_reduce_id" ] = map_reduce_id
390+ if not hasattr (map_chunk , "extra_params" ): # pragma: no cover
391+ map_chunk .extra_params = dict ()
392+ map_chunk .extra_params ["analyzer_map_reduce_id" ] = map_reduce_id
388393 reducer_bands = [assign_results [r .outputs [0 ]] for r in reducer_ops ]
389394 map_reduce_info = MapReduceInfo (
390395 map_reduce_id = map_reduce_id ,
0 commit comments