Commands

Tasks and Commands

CommandExecutionContext dataclass

Context information available to all commands.

This information is usually specified in delphyne.yaml project files, and also locally in @config blocks. All values have defaults. All paths are usually expressed relative to a single workspace root directory, and can be made absolute using the with_root method.

Attributes:

strategy_dirs (Sequence[Path]): A sequence of directories in which strategy modules can be found.

modules (Sequence[str]): A sequence of modules in which Python object identifiers can be resolved (module names may contain dots).

demo_files (Sequence[Path]): A sequence of demonstration files (either including or excluding the *.demo.yaml extension).

prompt_dirs (Sequence[Path]): A sequence of directories in which to look for prompt templates.

data_dirs (Sequence[Path]): A sequence of directories in which to look for data files.

cache_root (Path | None): The directory in which to store all request cache subdirectories.

result_refresh_period (float | None): The period in seconds at which the current result is computed and communicated to the UI (e.g., the period at which the current trace is exported when running oracular programs). If None, the result is never refreshed (until the command terminates).

status_refresh_period (float | None): The period in seconds at which the current status message is communicated to the UI. If None, the status is never refreshed (until the command terminates).
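One way to read the two refresh periods is as a throttle on how often updates reach the UI. The sketch below is only an interpretation of the descriptions above, not Delphyne's scheduling code: `Throttle` and `should_refresh` are hypothetical names, and the clock is passed in explicitly so the behaviour is easy to follow.

```python
from __future__ import annotations


class Throttle:
    """Hypothetical helper: allow a refresh at most once per `period` seconds."""

    def __init__(self, period: float | None, start: float = 0.0) -> None:
        self.period = period
        self.last = start

    def should_refresh(self, now: float) -> bool:
        # A period of None means "never refresh until the command terminates".
        if self.period is None:
            return False
        if now - self.last >= self.period:
            self.last = now
            return True
        return False


every_two_seconds = Throttle(2.0)
decisions = [every_two_seconds.should_refresh(t) for t in (1.0, 2.0, 3.0, 4.5)]

never = Throttle(None)
never_decisions = [never.should_refresh(t) for t in (1.0, 100.0)]
```

With a period of 2.0 seconds, the updates at times 2.0 and 4.5 go through while those at 1.0 and 3.0 are skipped.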

Local configuration blocks

Demonstration and command files can override some configuration information from the delphyne.yaml file by featuring a comment block such as:

# @config
# modules: ["my_strategy_module"]
# demo_files: ["demo.yaml"]
# @end

The comment block must be placed at the start of the file, possibly after other comments.
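As a rough illustration of the format described above, the following sketch extracts the body of a leading `# @config` ... `# @end` comment block from a file's text. It is not Delphyne's own parser: `extract_config_block` is a hypothetical helper, and the handling of malformed blocks is an assumption made for this example. The returned string would then be handed to a YAML parser.

```python
from __future__ import annotations


def extract_config_block(text: str) -> str | None:
    """Return the uncommented body of a leading `# @config` block, or None."""
    body: list[str] = []
    inside = False
    for line in text.splitlines():
        stripped = line.strip()
        if not inside:
            if stripped == "# @config":
                inside = True
            elif stripped and not stripped.startswith("#"):
                # Non-comment content was reached before any block started.
                return None
            continue
        if stripped == "# @end":
            return "\n".join(body)
        if not stripped.startswith("#"):
            # Assumption: a block interrupted by non-comment text is invalid.
            return None
        # Drop the leading "#" and at most one space, keeping YAML indentation.
        content = stripped[1:]
        body.append(content[1:] if content.startswith(" ") else content)
    return None  # No block found, or the block was never closed.


sample = (
    "# A demonstration file\n"
    "# @config\n"
    '# modules: ["my_strategy_module"]\n'
    '# demo_files: ["demo.yaml"]\n'
    "# @end\n"
    "- strategy: some_strategy\n"
)
config_text = extract_config_block(sample)
```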

Source code in src/delphyne/stdlib/tasks.py
@dataclass(frozen=True, kw_only=True)
class CommandExecutionContext:
    """
    Context information available to all commands.

    This information is usually specified in `delphyne.yaml` project
    files, and also locally in `@config` blocks. All values have
    defaults. All paths are usually expressed relative to a single
    workspace root directory, and can be made absolute using the
    `with_root` method.

    Attributes:
        strategy_dirs: A sequence of directories in which strategy
            modules can be found.
        modules: A sequence of modules in which Python object identifiers
            can be resolved (module names can feature `.`).
        demo_files: A sequence of demonstration files (either including or
            excluding the `*.demo.yaml` extension).
        prompt_dirs: A sequence of directories in which to look for
            prompt templates.
        data_dirs: A sequence of directories in which to look for data
            files.
        cache_root: The directory in which to store all request cache
            subdirectories.
        result_refresh_period: The period in seconds at which the
            current result is computed and communicated to the UI (e.g.,
            the period at which the current trace is exported when
            running oracular programs). If `None`, the result is never
            refreshed (until the command terminates).
        status_refresh_period: The period in seconds at which the
            current status message is communicated to the UI. If `None`,
            the status is never refreshed (until the command
            terminates).

    !!! info "Local configuration blocks"
        Demonstration and command files can override some configuration
        information from the `delphyne.yaml` file by featuring a comment
        block such as:

        ```yaml
        # @config
        # modules: ["my_strategy_module"]
        # demo_files: ["demo.yaml"]
        # @end
        ```

        The comment block must be placed at the start of the file,
        possibly after other comments.
    """

    strategy_dirs: Sequence[Path] = DEFAULT_STRATEGY_DIRS
    modules: Sequence[str] = ()
    demo_files: Sequence[Path] = ()
    prompt_dirs: Sequence[Path] = DEFAULT_PROMPTS_DIRS
    data_dirs: Sequence[Path] = DEFAULT_DATA_DIRS
    cache_root: Path | None = None
    result_refresh_period: float | None = None
    status_refresh_period: float | None = None

    @property
    def base(self) -> analysis.DemoExecutionContext:
        """
        Obtain a demonstration execution context.
        """
        return analysis.DemoExecutionContext(self.strategy_dirs, self.modules)

    def with_root(self, root: Path) -> "CommandExecutionContext":
        """
        Make all paths absolute given a path to the workspace root.
        """
        return CommandExecutionContext(
            modules=self.modules,
            demo_files=[root / f for f in self.demo_files],
            strategy_dirs=[root / d for d in self.strategy_dirs],
            prompt_dirs=[root / d for d in self.prompt_dirs],
            data_dirs=[root / d for d in self.data_dirs],
            # Resolve the cache root against the workspace root as well.
            cache_root=None if self.cache_root is None else root / self.cache_root,
            result_refresh_period=self.result_refresh_period,
            status_refresh_period=self.status_refresh_period,
        )

base property

Obtain a demonstration execution context.

with_root

with_root(root: Path) -> CommandExecutionContext

Make all paths absolute given a path to the workspace root.

Source code in src/delphyne/stdlib/tasks.py
def with_root(self, root: Path) -> "CommandExecutionContext":
    """
    Make all paths absolute given a path to the workspace root.
    """
    return CommandExecutionContext(
        modules=self.modules,
        demo_files=[root / f for f in self.demo_files],
        strategy_dirs=[root / d for d in self.strategy_dirs],
        prompt_dirs=[root / d for d in self.prompt_dirs],
        data_dirs=[root / d for d in self.data_dirs],
        # Resolve the cache root against the workspace root as well.
        cache_root=None if self.cache_root is None else root / self.cache_root,
        result_refresh_period=self.result_refresh_period,
        status_refresh_period=self.status_refresh_period,
    )

TaskContext

Bases: Protocol

Context object accessible to all tasks for reporting progress and intermediate results.

Source code in src/delphyne/stdlib/tasks.py
class TaskContext[T](typing.Protocol):
    """
    Context object accessible to all tasks for reporting progress and
    intermediate results.
    """

    # To be called externally
    def interrupt(self) -> None: ...

    # To be called internally
    def interruption_requested(self) -> bool: ...
    def log(self, message: str) -> None: ...
    def set_status(self, message: str) -> None: ...
    def set_result(self, result: T) -> None: ...
    def raise_internal_error(self, message: str) -> None: ...
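
To make the protocol more concrete, here is a minimal in-memory class that structurally satisfies it, together with a task that cooperates with interruption requests. `InMemoryContext` and `count_to` are hypothetical names written for this example, not part of Delphyne, and the task interrupts itself only to simulate an external request.

```python
from __future__ import annotations

from typing import Generic, TypeVar

T = TypeVar("T")


class InMemoryContext(Generic[T]):
    """Hypothetical context that records everything in memory."""

    def __init__(self) -> None:
        self.logs: list[str] = []
        self.statuses: list[str] = []
        self.result: T | None = None
        self.error: str | None = None
        self._interrupted = False

    # Called externally, e.g. by a runner reacting to Ctrl+C.
    def interrupt(self) -> None:
        self._interrupted = True

    # Called by the task itself.
    def interruption_requested(self) -> bool:
        return self._interrupted

    def log(self, message: str) -> None:
        self.logs.append(message)

    def set_status(self, message: str) -> None:
        self.statuses.append(message)

    def set_result(self, result: T) -> None:
        self.result = result

    def raise_internal_error(self, message: str) -> None:
        self.error = message


def count_to(task: InMemoryContext[int], n: int) -> None:
    """Hypothetical task: counts to n, publishing partial results."""
    for i in range(1, n + 1):
        if task.interruption_requested():
            task.log(f"interrupted before step {i}")
            return
        task.set_status(f"step {i}/{n}")
        task.set_result(i)
        if i == 2:
            # Simulate an interruption request arriving mid-run.
            task.interrupt()


ctx: InMemoryContext[int] = InMemoryContext()
count_to(ctx, 5)
```

The task stops at the next check after the request, and the last partial result (2) remains available on the context.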

run_command

run_command(
    command: Command[A, T],
    args: A,
    ctx: CommandExecutionContext,
    dump_statuses: Path | None = None,
    dump_result: Path | None = None,
    dump_log: Path | None = None,
    on_status: Callable[[str], None] | None = None,
    add_header: bool = True,
    handle_sigint: bool = False,
) -> CommandResult[T | None]

A simple command runner.

Parameters:

command (Command[A, T], required): The command to run.

args (A, required): The arguments to pass to the command.

ctx (CommandExecutionContext, required): The command execution context.

dump_statuses (Path | None, default: None): A file in which to dump status messages at regular intervals.

dump_result (Path | None, default: None): A file in which to dump intermediate and final results, whose content is refreshed at regular intervals.

dump_log (Path | None, default: None): A file in which to dump log messages.

on_status (Callable[[str], None] | None, default: None): A function to call every time a status message is issued.

add_header (bool, default: True): If True, the dumped result is prefixed with a header containing the command name and arguments.

handle_sigint (bool, default: False): If True, pressing Ctrl+C sends the command task an interruption request by calling TaskContext.interrupt, allowing it to terminate gracefully instead of being interrupted abruptly.

Non-existing directories are created automatically.
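
The sketch below illustrates the file-handling conventions used by the runner, as they appear in its source: status and log messages are appended, the result file is overwritten on each update, and parent directories are created on demand. `append_line` and `overwrite` are hypothetical helpers written for this example, not functions exported by Delphyne.

```python
import tempfile
from pathlib import Path


def append_line(path: Path, message: str) -> None:
    # Create missing parent directories, then append one line.
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a") as f:
        f.write(message + "\n")


def overwrite(path: Path, content: str) -> None:
    # Create missing parent directories, then replace the whole file.
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        f.write(content)


with tempfile.TemporaryDirectory() as tmp:
    statuses = Path(tmp) / "out" / "statuses.txt"
    result = Path(tmp) / "out" / "result.yaml"
    for step in ("loading", "running"):
        append_line(statuses, step)
        overwrite(result, f"outcome: {step}\n")
    status_text = statuses.read_text()
    result_text = result.read_text()
```

After the loop, the status file holds the full history while the result file holds only the latest value.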

Source code in src/delphyne/stdlib/tasks.py
def run_command[A, T](
    command: Command[A, T],
    args: A,
    ctx: CommandExecutionContext,
    dump_statuses: Path | None = None,
    dump_result: Path | None = None,
    dump_log: Path | None = None,
    on_status: Callable[[str], None] | None = None,
    add_header: bool = True,
    handle_sigint: bool = False,
) -> CommandResult[T | None]:
    """
    A simple command runner.

    Arguments:
        command: The command to run.
        args: The arguments to pass to the command.
        ctx: The command execution context.
        dump_statuses: A file in which to dump status messages at
            regular intervals.
        dump_result: A file in which to dump intermediate and final
            results, whose content is refreshed at regular intervals.
        dump_log: A file in which to dump log messages.
        on_status: A function to call every time a status message is
            issued.
        add_header: If `True`, the dumped result is prefixed with a
            header containing the command name and arguments.
        handle_sigint: If `True`, pressing `Ctrl+C` sends the command
            task an interruption request by calling
            `TaskContext.interrupt`, allowing it to gracefully terminate
            instead of abruptly interrupting it.

    Non-existing directories are created automatically.
    """

    class Handler:
        def __init__(self):
            self.result: CommandResult[T | None] = CommandResult([], None)
            self.interrupted = False

        def interrupt(self) -> None:
            self.interrupted = True

        def interruption_requested(self) -> bool:
            return self.interrupted

        def log(self, message: str) -> None:
            if dump_log is not None:
                os.makedirs(dump_log.parent, exist_ok=True)
                with open(dump_log, "a") as f:
                    f.write(message + "\n")

        def set_status(self, message: str) -> None:
            if dump_statuses is not None:
                os.makedirs(dump_statuses.parent, exist_ok=True)
                with open(dump_statuses, "a") as f:
                    f.write(message + "\n")
            if on_status is not None:
                on_status(message)

        def set_result(self, result: CommandResult[T]) -> None:
            self.result = result
            if dump_result is not None:
                ret_ty = command_optional_result_wrapper_type(command)
                ret: Any = ty.pydantic_dump(ret_ty, result)
                if add_header:
                    args_type = command_args_type(command)
                    ret = {
                        "command": command_name(command),
                        "args": ty.pydantic_dump(args_type, args),
                        "outcome": ret,
                    }
                os.makedirs(dump_result.parent, exist_ok=True)
                with open(dump_result, "w") as f:
                    ret_yaml = pretty_yaml(ret)
                    f.write("# delphyne-command\n\n" + ret_yaml)

        def raise_internal_error(self, message: str) -> None:
            error = ("error", f"Internal error: {message}")
            self.result = CommandResult([error], None)

    handler = Handler()
    if not handle_sigint:
        command(handler, ctx, args)
    else:
        watch_sigint(
            task=lambda: command(handler, ctx, args),
            on_sigint=lambda: handler.interrupt(),
        )
    return handler.result

Standard Commands

run_strategy

run_strategy(
    task: TaskContext[CommandResult[RunStrategyResponse]],
    exe: CommandExecutionContext,
    args: RunStrategyArgs,
)

Command for running an oracular program from a serialized specification.

Source code in src/delphyne/stdlib/commands/run_strategy.py
def run_strategy(
    task: ta.TaskContext[ta.CommandResult[RunStrategyResponse]],
    exe: ta.CommandExecutionContext,
    args: RunStrategyArgs,
):
    """
    Command for running an oracular program from a serialized
    specification.
    """
    loader = analysis.ObjectLoader(exe.base)
    strategy = loader.load_strategy_instance(args.strategy, args.args)
    policy = loader.load_and_call_function(args.policy, args.policy_args)
    assert isinstance(policy, dp.AbstractPolicy)
    policy = cast(pol.Policy[Any, Any], policy)
    run_loaded_strategy(
        task=task,
        exe=exe,
        args=RunLoadedStrategyArgs(
            strategy=strategy,
            policy=policy,
            num_generated=args.num_generated,
            budget=args.budget,
            cache_dir=args.cache_dir,
            cache_mode=args.cache_mode,
            cache_format=args.cache_format,
            export_raw_trace=args.export_raw_trace,
            export_log=args.export_log,
            export_browsable_trace=args.export_browsable_trace,
        ),
    )

RunStrategyArgs dataclass

Arguments for the run_strategy command that runs an oracular program.

Attributes:

strategy (str): Name of the strategy to run.

args (dict[str, object]): Arguments to pass to the strategy constructor.

policy (str): Name of the policy to use.

policy_args (dict[str, object]): Arguments to pass to the policy constructor.

num_generated (int): Number of success values to generate.

budget (dict[str, float]): Budget limit (infinite for unspecified metrics).

cache_dir (str | None): Subdirectory of the global cache directory to use for caching, or None to disable caching.

cache_mode (CacheMode): Cache mode to use.

cache_format (CacheFormat): Cache format to use.

export_raw_trace (bool): Whether to export the raw execution trace.

export_log (bool): Whether to export the log messages.

export_browsable_trace (bool): Whether to export a browsable trace, which can be visualized in the VSCode extension (see delphyne.analysis.feedback.Trace).

Source code in src/delphyne/stdlib/commands/run_strategy.py
@dataclass(kw_only=True)
class RunStrategyArgs:
    """
    Arguments for the `run_strategy` command that runs an oracular
    program.

    Attributes:
        strategy: Name of the strategy to run.
        args: Arguments to pass to the strategy constructor.
        policy: Name of the policy to use.
        policy_args: Arguments to pass to the policy constructor.
        num_generated: Number of success values to generate.
        budget: Budget limit (infinite for unspecified metrics).
        cache_dir: Subdirectory of the global cache directory to use for
            caching, or `None` to disable caching.
        cache_mode: Cache mode to use.
        cache_format: Cache format to use.
        export_raw_trace: Whether to export the raw execution trace.
        export_log: Whether to export the log messages.
        export_browsable_trace: Whether to export a browsable trace,
            which can be visualized in the VSCode extension (see
            [delphyne.analysis.feedback.Trace][]).
    """

    strategy: str
    args: dict[str, object]
    policy: str
    policy_args: dict[str, object]
    budget: dict[str, float]
    num_generated: int = 1
    cache_dir: str | None = None
    cache_mode: ca.CacheMode = "read_write"
    cache_format: CacheFormat = "yaml"
    export_raw_trace: bool = True
    export_log: bool = True
    export_browsable_trace: bool = True

answer_query

answer_query(
    task: TaskContext[CommandResult[AnswerQueryResponse]],
    exe: CommandExecutionContext,
    cmd: AnswerQueryArgs,
)

A command for answering a query. See AnswerQueryArgs.

Source code in src/delphyne/stdlib/commands/answer_query.py
def answer_query(
    task: ta.TaskContext[ta.CommandResult[AnswerQueryResponse]],
    exe: ta.CommandExecutionContext,
    cmd: AnswerQueryArgs,
):
    """
    A command for answering a query. See `AnswerQueryArgs`.
    """
    with_cache_spec(
        partial(answer_query_with_cache, task, exe, cmd),
        cache_root=exe.cache_root,
        cache_dir=cmd.cache_dir,
        cache_mode=cmd.cache_mode,
        cache_format=cmd.cache_format,
    )

AnswerQueryArgs dataclass

Arguments for the answer_query command.

Attributes:

query (str): The name of the query to answer.

args (dict[str, object]): Arguments for the query, as a dictionary of JSON values.

prompt_only (bool): If True, a dummy model is used that always errors, so that only the prompt can be seen in the logs.

model (str | None): The name of the model to use for answering the query.

num_answers (int): The number of answers to generate.

iterative_mode (bool): Whether to answer the query in iterative mode (see few_shot for details).

budget (dict[str, float] | None): Budget limit (infinite for unspecified metrics).

cache_dir (str | None): Subdirectory of the global cache directory to use for caching, or None to disable caching.

cache_mode (CacheMode): Cache mode to use.

cache_format (CacheFormat): Cache format to use.
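
The phrase "infinite for unspecified metrics" can be read as follows: a metric that is absent from the budget dictionary is never a reason to stop. The sketch below encodes that reading. `exceeds_budget` is a hypothetical helper, the metric names are made up, and both the strict comparison and the treatment of a `None` budget as unlimited are assumptions rather than documented Delphyne behaviour.

```python
from __future__ import annotations

import math


def exceeds_budget(spent: dict[str, float], limit: dict[str, float] | None) -> bool:
    """Return True if any spent metric is strictly above its limit."""
    if limit is None:
        # Assumption: no budget at all means nothing is limited.
        return False
    # Metrics missing from `limit` default to an infinite allowance.
    return any(
        amount > limit.get(metric, math.inf) for metric, amount in spent.items()
    )


within = exceeds_budget({"num_requests": 3, "price": 100.0}, {"num_requests": 5})
over = exceeds_budget({"num_requests": 6}, {"num_requests": 5})
unlimited = exceeds_budget({"num_requests": 1000}, None)
```

In the first call, `price` is not mentioned in the limit, so only `num_requests` is checked.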

Source code in src/delphyne/stdlib/commands/answer_query.py
@dataclass(kw_only=True)
class AnswerQueryArgs:
    """
    Arguments for the `answer_query` command.

    Attributes:
        query: The name of the query to answer.
        args: Arguments for the query, as a dictionary of JSON values.
        prompt_only: If `True`, a dummy model is used that always
            errors, so that only the prompt can be seen in the logs.
        model: The name of the model to use for answering the query.
        num_answers: The number of answers to generate.
        iterative_mode: Whether to answer the query in iterative mode
            (see `few_shot` for details).
        budget: Budget limit (infinite for unspecified metrics).
        cache_dir: Subdirectory of the global cache directory to use for
            caching, or `None` to disable caching.
        cache_mode: Cache mode to use.
        cache_format: Cache format to use.
    """

    query: str
    args: dict[str, object]
    prompt_only: bool
    model: str | None = None
    num_answers: int = 1
    iterative_mode: bool = False
    budget: dict[str, float] | None = None
    cache_dir: str | None = None
    cache_mode: ca.CacheMode = "read_write"
    cache_format: CacheFormat = "yaml"