66import html
77import json
88from dataclasses import dataclass
9- from typing import TYPE_CHECKING , Any
9+ from typing import TYPE_CHECKING , Any , cast
1010
1111from buildbot .interfaces import WorkerSetupError
1212from buildbot .plugins import steps , util
1818from .build_trigger import BuildTrigger , JobsConfig , TriggerConfig
1919from .errors import BuildbotNixError
2020from .models import (
21+ CacheStatus ,
2122 NixEvalJob ,
2223 NixEvalJobError ,
2324 NixEvalJobModel ,
2829from .repo_config import BranchConfig
2930
3031if TYPE_CHECKING :
32+ from collections .abc import AsyncGenerator , Sequence
33+
3134 from buildbot .config .builder import BuilderConfig
3235 from buildbot .locks import MasterLock
3336 from buildbot .process .log import StreamLog
@@ -172,6 +175,49 @@ async def run(self) -> int:
172175
173176 branch_config : BranchConfig = await BranchConfig .extract_during_step (self )
174177
178+ # Check if this is a rebuild and try to reuse evaluation from original build
179+ if not self .build or not self .build .requests :
180+ log .info ("No build requests available, skipping rebuild check" )
181+ else :
182+ buildset_id = self .build .requests [0 ].bsid
183+ if buildset_id is not None :
184+ buildset = await self .master .data .get (("buildsets" , str (buildset_id )))
185+ if (
186+ buildset
187+ and (rebuilt_buildid := buildset .get ("rebuilt_buildid" )) is not None
188+ ):
189+ # This is a rebuild - try to reuse evaluation from original build
190+ jobs = await self ._reconstruct_jobs_from_rebuild (rebuilt_buildid )
191+ if jobs is not None :
192+ # Successfully reconstructed jobs, process them
193+ await self ._process_jobs_and_trigger_builds (jobs , branch_config )
194+ result = util .SUCCESS
195+ if self .build :
196+ await CombinedBuildEvent .produce_event_for_build (
197+ self .master ,
198+ CombinedBuildEvent .FINISHED_NIX_EVAL ,
199+ self .build ,
200+ result ,
201+ warnings_count = self .warnings_count ,
202+ )
203+ return result
204+
205+ # Either not a rebuild or reconstruction failed - run full evaluation
206+ result = await self ._run_nix_eval_jobs (branch_config )
207+
208+ if self .build :
209+ await CombinedBuildEvent .produce_event_for_build (
210+ self .master ,
211+ CombinedBuildEvent .FINISHED_NIX_EVAL ,
212+ self .build ,
213+ result ,
214+ warnings_count = self .warnings_count ,
215+ )
216+
217+ return result
218+
219+ async def _run_nix_eval_jobs (self , branch_config : BranchConfig ) -> int :
220+ """Run nix-eval-jobs and process the results."""
175221 # run nix-eval-jobs --flake .#checks to generate the dict of stages
176222 # !! Careful, the command attribute has to be specified here as the call
177223 # !! to `makeRemoteShellCommand` inside `BranchConfig.extract_during_step`
@@ -214,16 +260,8 @@ async def run(self) -> int:
214260 # Process warnings if any
215261 result = await self ._process_warnings (result , branch_config = branch_config )
216262
217- if self .build :
218- await CombinedBuildEvent .produce_event_for_build (
219- self .master ,
220- CombinedBuildEvent .FINISHED_NIX_EVAL ,
221- self .build ,
222- result ,
223- warnings_count = self .warnings_count ,
224- )
225263 if result in (util .SUCCESS , util .WARNINGS ):
226- # create a ShellCommand for each stage and add them to the build
264+ # Parse the nix-eval-jobs output
227265 jobs : list [NixEvalJob ] = []
228266
229267 for line in self .observer .getStdout ().split ("\n " ):
@@ -235,48 +273,184 @@ async def run(self) -> int:
235273 raise BuildbotNixError (msg ) from e
236274 jobs .append (NixEvalJobModel .validate_python (job ))
237275
238- failed_jobs : list [ NixEvalJobError ] = []
239- successful_jobs : list [ NixEvalJobSuccess ] = []
276+ # Process jobs and trigger builds
277+ await self . _process_jobs_and_trigger_builds ( jobs , branch_config )
240278
241- for job in jobs :
242- # report unbuildable jobs
243- if isinstance (job , NixEvalJobError ):
244- failed_jobs .append (job )
245- elif (
246- job .system in self .nix_eval_config .supported_systems
247- and isinstance (job , NixEvalJobSuccess )
248- ):
249- successful_jobs .append (job )
279+ return result
250280
251- self .number_of_jobs = len (successful_jobs )
281+ async def _process_jobs_and_trigger_builds (
282+ self , jobs : list [NixEvalJob ], branch_config : BranchConfig
283+ ) -> None :
284+ """Process jobs and trigger builds. Used by both normal eval and rebuild paths."""
285+ failed_jobs : list [NixEvalJobError ] = []
286+ successful_jobs : list [NixEvalJobSuccess ] = []
252287
253- if self .build :
254- trigger_config = TriggerConfig (
255- builds_scheduler = f"{ self .project .project_id } -nix-build" ,
256- failed_eval_scheduler = f"{ self .project .project_id } -nix-failed-eval" ,
257- dependency_failed_scheduler = f"{ self .project .project_id } -nix-dependency-failed" ,
258- cached_failure_scheduler = f"{ self .project .project_id } -nix-cached-failure" ,
259- )
288+ for job in jobs :
289+ # report unbuildable jobs
290+ if isinstance (job , NixEvalJobError ):
291+ failed_jobs .append (job )
292+ elif job .system in self .nix_eval_config .supported_systems and isinstance (
293+ job , NixEvalJobSuccess
294+ ):
295+ successful_jobs .append (job )
260296
261- jobs_config = JobsConfig (
262- successful_jobs = successful_jobs ,
263- failed_jobs = failed_jobs ,
264- cache_failed_builds = self .nix_eval_config .cache_failed_builds ,
265- failed_build_report_limit = self .nix_eval_config .failed_build_report_limit ,
266- )
267- self .build .addStepsAfterCurrentStep (
268- [
269- BuildTrigger (
270- project = self .project ,
271- trigger_config = trigger_config ,
272- jobs_config = jobs_config ,
273- nix_attr_prefix = branch_config .attribute ,
274- name = "build flake" ,
275- ),
276- ]
297+ self .number_of_jobs = len (successful_jobs )
298+
299+ if self .build :
300+ trigger_config = TriggerConfig (
301+ builds_scheduler = f"{ self .project .project_id } -nix-build" ,
302+ failed_eval_scheduler = f"{ self .project .project_id } -nix-failed-eval" ,
303+ dependency_failed_scheduler = f"{ self .project .project_id } -nix-dependency-failed" ,
304+ cached_failure_scheduler = f"{ self .project .project_id } -nix-cached-failure" ,
305+ )
306+
307+ jobs_config = JobsConfig (
308+ successful_jobs = successful_jobs ,
309+ failed_jobs = failed_jobs ,
310+ cache_failed_builds = self .nix_eval_config .cache_failed_builds ,
311+ failed_build_report_limit = self .nix_eval_config .failed_build_report_limit ,
312+ )
313+ self .build .addStepsAfterCurrentStep (
314+ [
315+ BuildTrigger (
316+ project = self .project ,
317+ trigger_config = trigger_config ,
318+ jobs_config = jobs_config ,
319+ nix_attr_prefix = branch_config .attribute ,
320+ name = "build flake" ,
321+ ),
322+ ]
323+ )
324+
325+ async def _check_store_paths_batch (
326+ self , paths : list [str ], batch_size : int = 1000
327+ ) -> AsyncGenerator [bool , None ]:
328+ """Check validity of store paths in batches. Yields validity status for each path."""
329+ for i in range (0 , len (paths ), batch_size ):
330+ batch_paths = paths [i : i + batch_size ]
331+ cmd = await self .makeRemoteShellCommand (
332+ command = ["nix-store" , "--check-validity" , * batch_paths ],
333+ collectStdout = True ,
334+ collectStderr = False ,
335+ )
336+ await self .runCommand (cmd )
337+
338+ if cmd .results () == util .SUCCESS :
339+ # All paths in batch are valid
340+ for _ in batch_paths :
341+ yield True
342+ else :
343+ # Check individually to find which are valid
344+ for path in batch_paths :
345+ cmd = await self .makeRemoteShellCommand (
346+ command = ["nix-store" , "--check-validity" , path ],
347+ collectStdout = False ,
348+ collectStderr = False ,
349+ )
350+ await self .runCommand (cmd )
351+ yield cmd .results () == util .SUCCESS
352+
353+ async def _reconstruct_job_from_build (
354+ self , build_id : int , original_build_id : int
355+ ) -> tuple [NixEvalJobSuccess , str ] | None :
356+ """Validate and reconstruct a NixEvalJob from build properties.
357+
358+ Returns tuple of (job, out_path) or None if validation fails.
359+ """
360+ props = await self .master .db .builds .getBuildProperties (build_id )
361+ required_props = ["attr" , "drv_path" , "out_path" , "system" ]
362+
363+ # Validate required properties
364+ for prop in required_props :
365+ if prop not in props or props [prop ][0 ] is None :
366+ log .info (
367+ f"Cannot reconstruct job from build { original_build_id } : missing required property '{ prop } '"
277368 )
369+ return None
370+
371+ # Extract properties
372+ attr = props ["attr" ][0 ]
373+ drv_path = props ["drv_path" ][0 ]
374+ out_path = props ["out_path" ][0 ]
375+ system = props ["system" ][0 ]
376+
377+ job = NixEvalJobSuccess (
378+ attr = attr ,
379+ attrPath = attr .split ("." ),
380+ drvPath = drv_path ,
381+ outputs = {"out" : out_path },
382+ system = system ,
383+ name = attr ,
384+ cacheStatus = CacheStatus .notBuilt ,
385+ neededBuilds = [],
386+ neededSubstitutes = [],
387+ )
278388
279- return result
389+ return job , out_path
390+
391+ async def _reconstruct_jobs_from_rebuild (
392+ self , original_build_id : int
393+ ) -> list [NixEvalJob ] | None :
394+ """Reconstruct job list from the original build's triggered builds."""
395+ # Get all builds triggered by the original eval
396+ triggered_builds = await self .master .db .builds .get_triggered_builds (
397+ original_build_id
398+ )
399+
400+ if not triggered_builds :
401+ return None
402+
403+ # Reconstruct all jobs
404+ jobs = []
405+ outputs_to_check = []
406+
407+ for build in triggered_builds :
408+ result = await self ._reconstruct_job_from_build (build .id , original_build_id )
409+ if result is None :
410+ # Missing required properties, can't reconstruct
411+ return None
412+
413+ job , out_path = result
414+ jobs .append (job )
415+
416+ # Collect outputs that need checking
417+ if build .results == util .SUCCESS and out_path :
418+ outputs_to_check .append ((job , out_path ))
419+
420+ # Batch check which outputs still exist
421+ if outputs_to_check :
422+ output_paths = [path for _ , path in outputs_to_check ]
423+
424+ # Process validity results as they come from the generator
425+ validity_iter = self ._check_store_paths_batch (output_paths )
426+ i = 0
427+ async for is_valid in validity_iter :
428+ if is_valid :
429+ outputs_to_check [i ][0 ].cacheStatus = CacheStatus .local
430+ i += 1
431+
432+ # Verify derivations exist for jobs that need rebuilding
433+ jobs_to_rebuild = [job for job in jobs if job .cacheStatus != CacheStatus .local ]
434+
435+ if jobs_to_rebuild and not await self ._verify_derivations_exist (
436+ jobs_to_rebuild
437+ ):
438+ return None
439+
440+ self .descriptionDone = [f"reused eval from build { original_build_id } " ]
441+ return cast ("list[NixEvalJobError | NixEvalJobSuccess]" , jobs )
442+
443+ async def _verify_derivations_exist (
444+ self , jobs : Sequence [NixEvalJobError | NixEvalJobSuccess ]
445+ ) -> bool :
446+ """Verify all derivations exist for the given jobs."""
447+ drv_paths = [job .drvPath for job in jobs if isinstance (job , NixEvalJobSuccess )]
448+
449+ # Check all derivations - if any is invalid, return False
450+ async for is_valid in self ._check_store_paths_batch (drv_paths ):
451+ if not is_valid :
452+ return False
453+ return True
280454
281455 async def _process_warnings (self , result : int , branch_config : BranchConfig ) -> int :
282456 """Process stderr output for warnings and update build status."""
0 commit comments