@@ -82,26 +82,45 @@ def do_batch_get(batch_keys):
8282 sleepy_time = 1 # Start with 1 second of sleep, then exponentially increase.
8383 retrieved = {key : [] for key in batch_keys }
8484 while tries < max_tries :
85- response = dynamodb .batch_get_item (RequestItems = batch_keys )
86- # Collect any retrieved items and retry unprocessed keys.
87- for key in response .get ("Responses" , []):
88- retrieved [key ] += response ["Responses" ][key ]
89- unprocessed = response ["UnprocessedKeys" ]
90- if len (unprocessed ) > 0 :
91- batch_keys = unprocessed
92- unprocessed_count = sum (
93- [len (batch_key ["Keys" ]) for batch_key in batch_keys .values ()]
94- )
95- logger .info (
96- "%s unprocessed keys returned. Sleep, then retry." , unprocessed_count
97- )
98- tries += 1
99- if tries < max_tries :
100- logger .info ("Sleeping for %s seconds." , sleepy_time )
101- time .sleep (sleepy_time )
102- sleepy_time = min (sleepy_time * 2 , 32 )
103- else :
104- break
85+ try :
86+ response = dynamodb .batch_get_item (RequestItems = batch_keys )
87+ # Collect any retrieved items and retry unprocessed keys.
88+ for key in response .get ("Responses" , []):
89+ retrieved [key ] += response ["Responses" ][key ]
90+ unprocessed = response ["UnprocessedKeys" ]
91+ if len (unprocessed ) > 0 :
92+ batch_keys = unprocessed
93+ unprocessed_count = sum (
94+ [len (batch_key ["Keys" ]) for batch_key in batch_keys .values ()]
95+ )
96+ logger .info (
97+ "%s unprocessed keys returned. Sleep, then retry." , unprocessed_count
98+ )
99+ tries += 1
100+ if tries < max_tries :
101+ logger .info ("Sleeping for %s seconds." , sleepy_time )
102+ time .sleep (sleepy_time )
103+ sleepy_time = min (sleepy_time * 2 , 32 )
104+ else :
105+ break
106+ except ClientError as error :
107+ if error .response ["Error" ]["Code" ] in ["ProvisionedThroughputExceededException" , "ThrottlingException" , "GsiProvisionedThroughputExceeded" , "RequestLimitExceeded" ]:
108+ # Check for new throttlingReasons field
109+ if "throttlingReasons" in error .response :
110+ for reason in error .response ["throttlingReasons" ]:
111+ logger .warning (
112+ "Throttling detected - Reason: %s, Resource: %s" ,reason .get ("reason" ),reason .get ("resource" )
113+ )
114+ else :
115+ # Fallback to previous message
116+ logger .warning ("Throttling detected: %s" , error .response ["Error" ]["Message" ])
117+ tries += 1
118+ if tries < max_tries :
119+ logger .info ("Sleeping for %s seconds." , sleepy_time )
120+ time .sleep (sleepy_time )
121+ sleepy_time = min (sleepy_time * 2 , 32 )
122+ else :
123+ raise
105124
106125 return retrieved
107126
@@ -129,7 +148,9 @@ def fill_table(table, table_data):
129148 for item in table_data :
130149 writer .put_item (Item = item )
131150 logger .info ("Loaded data into table %s." , table .name )
132- except ClientError :
151+ except ClientError as error :
152+ if "throttlingReasons" in error .response :
153+ logger .error ("Batch write throttled with reasons: %s" , error .response ["throttlingReasons" ])
133154 logger .exception ("Couldn't load data into table %s." , table .name )
134155 raise
135156
@@ -215,6 +236,10 @@ def archive_movies(movie_table, movie_data):
215236 "Got expected exception when trying to put duplicate records into the "
216237 "archive table."
217238 )
239+ elif error .response ["Error" ]["Code" ] in ["ProvisionedThroughputExceededException" , "ThrottlingException" ]:
240+ if "throttlingReasons" in error .response :
241+ logger .error ("Archive operation throttled: %s" , error .response ["throttlingReasons" ])
242+ raise
218243 else :
219244 logger .exception (
220245 "Got unexpected exception when trying to put duplicate records into "
0 commit comments