Experiments

Experiment dataclass

An experiment that consists of running an oracular program on a set of different hyperparameter combinations.

This class allows defining and running experiments. It supports the use of multiple workers, and allows interrupting and resuming experiments (the persistent experiment state is stored in a file on disk). Failed configurations can be selectively retried. By activating caching, a successful experiment can be replicated (or some of its configurations replayed with a debugger) without issuing calls to LLMs or to tools with non-replicable outputs.

Class Type Parameters:

  • C (bound: ExperimentConfig, required): Type parameter for the configuration type, which is a dataclass that holds all experiment hyperparameters.

Attributes:

  • config_class (type[C]): The associated configuration class, which defines the hyperparameters of the experiment and how to map them to arguments of the run_strategy command.
  • context (ExecutionContext): Command execution context, which contains the kind of information usually provided in the delphyne.yaml file (experiments do not recognize such files). Note that the cache_root argument should not be set, since it is disregarded and overridden by the Experiment class.
  • output_dir (Path | str): The directory where all experiment data is stored (persistent state, results, logs, caches...), either as an absolute path or relative to the workspace root specified in context. The directory is created if it does not already exist.
  • configs (Sequence[C] | None): A sequence of configurations to run. If None is provided and the experiment already has a persistent state stored on disk, the list of configurations is loaded from there.
  • configs_context (object | None): A global context parameter to be passed to all configurations' instantiation method. This value must be picklable since it is sent to remote worker processes.
  • name (str | None): Experiment name, which is stored in the persistent state file when provided and is otherwise not used.
  • description (str | None): Experiment description, which is stored in the persistent state file when provided and is otherwise not used.
  • config_naming (Callable[[C, UUID], str] | None): A function for assigning string identifiers to configurations, which maps a configuration along with a fresh UUID to a name. By default, the UUID alone is used.
  • cache_requests (bool): Whether or not to enable caching of LLM requests and expensive computations (see Compute). When enabled, the experiment can be reliably replicated without issuing LLM calls.
  • workers_setup (WorkersSetup[Any] | None): If provided, specifies the setup work to be performed on all processes (see WorkersSetup).
  • log_level (LogLevel | None): Minimum log level to record. Messages with a lower level will be ignored. (Overrides the corresponding RunStrategyArgs setting if provided.)
  • export_raw_trace (bool | None): Whether to export the raw trace for all configuration runs. (Overrides the corresponding RunStrategyArgs setting if provided.)
  • export_log (bool | None): Whether to export the log messages for all configuration runs. (Overrides the corresponding RunStrategyArgs setting if provided.)
  • export_browsable_trace (bool | None): Whether to export a browsable trace for all configuration runs, which can be visualized in the VSCode extension (see delphyne.analysis.feedback.Trace). Note that such traces can be large and can be generated after the fact using the Delphyne CLI. (Overrides the corresponding RunStrategyArgs setting if provided.)
  • verbose_snapshots (bool | None): If True, when a snapshot is requested, all result information (raw trace, log, browsable trace) is dumped, regardless of other settings.

Tips
  • New hyperparameters can be added to the C type without invalidating an existing experiment's persistent state, by providing default values for them.
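For illustration, here is a minimal usage sketch. MyConfig is a hypothetical configuration dataclass (see ExperimentConfig below), the execution context is assumed to be built elsewhere, and the import path is inferred from the source location shown below.

from delphyne.stdlib.experiments.experiment_launcher import Experiment

# Hypothetical: MyConfig implements the ExperimentConfig protocol and
# exposes a `temperature` hyperparameter; `context` is an ExecutionContext.
experiment = Experiment(
    config_class=MyConfig,
    context=context,
    output_dir="output/my_experiment",
    configs=[MyConfig(temperature=t) for t in (0.2, 0.7, 1.0)],
    name="my-experiment",
)
experiment.load().resume(max_workers=4)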
Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
@dataclass(kw_only=True)
class Experiment[C: ExperimentConfig]:
    """
    An experiment that consists of running an oracular program on a set of
    different hyperparameter combinations.

    This class allows defining and running experiments. It supports the
    use of multiple workers, and allows interrupting and resuming
    experiments (the persistent experiment state is stored in a file on
    disk). Failed configurations can be selectively retried. By
    activating caching, a successful experiment can be replicated (or
    some of its configurations replayed with a debugger) without issuing
    calls to LLMs or to tools with non-replicable outputs.

    Type Parameters:
        C: Type parameter for the configuration type, which is a
            dataclass that holds all experiment hyperparameters.

    Attributes:
        config_class: The associated configuration class, which defines
            the hyperparameters of the experiment and how to map them to
            arguments of the `run_strategy` command.
        context: Command execution context, which contains the kind of
            information usually provided in the `delphyne.yaml` file
            (experiments do not recognize such files). Note that the
            `cache_root` argument should not be set, since it is
            disregarded and overridden by the `Experiment` class.
        output_dir: The directory where all experiment data is stored
            (persistent state, results, logs, caches...), either as an
            absolute path or relative to the workspace root specified in
            `context`. The directory is created if it does not already
            exist.
        configs: A sequence of configurations to run. If `None` is
            provided and the experiment already has a persistent state
            stored on disk, the list of configurations is loaded from
            there.
        configs_context: A global context parameter to be passed to all
            configurations' instantiation method. This value must be
            picklable since it is sent to remote worker processes.
        name: Experiment name, which is stored in the persistent state
            file when provided and is otherwise not used.
        description: Experiment description, which is stored in the
            persistent state file when provided and is otherwise not used.
        config_naming: A function for assigning string identifiers to
            configurations, which maps a configuration along with a
            fresh UUID to a name. By default, the UUID alone is used.
        cache_requests: Whether or not to enable caching of LLM requests
            and expensive computations (see `Compute`). When enabled,
            the experiment can be reliably replicated without issuing
            LLM calls.
        workers_setup: If provided, specifies the setup work to be
            performed on all processes (see `WorkersSetup`).
        log_level: Minimum log level to record. Messages with a lower
            level will be ignored. (Overrides the corresponding
            `RunStrategyArgs` setting if provided.)
        export_raw_trace: Whether to export the raw trace for all
            configuration runs. (Overrides the corresponding
            `RunStrategyArgs` setting if provided.)
        export_log: Whether to export the log messages for all
            configuration runs. (Overrides the corresponding
            `RunStrategyArgs` setting if provided.)
        export_browsable_trace: Whether to export a browsable trace for
            all configuration runs, which can be visualized in the VSCode
            extension (see `delphyne.analysis.feedback.Trace`). Note that
            such traces can be large and can be generated after the fact
            using the Delphyne CLI. (Overrides the corresponding
            `RunStrategyArgs` setting if provided.)
        verbose_snapshots: If `True`, when a snapshot is requested, all
            result information (raw trace, log, browsable trace) is
            dumped, regardless of other settings.

    ## Tips

    - New hyperparameters can be added to the `C` type without
      invalidating an existing experiment's persistent state, by
      providing default values for them.
    """

    config_class: type[C]
    context: ExecutionContext
    output_dir: Path | str  # absolute, or relative to the workspace root
    configs: Sequence[C] | None = None
    configs_context: object | None = None
    name: str | None = None
    description: str | None = None
    config_naming: Callable[[C, uuid.UUID], str] | None = None
    cache_requests: bool = True
    workers_setup: WorkersSetup[Any] | None = None
    log_level: dp.LogLevel | None = None
    export_raw_trace: bool | None = None
    export_log: bool | None = None
    export_browsable_trace: bool | None = None
    verbose_snapshots: bool | None = None

    def __post_init__(self):
        # We override the cache root directory.
        self.context = replace(
            self.context, cache_root=self.absolute_output_dir
        )

    @property
    def absolute_output_dir(self) -> Path:
        """
        Get the absolute output directory, by combining the
        `context.workspace_root` and `output_dir` paths.
        """
        if self.context.workspace_root is None:
            raise ValueError("No workspace root is specified.")
        return self.context.workspace_root / self.output_dir

    def load(self) -> Self:
        """
        Load the experiment.

        If no persistent state exists on disk, it is created (with all
        configurations marked with "todo" status). If some experiment
        state exists on disk, it is loaded. If more configurations are
        specified in `self.configs` than are specified on disk, the
        missing configurations are added to the persistent state and
        marked with "todo". If the persistent state contains
        configurations that are not specified in `self.configs`, a
        warning is shown. Use the `clean_index` method to remove these
        configurations from the persistent state.

        Return `self`, so as to allow chaining.
        """
        if not self._dir_exists():
            # If we create the experiment for the first time
            output_dir = self.absolute_output_dir
            print(f"Creating experiment directory: {output_dir}.")
            output_dir.mkdir(parents=True, exist_ok=True)
            state = ExperimentState[C](self.name, self.description, {})
            self._save_state(state)
        if self.configs is not None:
            self._add_configs_if_needed(self.configs)
            # Print a warning if the state on disk features additional configs.
            state = self._load_state()
            assert state is not None
            assert len(self.configs) <= len(state.configs)
            if len(self.configs) < len(state.configs):
                print(
                    f"Warning: {len(state.configs) - len(self.configs)} "
                    "additional configuration(s) found in the state."
                )
        return self

    def is_done(self) -> bool:
        """
        Check if the experiment is done, i.e., all configurations are
        marked as "done".
        """
        state = self._load_state()
        assert state is not None
        return all(info.status == "done" for info in state.configs.values())

    def clean_index(self) -> None:
        """
        Remove from the persistent state file all configurations that
        are not mentioned in `self.configs`.
        """
        state = self._load_state()
        assert state is not None
        assert self.configs is not None
        in_config = set(_config_unique_repr(c) for c in self.configs)
        to_delete = [
            c
            for c, i in state.configs.items()
            if _config_unique_repr(i.params) not in in_config
        ]
        print(f"Removing {len(to_delete)} configuration(s) from the state.")
        for c in to_delete:
            del state.configs[c]
        self._save_state(state)

    def mark_errors_as_todos(self):
        """
        Update the persistent state to mark all configurations with
        status "failed" as "todo". They will be retried when the
        `resume` method is called.
        """
        state = self._load_state()
        assert state is not None
        for _, info in state.configs.items():
            if info.status == "failed":
                info.status = "todo"
        self._save_state(state)

    def resume(
        self,
        max_workers: int = 1,
        log_progress: bool = True,
        interactive: bool = False,
    ):
        """
        Resume the experiment, running all configurations with state
        "todo". Every configuration run results in marking the
        configuration's state with either "failed" (in case an uncaught
        exception was raised) or "done".

        The whole process can be interrupted using Ctrl-C, in which case
        the persistent experiment state is stored on disk, a message is
        printed saying so, and Ctrl-C can be hit again until all workers
        are successfully terminated.

        A summary file is produced at the end of the experiment using
        the `save_summary` method if all configurations were run
        successfully.

        Arguments:
            max_workers: Number of parallel process workers to use.
            log_progress: Whether to show a progress bar in the console.
            interactive: If `True`, pressing `Enter` at any point during
                execution prints the current status of all workers and
                dumps a snapshot of ongoing tasks on disk. This is
                useful to investigate seemingly stuck tasks.
        """
        with mp.Manager() as manager:
            self._resume_with_manager(
                manager,
                max_workers=max_workers,
                log_progress=log_progress,
                interactive=interactive,
            )

    def _resume_with_manager(
        self,
        manager: SyncManager,
        max_workers: int,
        log_progress: bool,
        interactive: bool,
    ) -> None:
        state = self._load_state()
        assert state is not None
        worker_send: Queue[_WorkerSent] = manager.Queue()
        worker_receive: dict[str, Queue[_WorkerReceived]] = {}

        # To avoid race conditions, we store start times and end times
        # in a separate place and update the state on saving (see
        # `save_state` local function below). The `ongoing` list
        # contains all keys that are in `start_times` but not in
        # `end_times`.

        start_times: dict[str, datetime] = {}
        end_times: dict[str, datetime] = {}
        ongoing: list[str] = []

        # Lock protecting `worker_receive`, `start_times`, `end_times`
        # and `ongoing`.
        lock: threading.Lock = threading.Lock()

        def save_state():
            now = datetime.now()
            with lock:
                for name, start in start_times.items():
                    config = state.configs[name]
                    config.start_time = start
                    end = end_times.get(name, None)
                    if end is not None:
                        config.end_time = end
                        config.interruption_time = None
                    else:
                        assert name in ongoing
                        config.end_time = None
                        config.interruption_time = now
            self._save_state(state)

        def make_snapshot():
            # Print elapsed time for all ongoing tasks
            print(f"Ongoing tasks: {len(ongoing)}.")
            now = datetime.now()
            durations = [(t, now - start_times[t]) for t in ongoing]
            durations.sort(key=lambda x: x[1], reverse=True)
            for name, dt in durations:
                print(f"    {name}: {dt}")
            # Generate snapshot directory
            snapshot_name = str(datetime.now()).replace(" ", "_")
            snapshot_name = snapshot_name.replace(":", "-")
            snapshot_name = snapshot_name.replace(".", "_")
            snapshot_dir = (
                self.absolute_output_dir / SNAPSHOTS_DIR / snapshot_name
            )
            snapshot_dir.mkdir(parents=True, exist_ok=True)
            # Generate snapshot index
            index: list[str] = []
            for name, dt in durations:
                index.append(f"- {name}:")
                status_file = name + SNAPSHOT_STATUS_SUFFIX
                result_file = name + SNAPSHOT_RESULT_SUFFIX
                index.append(f"  - Running for: {dt}")
                index.append(f"  - [Status](./{status_file})")
                index.append(f"  - [Result](./{result_file})")
            index_file = snapshot_dir / SNAPSHOT_INDEX_FILE
            print(f"Creating snapshot: {index_file}")
            with open(index_file, "w") as f:
                f.write("# Snapshot\n\n")
                f.write(f"Taken at {datetime.now()}\n\n")
                f.write("\n".join(index) + "\n")
            # Send snapshot queries
            for name in ongoing:
                ask = worker_receive.get(name, None)
                if ask is None:
                    continue
                ask.put(_AskSnapshot(snapshot_dir))

        def process_worker_messages():
            while True:
                msg = worker_send.get()
                match msg:
                    case _ConfigStarted():
                        with lock:
                            start_times[msg.config_name] = msg.time
                            ongoing.append(msg.config_name)
                            worker_receive[msg.config_name] = msg.respond
                    case _ConfigSnapshot():
                        status_file = msg.snapshot_dir / (
                            msg.config_name + SNAPSHOT_STATUS_SUFFIX
                        )
                        result_file = msg.snapshot_dir / (
                            msg.config_name + SNAPSHOT_RESULT_SUFFIX
                        )
                        with open(status_file, "w") as f:
                            f.write(msg.status_messge or "")
                        with open(result_file, "w") as f:
                            f.write(msg.result or "")
                    case "done":
                        break

        def monitor_input():
            while True:
                input()
                with lock:
                    make_snapshot()

        threading.Thread(target=process_worker_messages).start()
        if interactive:
            # The thread must be a daemon thread so the call to `input`
            # is interrupted when the main program exits.
            threading.Thread(target=monitor_input, daemon=True).start()

        pool_args: dict[str, Any] = {}
        if self.workers_setup is not None:
            setup_arg = self.workers_setup.common()
            pool_args["initializer"] = self.workers_setup.per_worker
            pool_args["initargs"] = (setup_arg,)

        # Launching and completing all tasks
        with ProcessPoolExecutor(
            max_workers=max_workers, **pool_args
        ) as executor:
            futures = [
                executor.submit(
                    _run_config,
                    context=self.context,
                    configs_context=self.configs_context,
                    worker_send=worker_send,
                    worker_receive=manager.Queue(),
                    config_name=name,
                    config_dir=self._config_dir(name),
                    config=info.params,
                    cache_requests=self.cache_requests,
                    log_level=self.log_level,
                    export_raw_trace=self.export_raw_trace,
                    export_log=self.export_log,
                    export_browsable_trace=self.export_browsable_trace,
                    verbose_snapshots=self.verbose_snapshots,
                )
                for name, info in state.configs.items()
                if info.status == "todo"
            ]
            if log_progress:
                _print_progress(state)
            try:
                for future in as_completed(futures):
                    name, success = future.result()
                    state.configs[name].status = (
                        "done" if success else "failed"
                    )
                    with lock:
                        end_times[name] = datetime.now()
                        ongoing.remove(name)
                    if log_progress:
                        _print_progress(state)
                save_state()
                all_successes = all(
                    info.status == "done" for info in state.configs.values()
                )
                if all_successes:
                    print(
                        "\nExperiment successful.\nProducing summary file..."
                    )
                    self.save_summary()
                else:
                    print("\nWarning: some configurations failed.")
            except KeyboardInterrupt:
                print("\nExperiment interrupted. Saving state...")
                save_state()
                print("State saved.")
            worker_send.put("done")

    def replay_config_by_name(self, config_name: str) -> None:
        """
        Replay a configuration with a given name, reusing the cache if
        it exists.

        This way, one can debug the execution of an experiment after the
        fact, without any LLMs being called. Note that one can also
        replay a configuration that failed with an exception within a
        debugger to investigate it.
        """
        state = self._load_state()
        assert state is not None
        assert config_name is not None
        info = state.configs[config_name]
        cmdargs = info.params.instantiate(self.configs_context)
        cmdargs.cache_file = _relative_cache_path(config_name)
        cmdargs.embeddings_cache_file = _relative_embeddings_cache_path(
            config_name
        )
        cmdargs.cache_mode = "replay"
        run_command(
            command=cmd.run_strategy,
            args=cmdargs,
            ctx=self.context,
            dump_statuses=None,
            dump_result=None,
            dump_log=None,
        )

    def replay_config(self, config: C) -> None:
        """
        Replay a configuration. See `replay_config_by_name` for details.
        """
        config_name = self._existing_config_name(config)
        assert config_name is not None
        self.replay_config_by_name(config_name)

    def replay_all_configs(self):
        """
        Replay all configurations, replicating the experiment.
        """
        state = self._load_state()
        assert state is not None
        for config_name in state.configs:
            print(f"Replaying configuration: {config_name}...")
            self.replay_config_by_name(config_name)

    def config_success_values_by_name(
        self, config_name: str, *, type: Any
    ) -> Sequence[Any]:
        """
        Load the success values associated with a given configuration,
        identified by name.
        """
        result_file = (
            _config_dir_path(self.absolute_output_dir, config_name)
            / RESULT_FILE
        )
        return al.load_success_values_from_command_file(result_file, type)

    def config_success_values(self, config: C, *, type: Any) -> Sequence[Any]:
        """
        Load the success values associated with a given configuration.
        """
        config_name = self._existing_config_name(config)
        assert config_name is not None
        return self.config_success_values_by_name(config_name, type=type)

    def save_summary(
        self, ignore_missing: bool = False, add_timing: bool = False
    ):
        """
        Save a summary of the results in a CSV file.

        Arguments:
            ignore_missing: If `True`, configurations whose status is
                "failed" or "todo" are ignored. Otherwise, an error is
                raised.
            add_timing: If `True`, adds a `duration` column to the
                summary, which indicates the wall-clock time spent on
                each configuration.
        """

        output_dir = self.absolute_output_dir
        data = _results_summary(
            output_dir,
            ignore_missing=ignore_missing,
            add_timing=add_timing,
        )
        frame = pd.DataFrame(data)
        summary_file = output_dir / RESULTS_SUMMARY
        frame.to_csv(summary_file, index=False)  # type: ignore

    def load_summary(self):
        """
        Load the summary file into a DataFrame.

        The summary file should have been created beforehand using the
        `save_summary` method.
        """

        output_dir = self.absolute_output_dir
        summary_file = output_dir / RESULTS_SUMMARY
        data = pd.read_csv(summary_file)  # type: ignore
        return data

    def get_status(self) -> dict[str, int]:
        """
        Get the status of the experiment configurations.

        Returns:
            A dictionary with keys 'todo', 'done', 'failed' and their
            counts (i.e., number of configurations with this status).
        """
        state = self._load_state()
        assert state is not None
        statuses = state.configs.values()
        num_todo = sum(1 for c in statuses if c.status == "todo")
        num_done = sum(1 for c in statuses if c.status == "done")
        num_failed = sum(1 for c in statuses if c.status == "failed")
        return {"todo": num_todo, "done": num_done, "failed": num_failed}

    def run_cli(self):
        """
        Run a CLI application that allows controlling the experiment
        from the shell. See `ExperimentCLI` for details.
        """
        fire.Fire(ExperimentCLI(self))  # type: ignore

    def _config_dir(self, config_name: str) -> Path:
        return _config_dir_path(self.absolute_output_dir, config_name)

    def _add_configs_if_needed(self, configs: Sequence[C]) -> None:
        state = self._load_state()
        assert state is not None
        rev = state.inverse_mapping()
        num_added = 0
        for c in configs:
            existing_name = rev(c)
            if existing_name is not None:
                continue
            num_added += 1
            id = uuid.uuid4()
            if self.config_naming is not None:
                name = self.config_naming(c, id)
            else:
                name = str(id)
            state.configs[name] = ConfigInfo(c, status="todo")
        if num_added > 0:
            print(f"Adding {num_added} new configuration(s).")
        self._save_state(state)

    def _dir_exists(self) -> bool:
        return (
            self.absolute_output_dir.exists()
            and self.absolute_output_dir.is_dir()
        )

    def _state_type(self) -> type[ExperimentState[C]]:
        return ExperimentState[self.config_class]

    def _load_state(self) -> ExperimentState[C] | None:
        with open(self.absolute_output_dir / EXPERIMENT_STATE_FILE, "r") as f:
            parsed = yaml.safe_load(f)
            return pydantic_load(self._state_type(), parsed)

    def _save_state(self, state: ExperimentState[C]) -> None:
        with open(self.absolute_output_dir / EXPERIMENT_STATE_FILE, "w") as f:
            to_save = pydantic_dump(self._state_type(), state)
            yaml.safe_dump(to_save, f, sort_keys=False)

    def _existing_config_name(self, config: C) -> str | None:
        state = self._load_state()
        assert state is not None
        for name, info in state.configs.items():
            if info.params == config:
                return name
        return None

absolute_output_dir property

absolute_output_dir: Path

Get the absolute output directory, by combining the context.workspace_root and output_dir paths.

load

load() -> Self

Load the experiment.

If no persistent state exists on disk, it is created (with all configurations marked with "todo" status). If some experiment state exists on disk, it is loaded. If more configurations are specified in self.configs than are specified on disk, the missing configurations are added to the persistent state and marked with "todo". If the persistent state contains configurations that are not specified in self.configs, a warning is shown. Use the clean_index method to remove these configurations from the persistent state.

Return self, so as to allow chaining.
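For example, a typical loading sequence when the configuration list may have changed (a sketch; experiment is an Experiment instance as above):

# Create or load the persistent state; configurations present in
# `experiment.configs` but missing from disk are added as "todo".
experiment.load()
# Optionally drop configurations that are on disk but no longer in
# `experiment.configs` (the ones the warning refers to).
experiment.clean_index()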

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def load(self) -> Self:
    """
    Load the experiment.

    If no persistent state exists on disk, it is created (with all
    configurations marked with "todo" status). If some experiment
    state exists on disk, it is loaded. If more configurations are
    specified in `self.configs` than are specified on disk, the
    missing configurations are added to the persistent state and
    marked with "todo". If the persistent state contains
    configurations that are not specified in `self.configs`, a
    warning is shown. Use the `clean_index` method to remove these
    configurations from the persistent state.

    Return `self`, so as to allow chaining.
    """
    if not self._dir_exists():
        # If we create the experiment for the first time
        output_dir = self.absolute_output_dir
        print(f"Creating experiment directory: {output_dir}.")
        output_dir.mkdir(parents=True, exist_ok=True)
        state = ExperimentState[C](self.name, self.description, {})
        self._save_state(state)
    if self.configs is not None:
        self._add_configs_if_needed(self.configs)
        # Print a warning if the state on disk features additional configs.
        state = self._load_state()
        assert state is not None
        assert len(self.configs) <= len(state.configs)
        if len(self.configs) < len(state.configs):
            print(
                f"Warning: {len(state.configs) - len(self.configs)} "
                "additional configuration(s) found in the state."
            )
    return self

is_done

is_done() -> bool

Check if the experiment is done, i.e., all configurations are marked as "done".

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def is_done(self) -> bool:
    """
    Check if the experiment is done, i.e., all configurations are
    marked as "done".
    """
    state = self._load_state()
    assert state is not None
    return all(info.status == "done" for info in state.configs.values())

clean_index

clean_index() -> None

Remove from the persistent state file all configurations that are not mentioned in self.configs.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def clean_index(self) -> None:
    """
    Remove from the persistent state file all configurations that
    are not mentioned in `self.configs`.
    """
    state = self._load_state()
    assert state is not None
    assert self.configs is not None
    in_config = set(_config_unique_repr(c) for c in self.configs)
    to_delete = [
        c
        for c, i in state.configs.items()
        if _config_unique_repr(i.params) not in in_config
    ]
    print(f"Removing {len(to_delete)} configuration(s) from the state.")
    for c in to_delete:
        del state.configs[c]
    self._save_state(state)

mark_errors_as_todos

mark_errors_as_todos()

Update the persistent state to mark all configurations with status "failed" as "todo". They will be retried when the resume method is called.
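A typical retry flow (a sketch; experiment is an Experiment instance):

experiment.load()
experiment.mark_errors_as_todos()  # "failed" configurations become "todo"
experiment.resume(max_workers=4)   # reruns them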

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def mark_errors_as_todos(self):
    """
    Update the persistent state to mark all configurations with
    status "failed" as "todo". They will be retried when the
    `resume` method is called.
    """
    state = self._load_state()
    assert state is not None
    for _, info in state.configs.items():
        if info.status == "failed":
            info.status = "todo"
    self._save_state(state)

resume

resume(max_workers: int = 1, log_progress: bool = True, interactive: bool = False)

Resume the experiment, running all configurations with state "todo". Every configuration run results in marking the configuration's state with either "failed" (in case an uncaught exception was raised) or "done".

The whole process can be interrupted using Ctrl-C, in which case the persistent experiment state is stored on disk, a message is printed saying so, and Ctrl-C can be hit again until all workers are successfully terminated.

A summary file is produced at the end of the experiment using the save_summary method if all configurations were run successfully.

Parameters:

  • max_workers (int, default 1): Number of parallel process workers to use.
  • log_progress (bool, default True): Whether to show a progress bar in the console.
  • interactive (bool, default False): If True, pressing Enter at any point during execution prints the current status of all workers and dumps a snapshot of ongoing tasks on disk. This is useful to investigate seemingly stuck tasks.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def resume(
    self,
    max_workers: int = 1,
    log_progress: bool = True,
    interactive: bool = False,
):
    """
    Resume the experiment, running all configurations with state
    "todo". Every configuration run results in marking the
    configuration's state with either "failed" (in case an uncaught
    exception was raised) or "done".

    The whole process can be interrupted using Ctrl-C, in which case
    the persistent experiment state is stored on disk, a message is
    printed saying so, and Ctrl-C can be hit again until all workers
    are successfully terminated.

    A summary file is produced at the end of the experiment using
    the `save_summary` method if all configurations were run
    successfully.

    Arguments:
        max_workers: Number of parallel process workers to use.
        log_progress: Whether to show a progress bar in the console.
        interactive: If `True`, pressing `Enter` at any point during
            execution prints the current status of all workers and
            dumps a snapshot of ongoing tasks on disk. This is
            useful to investigate seemingly stuck tasks.
    """
    with mp.Manager() as manager:
        self._resume_with_manager(
            manager,
            max_workers=max_workers,
            log_progress=log_progress,
            interactive=interactive,
        )

replay_config_by_name

replay_config_by_name(config_name: str) -> None

Replay a configuration with a given name, reusing the cache if it exists.

This way, one can debug the execution of an experiment after the fact, without any LLMs being called. Note that one can also replay a configuration that failed with an exception within a debugger to investigate it.
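For instance, one might replay a single configuration under a debugger (a sketch; the configuration name is hypothetical):

experiment.load()
# Re-executes the configuration against the recorded cache, so no live
# LLM calls are issued; run under a debugger (e.g. pdb) to step through.
experiment.replay_config_by_name("8f14e45f-ceea-467f-9575-127504b58f21")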

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def replay_config_by_name(self, config_name: str) -> None:
    """
    Replay a configuration with a given name, reusing the cache if
    it exists.

    This way, one can debug the execution of an experiment after the
    fact, without any LLMs being called. Note that one can also
    replay a configuration that failed with an exception within a
    debugger to investigate it.
    """
    state = self._load_state()
    assert state is not None
    assert config_name is not None
    info = state.configs[config_name]
    cmdargs = info.params.instantiate(self.configs_context)
    cmdargs.cache_file = _relative_cache_path(config_name)
    cmdargs.embeddings_cache_file = _relative_embeddings_cache_path(
        config_name
    )
    cmdargs.cache_mode = "replay"
    run_command(
        command=cmd.run_strategy,
        args=cmdargs,
        ctx=self.context,
        dump_statuses=None,
        dump_result=None,
        dump_log=None,
    )

replay_config

replay_config(config: C) -> None

Replay a configuration. See replay_config_by_name for details.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def replay_config(self, config: C) -> None:
    """
    Replay a configuration. See `replay_config_by_name` for details.
    """
    config_name = self._existing_config_name(config)
    assert config_name is not None
    self.replay_config_by_name(config_name)

replay_all_configs

replay_all_configs()

Replay all configurations, replicating the experiment.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def replay_all_configs(self):
    """
    Replay all configurations, replicating the experiment.
    """
    state = self._load_state()
    assert state is not None
    for config_name in state.configs:
        print(f"Replaying configuration: {config_name}...")
        self.replay_config_by_name(config_name)

config_success_values_by_name

config_success_values_by_name(config_name: str, *, type: Any) -> Sequence[Any]

Load the success values associated with a given configuration, identified by name.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def config_success_values_by_name(
    self, config_name: str, *, type: Any
) -> Sequence[Any]:
    """
    Load the success values associated with a given configuration,
    identified by name.
    """
    result_file = (
        _config_dir_path(self.absolute_output_dir, config_name)
        / RESULT_FILE
    )
    return al.load_success_values_from_command_file(result_file, type)

config_success_values

config_success_values(config: C, *, type: Any) -> Sequence[Any]

Load the success values associated with a given configuration.
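A sketch of retrieving success values, assuming a hypothetical Answer type that they can be parsed into:

values = experiment.config_success_values(
    MyConfig(temperature=0.7),  # hypothetical configuration
    type=Answer,                # hypothetical success value type
)
for v in values:
    print(v)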

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def config_success_values(self, config: C, *, type: Any) -> Sequence[Any]:
    """
    Load the success values associated with a given configuration.
    """
    config_name = self._existing_config_name(config)
    assert config_name is not None
    return self.config_success_values_by_name(config_name, type=type)

save_summary

save_summary(ignore_missing: bool = False, add_timing: bool = False)

Save a summary of the results in a CSV file.

Parameters:

  • ignore_missing (bool, default False): If True, configurations whose status is "failed" or "todo" are ignored. Otherwise, an error is raised.
  • add_timing (bool, default False): If True, adds a duration column to the summary, which indicates the wall-clock time spent on each configuration.
Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def save_summary(
    self, ignore_missing: bool = False, add_timing: bool = False
):
    """
    Save a summary of the results in a CSV file.

    Arguments:
        ignore_missing: If `True`, configurations whose status is
            "failed" or "todo" are ignored. Otherwise, an error is
            raised.
        add_timing: If `True`, adds a `duration` column to the
            summary, which indicates the wall-clock time spent on
            each configuration.
    """

    output_dir = self.absolute_output_dir
    data = _results_summary(
        output_dir,
        ignore_missing=ignore_missing,
        add_timing=add_timing,
    )
    frame = pd.DataFrame(data)
    summary_file = output_dir / RESULTS_SUMMARY
    frame.to_csv(summary_file, index=False)  # type: ignore

load_summary

load_summary()

Load the summary file into a DataFrame.

The summary file should have been created beforehand using the save_summary method.
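For instance (a sketch; assumes save_summary was called with add_timing=True so that a duration column exists):

experiment.save_summary(add_timing=True)
df = experiment.load_summary()
print(df.head())
print(df["duration"].describe())  # wall-clock time per configuration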

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def load_summary(self):
    """
    Load the summary file into a DataFrame.

    The summary file should have been created beforehand using the
    `save_summary` method.
    """

    output_dir = self.absolute_output_dir
    summary_file = output_dir / RESULTS_SUMMARY
    data = pd.read_csv(summary_file)  # type: ignore
    return data

get_status

get_status() -> dict[str, int]

Get the status of the experiment configurations.

Returns:

  • dict[str, int]: A dictionary with keys 'todo', 'done', and 'failed' mapping each status to the number of configurations with that status.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def get_status(self) -> dict[str, int]:
    """
    Get the status of the experiment configurations.

    Returns:
        A dictionary with keys 'todo', 'done', 'failed' and their
        counts (i.e., number of configurations with this status).
    """
    state = self._load_state()
    assert state is not None
    statuses = state.configs.values()
    num_todo = sum(1 for c in statuses if c.status == "todo")
    num_done = sum(1 for c in statuses if c.status == "done")
    num_failed = sum(1 for c in statuses if c.status == "failed")
    return {"todo": num_todo, "done": num_done, "failed": num_failed}

run_cli

run_cli()

Run a CLI application that allows controlling the experiment from the shell. See ExperimentCLI for details.
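A common pattern is to expose the experiment as a script (a sketch; the shell commands reflect how fire maps CLI commands to ExperimentCLI methods, and the script name is hypothetical):

# my_experiment.py
if __name__ == "__main__":
    experiment.run_cli()

# From the shell:
#   python my_experiment.py run --max_workers=4 --retry_errors
#   python my_experiment.py status
#   python my_experiment.py replay --config=<name>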

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def run_cli(self):
    """
    Run a CLI application that allows controlling the experiment
    from the shell. See `ExperimentCLI` for details.
    """
    fire.Fire(ExperimentCLI(self))  # type: ignore

ExperimentConfig

Bases: Protocol

A configuration is a dataclass that holds a set of hyperparameters, which induce a run_strategy call.

Note

The following arguments must not be set since they are managed by the Experiment class. Any specified value may be discarded.

  • cache_file
  • embeddings_cache_file
  • cache_mode

The following arguments may be set, but the Experiment class offers options to override them.

  • log_level
  • export_raw_trace
  • export_log
  • export_browsable_trace
  • export_all_on_pull
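A minimal conforming configuration might look as follows (a sketch: the hyperparameter fields are hypothetical, the import path for cmd is assumed, and the RunStrategyArgs fields are elided since they depend on the strategy being run):

from dataclasses import dataclass
import delphyne.stdlib.commands as cmd  # assumed import path

@dataclass(frozen=True)
class MyConfig:
    # Hypothetical hyperparameters of a single run.
    model: str
    temperature: float

    def instantiate(self, context: object) -> cmd.RunStrategyArgs:
        # Map the hyperparameters to a `run_strategy` invocation.
        return cmd.RunStrategyArgs(...)  # strategy-specific fields elided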
Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
class ExperimentConfig(Protocol):
    """
    A configuration is a dataclass that holds a set of hyperparameters,
    which induce a `run_strategy` call.

    !!! note
        The following arguments must not be set since they are managed
        by the `Experiment` class. Any specified value may be discarded.

        - `cache_file`
        - `embeddings_cache_file`
        - `cache_mode`

        The following arguments may be set, but the `Experiment` class
        offers options to override them.

        - `log_level`
        - `export_raw_trace`
        - `export_log`
        - `export_browsable_trace`
        - `export_all_on_pull`
    """

    def instantiate(self, context: object) -> cmd.RunStrategyArgs:
        """
        Instantiate the configuration into a `run_strategy` command
        instance.

        Arguments:
            context: Additional global context information that can be
                optionally passed by the experiment. By default,
                experiments just pass `None`.
        """
        ...

instantiate

instantiate(context: object) -> RunStrategyArgs

Instantiate the configuration into a run_strategy command instance.

Parameters:

  • context (object, required): Additional global context information that can be optionally passed by the experiment. By default, experiments just pass None.
Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def instantiate(self, context: object) -> cmd.RunStrategyArgs:
    """
    Instantiate the configuration into a `run_strategy` command
    instance.

    Arguments:
        context: Additional global context information that can be
            optionally passed by the experiment. By default,
            experiments just pass `None`.
    """
    ...

ExperimentState dataclass

Persistent state of an experiment, stored on disk as a YAML file.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
@dataclass
class ExperimentState[C: ExperimentConfig]:
    """
    Persistent state of an experiment, stored on disk as a YAML file.
    """

    name: str | None
    description: str | None
    configs: dict[str, ConfigInfo[C]]

    def inverse_mapping(self) -> Callable[[C], str | None]:
        """
        Compute an inverse function mapping configurations to their
        unique names (or None if not in the state).
        """
        tab: dict[str, str] = {}
        for name, info in self.configs.items():
            tab[_config_unique_repr(info.params)] = name

        def reverse(config: C) -> str | None:
            return tab.get(_config_unique_repr(config), None)

        return reverse

inverse_mapping

inverse_mapping() -> Callable[[C], str | None]

Compute an inverse function mapping configurations to their unique names (or None if not in the state).

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def inverse_mapping(self) -> Callable[[C], str | None]:
    """
    Compute an inverse function mapping configurations to their
    unique names (or None if not in the state).
    """
    tab: dict[str, str] = {}
    for name, info in self.configs.items():
        tab[_config_unique_repr(info.params)] = name

    def reverse(config: C) -> str | None:
        return tab.get(_config_unique_repr(config), None)

    return reverse

ConfigInfo dataclass

Information stored in the persistent configuration state for each configuration.

Attributes:

  • params (C): The configuration.
  • status (Literal['todo', 'done', 'failed']): Status of the configuration.
  • start_time (datetime | None): Time at which the configuration execution started.
  • end_time (datetime | None): Time at which the configuration execution ended.
  • interruption_time (datetime | None): If the configuration execution was interrupted, the time at which the interruption happened (the status must then be "todo").

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
@dataclass
class ConfigInfo[C: ExperimentConfig]:
    """
    Information stored in the persistent configuration state for each
    configuration.

    Attributes:
        params: The configuration.
        status: Status of the configuration.
        start_time: Time at which the configuration execution started.
        end_time: Time at which the configuration execution ended.
        interruption_time: If the configuration execution was interrupted,
            the time at which the interruption happened (the `status`
            must then be `todo`).
    """

    params: C
    status: Literal["todo", "done", "failed"]
    start_time: datetime | None = None
    end_time: datetime | None = None
    interruption_time: datetime | None = None

WorkersSetup dataclass

Specification for the setup work that must be performed on all processes.

Attributes:

  • common (Callable[[], T]): A function that is called once on the main process. It must return a picklable object.
  • per_worker (Callable[[T], None]): A function that is called on each worker and passed the result of common as an argument. This function must be a top-level function since it is pickled and sent to a remote process.

Example

Suppose one wants to spawn a single proving server that is used by all workers. One can set up the server in common, return some access information (e.g. a port number), and then have per_worker configure each worker to connect to the server.
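A sketch of that pattern (start_server and connect_to_server are hypothetical helpers; per_worker must be a top-level function so it can be pickled):

def common_setup() -> int:
    # Hypothetical: start a single proving server on the main process
    # and return its port number (a picklable value).
    return start_server()

def per_worker_setup(port: int) -> None:
    # Hypothetical: runs once in each worker process.
    connect_to_server(port)

setup = WorkersSetup(common=common_setup, per_worker=per_worker_setup)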

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
@dataclass
class WorkersSetup[T]:
    """
    Specification for the setup work that must be performed on all
    processes.

    Attributes:
        common: A function that is called once on the **main process**.
            It must return a picklable object.
        per_worker: A function that is called on **each worker** and
            passed the result of `common` as an argument. This function
            must be a top-level function since it is pickled and sent to
            a remote process.

    !!! tip "Example"
        Suppose one wants to spawn a single proving server that is used
        by all workers. One can set up the server in `common`, return
        some access information (e.g. a port number), and then have
        `per_worker` configure each worker to connect to the server.
    """

    common: Callable[[], T]
    per_worker: Callable[[T], None]

ExperimentCLI

A CLI application for controlling an experiment from the shell.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
class ExperimentCLI:
    """
    A CLI application for controlling an experiment from the shell.
    """

    def __init__(self, experiment: Experiment[Any]):
        self.experiment = experiment

    def run(
        self,
        *,
        max_workers: int = 1,
        retry_errors: bool = False,
        interactive: bool = False,
        log: bool | None = None,
        log_level: str | None = None,
        cache: bool | None = None,
        raw_trace: bool | None = None,
        browsable_trace: bool | None = None,
        verbose_snapshots: bool | None = None,
    ):
        """
        Start or resume the experiment.

        Arguments:
            max_workers: Number of parallel process workers to use.
            retry_errors: Mark failed configurations to be retried.
            interactive: If `True`, pressing `Enter` at any point during
                execution prints the current status of all workers and
                dumps a snapshot of ongoing tasks on disk.
            cache: If provided, override the `cache_requests` setting of
                the experiment.
            log: If provided, override the `export_log` setting of the
                experiment.
            log_level: If provided, override the `log_level` setting of
                the experiment.
            raw_trace: If provided, override the `export_raw_trace`
                setting of the experiment.
            browsable_trace: If provided, override the
                `export_browsable_trace` setting of the experiment.
            verbose_snapshots: If provided, override the
                `verbose_snapshots` setting of the experiment.
        """
        if cache is not None:
            self.experiment.cache_requests = cache
        if raw_trace is not None:
            self.experiment.export_raw_trace = raw_trace
        if browsable_trace is not None:
            self.experiment.export_browsable_trace = browsable_trace
        if log is not None:
            self.experiment.export_log = log
        if verbose_snapshots is not None:
            self.experiment.verbose_snapshots = verbose_snapshots
        if log_level is not None:
            assert dp.valid_log_level(log_level), (
                f"Invalid log level: {log_level}"
            )
            self.experiment.log_level = log_level

        self.experiment.load()
        if retry_errors:
            self.experiment.mark_errors_as_todos()
        self.experiment.resume(
            max_workers=max_workers, interactive=interactive
        )

    def status(self):
        """
        Print the status of the experiment.
        """
        status_counts = self.experiment.get_status()
        print(
            f"Experiment '{self.experiment.name}':\n"
            f"  - {status_counts['todo']} configurations to do\n"
            f"  - {status_counts['done']} configurations done\n"
            f"  - {status_counts['failed']} configurations failed"
        )

    def replay(self, config: str | None = None):
        """
        Replay one or all configurations.

        Arguments:
            config: The name of the configuration to replay. If not
                provided, all configurations are replayed.
        """
        self.experiment.load()
        if config is None:
            self.experiment.replay_all_configs()
        else:
            self.experiment.replay_config_by_name(config)

    def clean_index(self):
        """
        Clean unregistered configurations from the persistent state
        file.
        """
        self.experiment.load().clean_index()

    def force_summary(self, add_timing: bool = False):
        """
        Force the generation of a summary file, even if not all
        configurations were successfully run.
        """
        self.experiment.load().save_summary(
            ignore_missing=True, add_timing=add_timing
        )

run

run(
    *,
    max_workers: int = 1,
    retry_errors: bool = False,
    interactive: bool = False,
    log: bool | None = None,
    log_level: str | None = None,
    cache: bool | None = None,
    raw_trace: bool | None = None,
    browsable_trace: bool | None = None,
    verbose_snapshots: bool | None = None,
)

Start or resume the experiment.

Parameters:

  • max_workers (int, default 1): Number of parallel process workers to use.
  • retry_errors (bool, default False): Mark failed configurations to be retried.
  • interactive (bool, default False): If True, pressing Enter at any point during execution prints the current status of all workers and dumps a snapshot of ongoing tasks on disk.
  • log (bool | None, default None): If provided, overrides the export_log setting of the experiment.
  • log_level (str | None, default None): If provided, overrides the log_level setting of the experiment.
  • cache (bool | None, default None): If provided, overrides the cache_requests setting of the experiment.
  • raw_trace (bool | None, default None): If provided, overrides the export_raw_trace setting of the experiment.
  • browsable_trace (bool | None, default None): If provided, overrides the export_browsable_trace setting of the experiment.
  • verbose_snapshots (bool | None, default None): If provided, overrides the verbose_snapshots setting of the experiment.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def run(
    self,
    *,
    max_workers: int = 1,
    retry_errors: bool = False,
    interactive: bool = False,
    log: bool | None = None,
    log_level: str | None = None,
    cache: bool | None = None,
    raw_trace: bool | None = None,
    browsable_trace: bool | None = None,
    verbose_snapshots: bool | None = None,
):
    """
    Start or resume the experiment.

    Arguments:
        max_workers: Number of parallel process workers to use.
        retry_errors: Mark failed configurations to be retried.
        interactive: If `True`, pressing `Enter` at any point during
            execution prints the current status of all workers and
            dumps a snapshot of ongoing tasks on disk.
        cache: If provided, override the `cache_requests` setting of
            the experiment.
        log: If provided, override the `export_log` setting of the
            experiment.
        log_level: If provided, override the `log_level` setting of
            the experiment.
        raw_trace: If provided, override the `export_raw_trace`
            setting of the experiment.
        browsable_trace: If provided, override the
            `export_browsable_trace` setting of the experiment.
        verbose_snapshots: If provided, override the
            `verbose_snapshots` setting of the experiment.
    """
    if cache is not None:
        self.experiment.cache_requests = cache
    if raw_trace is not None:
        self.experiment.export_raw_trace = raw_trace
    if browsable_trace is not None:
        self.experiment.export_browsable_trace = browsable_trace
    if log is not None:
        self.experiment.export_log = log
    if verbose_snapshots is not None:
        self.experiment.verbose_snapshots = verbose_snapshots
    if log_level is not None:
        assert dp.valid_log_level(log_level), (
            f"Invalid log level: {log_level}"
        )
        self.experiment.log_level = log_level

    self.experiment.load()
    if retry_errors:
        self.experiment.mark_errors_as_todos()
    self.experiment.resume(
        max_workers=max_workers, interactive=interactive
    )

status

status()

Print the status of the experiment.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def status(self):
    """
    Print the status of the experiment.
    """
    status_counts = self.experiment.get_status()
    print(
        f"Experiment '{self.experiment.name}':\n"
        f"  - {status_counts['todo']} configurations to do\n"
        f"  - {status_counts['done']} configurations done\n"
        f"  - {status_counts['failed']} configurations failed"
    )

replay

replay(config: str | None = None)

Replay one or all configurations.

Parameters:

  • config (str | None, default None): The name of the configuration to replay. If not provided, all configurations are replayed.
Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def replay(self, config: str | None = None):
    """
    Replay one or all configurations.

    Arguments:
        config: The name of the configuration to replay. If not
            provided, all configurations are replayed.
    """
    self.experiment.load()
    if config is None:
        self.experiment.replay_all_configs()
    else:
        self.experiment.replay_config_by_name(config)

clean_index

clean_index()

Clean unregistered configurations from the persistent state file.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def clean_index(self):
    """
    Clean unregistered configurations from the persistent state
    file.
    """
    self.experiment.load().clean_index()

force_summary

force_summary(add_timing: bool = False)

Force the generation of a summary file, even if not all configurations were successfully run.

Source code in src/delphyne/stdlib/experiments/experiment_launcher.py
def force_summary(self, add_timing: bool = False):
    """
    Force the generation of a summary file, even if not all
    configurations were successfully run.
    """
    self.experiment.load().save_summary(
        ignore_missing=True, add_timing=add_timing
    )