@@ -270,6 +270,7 @@ def load_partition(
270270 partition : Partition ,
271271 items : Iterable [Dict [str , Any ]],
272272 insert_mode : Optional [Methods ] = Methods .insert ,
273+ partition_update_enabled : Optional [bool ] = True ,
273274 ) -> None :
274275 """Load items data for a single partition."""
275276 conn = self .db .connect ()
@@ -441,15 +442,20 @@ def load_partition(
441442 "Available modes are insert, ignore, upsert, and delsert."
442443 f"You entered { insert_mode } ." ,
443444 )
444- logger .debug ("Updating Partition Stats" )
445- cur .execute ("SELECT update_partition_stats_q(%s);" ,(partition .name ,))
446- logger .debug (cur .statusmessage )
447- logger .debug (f"Rows affected: { cur .rowcount } " )
445+ if partition_update_enabled :
446+ logger .debug ("Updating Partition Stats" )
447+ cur .execute ("SELECT update_partition_stats_q(%s);" ,(partition .name ,))
448+ logger .debug (cur .statusmessage )
449+ logger .debug (f"Rows affected: { cur .rowcount } " )
448450 logger .debug (
449451 f"Copying data for { partition } took { time .perf_counter () - t } seconds" ,
450452 )
451453
452- def _partition_update (self , item : Dict [str , Any ]) -> str :
454+ def _partition_update (
455+ self ,
456+ item : Dict [str , Any ],
457+ update_enabled : Optional [bool ] = True ,
458+ ) -> str :
453459 """Update the cached partition with the item information and return the name.
454460
455461 This method will mark the partition as dirty if the bounds of the partition
@@ -515,20 +521,24 @@ def _partition_update(self, item: Dict[str, Any]) -> str:
515521 partition = self ._partition_cache [partition_name ]
516522
517523 if partition :
518- # Only update the partition if the item is outside the current bounds
519- if item ["datetime" ] < partition .datetime_range_min :
520- partition .datetime_range_min = item ["datetime" ]
521- partition .requires_update = True
522- if item ["datetime" ] > partition .datetime_range_max :
523- partition .datetime_range_max = item ["datetime" ]
524- partition .requires_update = True
525- if item ["end_datetime" ] < partition .end_datetime_range_min :
526- partition .end_datetime_range_min = item ["end_datetime" ]
527- partition .requires_update = True
528- if item ["end_datetime" ] > partition .end_datetime_range_max :
529- partition .end_datetime_range_max = item ["end_datetime" ]
530- partition .requires_update = True
524+ if update_enabled :
525+ # Only update the partition if the item is outside the current bounds
526+ if item ["datetime" ] < partition .datetime_range_min :
527+ partition .datetime_range_min = item ["datetime" ]
528+ partition .requires_update = True
529+ if item ["datetime" ] > partition .datetime_range_max :
530+ partition .datetime_range_max = item ["datetime" ]
531+ partition .requires_update = True
532+ if item ["end_datetime" ] < partition .end_datetime_range_min :
533+ partition .end_datetime_range_min = item ["end_datetime" ]
534+ partition .requires_update = True
535+ if item ["end_datetime" ] > partition .end_datetime_range_max :
536+ partition .end_datetime_range_max = item ["end_datetime" ]
537+ partition .requires_update = True
531538 else :
539+ if not update_enabled :
540+ raise Exception (f"Partition { partition_name } does not exist." )
541+
532542 # No partition exists yet; create a new one from item
533543 partition = Partition (
534544 name = partition_name ,
@@ -544,7 +554,11 @@ def _partition_update(self, item: Dict[str, Any]) -> str:
544554
545555 return partition_name
546556
547- def read_dehydrated (self , file : Union [Path , str ] = "stdin" ) -> Generator :
557+ def read_dehydrated (
558+ self ,
559+ file : Union [Path , str ] = "stdin" ,
560+ partition_update_enabled : Optional [bool ] = True ,
561+ ) -> Generator :
548562 if file is None :
549563 file = "stdin"
550564 if isinstance (file , str ):
@@ -575,15 +589,21 @@ def read_dehydrated(self, file: Union[Path, str] = "stdin") -> Generator:
575589 item [field ] = content_value
576590 else :
577591 item [field ] = tab_split [i ]
578- item ["partition" ] = self ._partition_update (item )
592+ item ["partition" ] = self ._partition_update (
593+ item ,
594+ partition_update_enabled ,
595+ )
579596 yield item
580597
581598 def read_hydrated (
582- self , file : Union [Path , str , Iterator [Any ]] = "stdin" ,
599+ self ,
600+ file : Union [Path , str ,
601+ Iterator [Any ]] = "stdin" ,
602+ partition_update_enabled : Optional [bool ] = True ,
583603 ) -> Generator :
584604 for line in read_json (file ):
585605 item = self .format_item (line )
586- item ["partition" ] = self ._partition_update (item )
606+ item ["partition" ] = self ._partition_update (item , partition_update_enabled )
587607 yield item
588608
589609 def load_items (
@@ -592,6 +612,7 @@ def load_items(
592612 insert_mode : Optional [Methods ] = Methods .insert ,
593613 dehydrated : Optional [bool ] = False ,
594614 chunksize : Optional [int ] = 10000 ,
615+ partition_update_enabled : Optional [bool ] = True ,
595616 ) -> None :
596617 """Load items json records."""
597618 self .check_version ()
@@ -602,15 +623,17 @@ def load_items(
602623 self ._partition_cache = {}
603624
604625 if dehydrated and isinstance (file , str ):
605- items = self .read_dehydrated (file )
626+ items = self .read_dehydrated (file , partition_update_enabled )
606627 else :
607- items = self .read_hydrated (file )
628+ items = self .read_hydrated (file , partition_update_enabled )
608629
609630 for chunkin in chunked_iterable (items , chunksize ):
610631 chunk = list (chunkin )
611632 chunk .sort (key = lambda x : x ["partition" ])
612633 for k , g in itertools .groupby (chunk , lambda x : x ["partition" ]):
613- self .load_partition (self ._partition_cache [k ], g , insert_mode )
634+ self .load_partition (
635+ self ._partition_cache [k ], g , insert_mode , partition_update_enabled ,
636+ )
614637
615638 logger .debug (f"Adding data to database took { time .perf_counter () - t } seconds." )
616639
0 commit comments