Skip to content

Policy Environments

Main Class

PolicyEnv

The global environment accessible to policies.

It can be used for:

  • Fetching prompts, data, and examples.
  • Caching LLM requests.
  • Tracing nodes, queries, answers, and logging information.

Attributes:

Name Type Description
cache

The (optional) request cache.

data_manager

The data manager.

templates

The prompt templates manager.

tracer

The tracer, which can also be used for logging.

examples

The example database.

log_long_computations

See the constructor.

Source code in src/delphyne/stdlib/environments.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
class PolicyEnv:
    """
    The global environment accessible to policies.

    It can be used for:

    - Fetching prompts, data, and examples.
    - Caching LLM requests.
    - Tracing nodes, queries, answers, and logging information.

    Attributes:
        cache: The (optional) request cache.
        data_manager: The data manager.
        templates: The prompt templates manager.
        tracer: The tracer, which can also be used for logging.
        examples: The example database.
        log_long_computations: See the constructor.
    """

    def __init__(
        self,
        *,
        prompt_dirs: Sequence[Path] = (),
        demonstration_files: Sequence[Path] = (),
        data_dirs: Sequence[Path] = (),
        cache: md.LLMCache | None = None,
        override_answers: dp.AnswerDatabase | None = None,
        log_level: dp.LogLevel = "info",
        log_long_computations: tuple[dp.LogLevel, float] | None = None,
        do_not_match_identical_queries: bool = False,
    ):
        """
        Args:
            prompt_dirs: A sequence of directories where Jinja prompt
                templates can be found.
            demonstration_files: A sequence of paths to demonstration
                files (with or without extension `.demo.yaml`), to
                create an example database from.
            data_dirs: A sequence of directories where data files can be
                found.
            cache: A request cache, or `None` to disable caching.
            override_answers: If provided, a database of answers that
                must be used to override LLM calls whenever possible.
                Individual prompting policies such as `few_shot` are
                responsible for consulting this global database using
                the `overriden_answer` method.
            log_level: The minimum log level to record. Messages with a
                lower level will be ignored.
            log_long_computations: If set, log computations taking more
                than the given number of seconds at the given severity
                level. This setting can be locally overridden by
                `elim_compute`.
            do_not_match_identical_queries: See `ExampleDatabase`.
        """
        self.data_manager = DataManager(data_dirs)
        self.templates = TemplatesManager(prompt_dirs, self.data_manager)
        self.examples = ExampleDatabase(do_not_match_identical_queries)
        self.tracer = dp.Tracer(log_level=log_level)
        self.log_long_computations = log_long_computations
        self.cache = cache
        self.override_answers = override_answers
        # Populate the example database from all provided demo files.
        for path in demonstration_files:
            for demo in loaders.load_demo_file(path):
                self.examples.add_demonstration(demo)
        self._hindsight_feedback: HindsightFeedbackDict = {}

    def add_hindsight_feedback(
        self, node_id: int, feedback: HindsightFeedback
    ) -> None:
        """
        Record hindsight feedback for a given node.

        Thread-safe: access is protected by the tracer's lock.
        """
        with self.tracer.lock:
            self._hindsight_feedback[node_id] = feedback

    def get_hindsight_feedback(self) -> HindsightFeedbackDict:
        """
        Return a snapshot (shallow copy) of all recorded hindsight
        feedback, keyed by node id. Thread-safe.
        """
        with self.tracer.lock:
            return self._hindsight_feedback.copy()

    def overriden_answer(
        self, query: dp.AbstractQuery[Any]
    ) -> dp.Answer | None:
        """
        Attempt to fetch an answer from the override database and return
        it if it exists, while logging the event.
        """

        if self.override_answers is None:
            return None
        serialized_query = dp.SerializedQuery.make(query)
        ret = self.override_answers.fetch(serialized_query)
        if ret is not None:
            meta = {
                "source": ad.pp_located_answer_source(ret.source),
                "query_name": query.query_name(),
                "query_args": query.serialize_args(),
                "answer": ret.answer,
            }
            self.info("llm_override", meta)
            return ret.answer
        # Implicitly returns `None` when no override is available.

    def log(
        self,
        level: dp.LogLevel,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    ) -> None:
        """
        Log a message.

        Arguments:
            level: The severity level of the message.
            message: The message to log.
            metadata: Additional metadata to log, as a dictionary of JSON
                values.
            loc: Tree or attached query that the message is about, if
                relevant.
        """

        # Convert `loc` into a `dp.Location` before delegating to the
        # tracer. The `AttachedQuery` case destructures positionally.
        match loc:
            case None:
                location = None
            case dp.Tree():
                location = dp.Location(loc.ref, None)
            case dp.AttachedQuery(_, ref):
                location = dp.Location(ref[0], ref[1])
        self.tracer.log(level, message, metadata, location)

    def trace(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    ) -> None:
        """
        Log a message with "trace" severity level.

        See `log` method.
        """
        self.log("trace", message, metadata, loc=loc)

    def debug(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    ) -> None:
        """
        Log a message with "debug" severity level.

        See `log` method.
        """
        self.log("debug", message, metadata, loc=loc)

    def info(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    ) -> None:
        """
        Log a message with "info" severity level.

        See `log` method.
        """
        self.log("info", message, metadata, loc=loc)

    def warn(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    ) -> None:
        """
        Log a message with "warn" severity level.

        See `log` method.
        """
        self.log("warn", message, metadata, loc=loc)

    def error(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    ) -> None:
        """
        Log a message with "error" severity level.

        See `log` method.
        """
        self.log("error", message, metadata, loc=loc)

__init__

__init__(
    *,
    prompt_dirs: Sequence[Path] = (),
    demonstration_files: Sequence[Path] = (),
    data_dirs: Sequence[Path] = (),
    cache: LLMCache | None = None,
    override_answers: AnswerDatabase | None = None,
    log_level: LogLevel = "info",
    log_long_computations: tuple[LogLevel, float] | None = None,
    do_not_match_identical_queries: bool = False,
)

Parameters:

Name Type Description Default
prompt_dirs Sequence[Path]

A sequence of directories where Jinja prompt templates can be found.

()
demonstration_files Sequence[Path]

A sequence of paths to demonstration files (with or without extension .demo.yaml), to create an example database from.

()
data_dirs Sequence[Path]

A sequence of directories where data files can be found.

()
cache LLMCache | None

A request cache, or None to disable caching.

None
override_answers AnswerDatabase | None

If provided, a database of answers that must be used to override LLM calls whenever possible. Individual prompting policies such as few_shot are responsible for consulting this global database using the overriden_answer method.

None
log_level LogLevel

The minimum log level to record. Messages with a lower level will be ignored.

'info'
log_long_computations tuple[LogLevel, float] | None

If set, log computations taking more than the given number of seconds at the given severity level. This setting can be locally overridden by elim_compute.

None
do_not_match_identical_queries bool

See ExampleDatabase.

False
Source code in src/delphyne/stdlib/environments.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def __init__(
    self,
    *,
    prompt_dirs: Sequence[Path] = (),
    demonstration_files: Sequence[Path] = (),
    data_dirs: Sequence[Path] = (),
    cache: md.LLMCache | None = None,
    override_answers: dp.AnswerDatabase | None = None,
    log_level: dp.LogLevel = "info",
    log_long_computations: tuple[dp.LogLevel, float] | None = None,
    do_not_match_identical_queries: bool = False,
):
    """
    Args:
        prompt_dirs: A sequence of directories where Jinja prompt
            templates can be found.
        demonstration_files: A sequence of paths to demonstration
            files (with or without extension `.demo.yaml`), to
            create an example database from.
        data_dirs: A sequence of directories where data files can be
            found.
        cache: A request cache, or `None` to disable caching.
        override_answers: If provided, a database of answers that
            must be used to override LLM calls whenever possible.
            Individual prompting policies such as `few_shot` are
            responsible for consulting this global database using
            the `overriden_answer` method.
        log_level: The minimum log level to record. Messages with a
            lower level will be ignored.
        log_long_computations: If set, log computations taking more
            than the given number of seconds at the given severity
            level. This setting can be locally overridden by
            `elim_compute`.
        do_not_match_identical_queries: See `ExampleDatabase`.
    """
    self.data_manager = DataManager(data_dirs)
    self.templates = TemplatesManager(prompt_dirs, self.data_manager)
    self.examples = ExampleDatabase(do_not_match_identical_queries)
    self.tracer = dp.Tracer(log_level=log_level)
    self.log_long_computations = log_long_computations
    self.cache = cache
    self.override_answers = override_answers
    # Populate the example database from all provided demo files.
    for path in demonstration_files:
        for demo in loaders.load_demo_file(path):
            self.examples.add_demonstration(demo)
    self._hindsight_feedback: HindsightFeedbackDict = {}

overriden_answer

overriden_answer(query: AbstractQuery[Any]) -> Answer | None

Attempt to fetch an answer from the override database and return it if it exists, while logging the event.

Source code in src/delphyne/stdlib/environments.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def overriden_answer(
    self, query: dp.AbstractQuery[Any]
) -> dp.Answer | None:
    """
    Look up `query` in the answer-override database.

    Returns the overriding answer if one exists (logging the event
    at "info" severity), and `None` otherwise.
    """

    db = self.override_answers
    if db is None:
        return None
    found = db.fetch(dp.SerializedQuery.make(query))
    if found is None:
        return None
    payload = {
        "source": ad.pp_located_answer_source(found.source),
        "query_name": query.query_name(),
        "query_args": query.serialize_args(),
        "answer": found.answer,
    }
    self.info("llm_override", payload)
    return found.answer

log

log(
    level: LogLevel,
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
) -> None

Log a message.

Parameters:

Name Type Description Default
level LogLevel

The severity level of the message.

required
message str

The message to log.

required
metadata object | None

Additional metadata to log, as a dictionary of JSON values.

None
loc Tree[Any, Any, Any] | AttachedQuery[Any] | None

Tree or attached query that the message is about, if relevant.

None
Source code in src/delphyne/stdlib/environments.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def log(
    self,
    level: dp.LogLevel,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
) -> None:
    """
    Log a message.

    Arguments:
        level: The severity level of the message.
        message: The message to log.
        metadata: Additional metadata to log, as a dictionary of JSON
            values.
        loc: Tree or attached query that the message is about, if
            relevant.
    """

    # Convert `loc` into a `dp.Location` before delegating to the
    # tracer.
    match loc:
        case None:
            location = None
        case dp.Tree():
            # A tree is located by its node reference alone.
            location = dp.Location(loc.ref, None)
        case dp.AttachedQuery(_, ref):
            # Positional destructuring: `ref` is the second field of
            # `AttachedQuery`; presumably a (node ref, query ref)
            # pair — confirm against the `AttachedQuery` definition.
            location = dp.Location(ref[0], ref[1])
    self.tracer.log(level, message, metadata, location)

trace

trace(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
) -> None

Log a message with "trace" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
454
455
456
457
458
459
460
461
462
463
464
465
466
def trace(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
) -> None:
    """
    Convenience wrapper around `log` using the "trace" severity
    level.
    """
    self.log("trace", message, metadata=metadata, loc=loc)

debug

debug(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
) -> None

Log a message with "debug" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
468
469
470
471
472
473
474
475
476
477
478
479
480
def debug(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
) -> None:
    """
    Convenience wrapper around `log` using the "debug" severity
    level.
    """
    self.log("debug", message, metadata=metadata, loc=loc)

info

info(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
) -> None

Log a message with "info" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
482
483
484
485
486
487
488
489
490
491
492
493
494
def info(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
) -> None:
    """
    Convenience wrapper around `log` using the "info" severity
    level.
    """
    self.log("info", message, metadata=metadata, loc=loc)

warn

warn(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
) -> None

Log a message with "warn" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
496
497
498
499
500
501
502
503
504
505
506
507
508
def warn(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
) -> None:
    """
    Convenience wrapper around `log` using the "warn" severity
    level.
    """
    self.log("warn", message, metadata=metadata, loc=loc)

error

error(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
) -> None

Log a message with "error" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
510
511
512
513
514
515
516
517
518
519
520
521
522
def error(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
) -> None:
    """
    Convenience wrapper around `log` using the "error" severity
    level.
    """
    self.log("error", message, metadata=metadata, loc=loc)

InvalidDemoFile dataclass

Bases: Exception

Exception raised when a demonstration file could not be parsed.

Source code in src/delphyne/stdlib/answer_loaders.py
118
119
120
121
122
123
124
125
@dataclass
class InvalidDemoFile(Exception):
    """
    Exception raised when a demonstration file could not be parsed.
    """

    # Path of the demonstration file that failed to parse.
    file: Path
    # The underlying exception raised while parsing.
    exn: Exception

Example Database

Example dataclass

An example, usable for few-shot prompting.

Attributes:

Name Type Description
query SerializedQuery

The corresponding serialized query.

answer Answer

The answer to the query.

tags Sequence[str]

A sequence of tags associated with the example, which policies can use to select appropriate examples.

Source code in src/delphyne/stdlib/environments.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@dataclass(kw_only=True)
class Example:
    """
    An example, usable for few-shot prompting.

    Attributes:
        query: The corresponding serialized query.
        answer: The answer to the query.
        tags: A sequence of tags associated with the example, which
            policies can use to select appropriate examples.
    """

    # Serialized (hashable) form of the query this example answers.
    query: dp.SerializedQuery
    # The ground-truth answer associated with the query.
    answer: dp.Answer
    # Tags used by policies for example selection.
    tags: Sequence[str]

ExampleDatabase dataclass

A simple example database.

Attributes:

Name Type Description
do_not_match_identical_queries bool

If set to True, the examples method won't return examples that match identical queries (i.e., with the exact same arguments). This is useful in the context of writing demonstrations, where one may want to see how an LLM would answer a query, even when a ground-truth answer is provided already.

Source code in src/delphyne/stdlib/environments.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@dataclass
class ExampleDatabase:
    """
    A simple example database.

    Attributes:
        do_not_match_identical_queries: If set to `True`, the `examples`
            method won't return examples that match identical queries
            (i.e., with the exact same arguments). This is useful in the
            context of writing demonstrations, where one may want to see
            how an LLM would answer a query, even when a ground-truth
            answer is provided already.
    """

    # TODO: add provenance info for better error messages.

    do_not_match_identical_queries: bool = False

    # Maps each query name to the list of examples collected for it.
    _examples: dict[_QueryName, list[Example]] = field(
        default_factory=lambda: defaultdict(list)
    )

    def add_query_demonstration(self, demo: dp.QueryDemo):
        """
        Add all examples from a standalone query demonstration to the
        database.
        """
        if not demo.answers:
            return
        if (ex := demo.answers[0].example) is not None and not ex:
            # If the user explicitly asked not to include the example.
            # TODO: What if the user asked to include several answers?
            # Right now, we only allow the first one to be added.
            return
        demo_answer = demo.answers[0]
        answer = dm.translate_answer(demo_answer)
        serialized = dp.SerializedQuery.from_json(demo.query, demo.args)
        example = Example(
            query=serialized, answer=answer, tags=demo_answer.tags
        )
        self._examples[demo.query].append(example)

    def add_demonstration(self, demo: dp.Demo):
        """
        Add all examples from a demonstration to the database.
        """
        if isinstance(demo, dp.QueryDemo):
            self.add_query_demonstration(demo)
        else:
            assert isinstance(demo, dp.StrategyDemo)
            for q in demo.queries:
                self.add_query_demonstration(q)

    def examples(self, query: dp.SerializedQuery) -> Iterable[Example]:
        """
        Obtain all potential examples that can be used for few-shot
        prompting with a given query.
        """
        for ex in self._examples[query.name]:
            if self.do_not_match_identical_queries:
                # Skip examples whose query is identical to `query`.
                if ex.query == query:
                    continue
            yield ex

add_query_demonstration

add_query_demonstration(demo: QueryDemo)

Add all examples from a standalone query demonstration to the database.

Source code in src/delphyne/stdlib/environments.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def add_query_demonstration(self, demo: dp.QueryDemo):
    """
    Add all examples from a standalone query demonstration to the
    database.
    """
    if not demo.answers:
        return
    first = demo.answers[0]
    if (ex := first.example) is not None and not ex:
        # The user explicitly asked not to include the example.
        # TODO: What if the user asked to include several answers?
        # Right now, we only allow the first one to be added.
        return
    answer = dm.translate_answer(first)
    key = dp.SerializedQuery.from_json(demo.query, demo.args)
    self._examples[demo.query].append(
        Example(query=key, answer=answer, tags=first.tags)
    )

add_demonstration

add_demonstration(demo: Demo)

Add all examples from a demonstration to the database.

Source code in src/delphyne/stdlib/environments.py
154
155
156
157
158
159
160
161
162
163
def add_demonstration(self, demo: dp.Demo):
    """
    Add all examples from a demonstration to the database.
    """
    if isinstance(demo, dp.QueryDemo):
        self.add_query_demonstration(demo)
        return
    # Otherwise, the demo must be a strategy demonstration: add each
    # of its query demonstrations in turn.
    assert isinstance(demo, dp.StrategyDemo)
    for sub in demo.queries:
        self.add_query_demonstration(sub)

examples

examples(query: SerializedQuery) -> Iterable[Example]

Obtain all potential examples that can be used for few-shot prompting with a given query.

Source code in src/delphyne/stdlib/environments.py
165
166
167
168
169
170
171
172
173
174
def examples(self, query: dp.SerializedQuery) -> Iterable[Example]:
    """
    Yield every stored example usable for few-shot prompting with
    the given query.
    """
    skip_identical = self.do_not_match_identical_queries
    for candidate in self._examples[query.name]:
        if skip_identical and candidate.query == query:
            continue
        yield candidate

SerializedQuery dataclass

A hashable representation of a query.

Attributes:

Name Type Description
name str

The name of the query.

args str

The serialized arguments of the query, as a canonical JSON string. Object keys are sorted so that equality is defined modulo key order.

Source code in src/delphyne/core/answer_databases.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@dataclass(frozen=True)
class SerializedQuery:
    """
    A hashable representation of a query.

    Attributes:
        name: The name of the query.
        args_str: The serialized arguments of the query, as a canonical
            JSON string. Object keys are sorted so that equality is
            defined modulo key order.
    """

    name: str
    args_str: str

    @staticmethod
    def _dump_json(obj: Any) -> str:
        # Sorting keys yields a canonical string, making equality and
        # hashing insensitive to key order.
        return json.dumps(obj, sort_keys=True)

    @staticmethod
    def make(query: AbstractQuery[Any]) -> "SerializedQuery":
        """Serialize a query object."""
        args = SerializedQuery._dump_json(query.serialize_args())
        return SerializedQuery(query.query_name(), args)

    @staticmethod
    def from_json(name: str, args: dict[str, Any]) -> "SerializedQuery":
        """Build a serialized query from a name and an argument dict."""
        return SerializedQuery(name, SerializedQuery._dump_json(args))

    @property
    def args_dict(self):
        # Deserialized view of `args_str`.
        return json.loads(self.args_str)

    def parse[T: AbstractQuery[Any]](self, type: type[T]) -> T:
        """Reconstruct a query object of the given type."""
        return type.parse_instance(self.args_dict)

Data and Templates Managers

DataManager

Utility class for loading and accessing external data.

Attributes:

Name Type Description
data

A dictionary containing all loaded data files. Each file corresponds to a key in the dictionary (stripped of the extension).

Source code in src/delphyne/stdlib/environments.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class DataManager:
    """
    Utility class for loading and accessing external data.

    Attributes:
        data: A dictionary containing all loaded data files. Each file
            corresponds to a key in the dictionary (stripped of the
            extension).
    """

    def __init__(self, data_dirs: Sequence[Path]):
        """
        Find all files with extension `*.data.yaml` in the `data_dirs`,
        parse them and save everything in a big dict. If two files have
        the same name, raise an error.
        """
        # Maps each data-file name (without extension) to its parsed
        # contents.
        self.data = _load_data(data_dirs)

__init__

__init__(data_dirs: Sequence[Path])

Find all files with extension *.data.yaml in the data_dirs, parse them and save everything in a big dict. If two files have the same name, raise an error.

Source code in src/delphyne/stdlib/environments.py
39
40
41
42
43
44
45
def __init__(self, data_dirs: Sequence[Path]):
    """
    Find all files with extension `*.data.yaml` in the `data_dirs`,
    parse them and save everything in a big dict. If two files have
    the same name, raise an error.
    """
    # Maps each data-file name (without extension) to its parsed
    # contents.
    self.data = _load_data(data_dirs)

TemplatesManager

Bases: AbstractTemplatesManager

A class for managing Jinja prompt templates.

Templates are configured with the trim_blocks and lstrip_blocks options set to True (no newlines are inserted after blocks and indentation can be used within blocks without affecting the output). The keep_trailing_newline option is set to False so trailing new lines at the end of template files are ignored.

Templates are first searched in the provided prompt folders and then in the standard library (delphyne.stdlib.templates). For example, to show standard formatting instructions, you can include the following in your instance prompts:

{% include 'stdlib/format.jinja' %}

All templates automatically have access to the following global objects:

  • A yaml filter for converting an object into a YAML string.
  • A json filter for converting an object into a JSON string.
  • A fail function that takes an error message as an argument and raises an exception on Python side.
  • A data dictionary containing all loaded data files.
Source code in src/delphyne/stdlib/environments.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
class TemplatesManager(dp.AbstractTemplatesManager):
    """
    A class for managing Jinja prompt templates.

    Templates are configured with the `trim_blocks` and `lstrip_blocks`
    options set to `True` (no newlines are inserted after blocks and
    indentation can be used within blocks without affecting the output).
    The `keep_trailing_newline` option is set to `False` so trailing new
    lines at the end of template files are ignored.

    Templates are first searched in the provided prompt folders and then
    in the standard library (`delphyne.stdlib.templates`). For example,
    to show standard formatting instructions, you can include the
    following in your instance prompts:

    ```jinja
    {% include 'stdlib/format.jinja' %}
    ```

    All templates automatically have access to the following global
    objects:

    - A `yaml` filter for converting an object into a YAML string.
    - A `json` filter for converting an object into a JSON string.
    - A `fail` function that takes an error message as an argument and
      raises an exception on Python side.
    - A `data` dictionary containing all loaded data files.
    """

    def __init__(self, prompt_dirs: Sequence[Path], data_manager: DataManager):
        """
        Args:
            prompt_dirs: A sequence of directories where Jinja prompt
                templates can be found.
            data_manager: The data manager providing the `data`
                dictionary made available to all templates.
        """
        self.prompt_folders = prompt_dirs
        self.data_manager = data_manager
        # Search user-provided folders first, then fall back to the
        # templates bundled with the `delphyne.stdlib` package.
        loader = jinja2.ChoiceLoader(
            [
                jinja2.FileSystemLoader(self.prompt_folders),
                jinja2.PackageLoader("delphyne.stdlib"),
            ]
        )
        self.env = jinja2.Environment(
            loader=loader,
            trim_blocks=True,
            lstrip_blocks=True,
            keep_trailing_newline=False,
        )
        self.env.filters["yaml"] = dump_yaml_object
        self.env.filters["json"] = _dump_json_object
        self.env.globals["fail"] = _fail_from_template  # type: ignore

    @override
    def prompt(
        self,
        *,
        query_name: str,
        prompt_kind: Literal["system", "instance"] | str,
        template_args: dict[str, Any],
        default_template: str | None = None,
    ) -> str:
        """
        Render the prompt template associated with a query.

        Args:
            query_name: Name of the query, used to derive the template
                file name (`<query_name>.<prompt_kind><JINJA_EXTENSION>`).
            prompt_kind: Kind of prompt (e.g. "system" or "instance").
            template_args: Arguments passed to the template. Must not
                contain a "data" key, which is reserved.
            default_template: Fallback template string, used when no
                template file is found.

        Raises:
            dp.TemplateFileMissing: If no template file exists and no
                default template was provided.
            dp.TemplateError: If rendering fails (missing include,
                undefined variable, or syntax error).
        """
        suffix = "." + prompt_kind
        template_name = f"{query_name}{suffix}{JINJA_EXTENSION}"
        try:
            template = self.env.get_template(template_name)
        except jinja2.TemplateNotFound:
            if default_template is not None:
                template = self.env.from_string(default_template)
            else:
                raise dp.TemplateFileMissing(template_name)
        assert "data" not in template_args
        # Build a fresh dict instead of mutating `template_args` in
        # place: the previous `|=` update injected a "data" key into the
        # caller's dictionary, so reusing the same dict for a second
        # call tripped the assertion above.
        render_args = template_args | {"data": self.data_manager.data}
        try:
            return template.render(render_args)
        except (
            jinja2.TemplateNotFound,
            jinja2.UndefinedError,
            jinja2.TemplateSyntaxError,
        ) as e:
            raise dp.TemplateError(template_name, e) from e

__init__

__init__(prompt_dirs: Sequence[Path], data_manager: DataManager)

Parameters:

Name Type Description Default
prompt_dirs Sequence[Path]

A sequence of directories where Jinja prompt templates can be found.

required
data_manager DataManager

A sequence of directories where data files can be found.

required
Source code in src/delphyne/stdlib/environments.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def __init__(self, prompt_dirs: Sequence[Path], data_manager: DataManager):
    """
    Args:
        prompt_dirs: A sequence of directories where Jinja prompt
            templates can be found.
        data_manager: The data manager, whose data is made available
            to prompt templates.
    """
    self.prompt_folders = prompt_dirs
    self.data_manager = data_manager
    # Templates are looked up in the user-provided directories first,
    # falling back to those bundled with the `delphyne.stdlib` package.
    loader = jinja2.ChoiceLoader(
        [
            jinja2.FileSystemLoader(self.prompt_folders),
            jinja2.PackageLoader("delphyne.stdlib"),
        ]
    )
    self.env = jinja2.Environment(
        loader=loader,
        trim_blocks=True,
        lstrip_blocks=True,
        keep_trailing_newline=False,
    )
    # Custom filters and globals available from within templates.
    self.env.filters["yaml"] = dump_yaml_object
    self.env.filters["json"] = _dump_json_object
    self.env.globals["fail"] = _fail_from_template  # type: ignore

TemplateFileMissing dataclass

Bases: Exception

Exception raised when a template file is missing.

This exception should only be raised when a top-level template file is missing. If an include statement fails within a template, a TemplateError exception should be raised instead.

Source code in src/delphyne/core/queries.py
144
145
146
147
148
149
150
151
152
153
154
@dataclass
class TemplateFileMissing(Exception):
    """
    Raised when a top-level template file cannot be found.

    This exception is reserved for missing *top-level* template files:
    when an `include` statement fails from within a template, a
    `TemplateError` must be raised instead.
    """

    # Name of the missing template file.
    file: str

Tracer

Tracer

A mutable trace along with a mutable list of log messages.

Both components are protected by a lock to ensure thread-safety (some policies spawn multiple concurrent threads).

Attributes:

Name Type Description
trace

A mutable trace.

messages list[LogMessage]

A mutable list of log messages.

lock

A reentrant lock protecting access to the trace and log. The lock is publicly exposed so that threads can log several successive messages without other threads interleaving new messages in between.

Source code in src/delphyne/core/traces.py
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
class Tracer:
    """
    A mutable trace along with a mutable list of log messages.

    Both components are protected by a lock to ensure thread-safety
    (some policies spawn multiple concurrent threads).

    Attributes:
        trace: A mutable trace.
        messages: A mutable list of log messages.
        lock: A reentrant lock protecting access to the trace and log.
            The lock is publicly exposed so that threads can log several
            successive messages without other threads interleaving new
            messages in between.
    """

    # TODO: there are cleaner ways to achieve good message order beyond
    # exposing the lock.

    def __init__(self, log_level: LogLevel = "info"):
        """
        Parameters:
            log_level: The minimum severity level of messages to log.
        """
        self.trace = Trace()
        self.messages: list[LogMessage] = []
        self.log_level: LogLevel = log_level

        # Different threads may be logging information or appending to
        # the trace in parallel.
        self.lock = threading.RLock()

    def global_node_id(self, node: refs.GlobalNodePath) -> refs.NodeId:
        """
        Ensure that a node at a given reference is present in the trace
        and return the corresponding node identifier.
        """
        with self.lock:
            return self.trace.convert_global_node_path(node)

    def trace_node(self, node: refs.GlobalNodePath) -> None:
        """
        Ensure that a node at a given reference is present in the trace.

        Unlike `global_node_id`, this method does not return the
        associated node identifier.

        See `tracer_hook` for registering a hook that automatically
        calls this method on all encountered nodes.
        """
        self.global_node_id(node)

    def trace_query(self, query: AttachedQuery[Any]) -> None:
        """
        Ensure that a query at a given reference is present in the
        trace, even if no answer is provided for it.
        """
        # Serialization is performed outside of the critical section.
        serialized = (query.query.query_name(), query.query.serialize_args())
        with self.lock:
            self.trace.register_query(query.ref, serialized)

    def trace_answer(
        self, space: refs.GlobalSpacePath, answer: refs.Answer
    ) -> None:
        """
        Ensure that a given query answer is present in the trace, even
        if it is not used to reach a node.
        """
        with self.lock:
            self.trace.convert_answer_ref((space, answer))

    def log(
        self,
        level: LogLevel,
        message: str,
        metadata: object | None = None,
        location: Location | None = None,
    ):
        """
        Log a message, with optional metadata and location information.
        The metadata must be exportable to JSON using Pydantic.

        Messages whose severity is below `self.log_level` are dropped.
        """
        if not log_level_greater_or_equal(level, self.log_level):
            return
        # Capture the timestamp before possibly waiting on the lock.
        time = datetime.now()
        with self.lock:
            short_location = None
            if location is not None:
                short_location = self.trace.convert_location(location)
            self.messages.append(
                LogMessage(
                    message=message,
                    level=level,
                    time=time,
                    metadata=metadata,
                    location=short_location,
                )
            )

    def export_log(
        self, *, remove_timing_info: bool = False
    ) -> Iterable[ExportableLogMessage]:
        """
        Export the log into an easily serializable format.

        Note that this is a generator that holds the lock while being
        consumed.
        """
        with self.lock:
            for m in self.messages:
                node = None
                space = None
                if (loc := m.location) is not None:
                    node = loc.node.id
                    if loc.space is not None:
                        space = pprint.space_ref(loc.space)
                yield ExportableLogMessage(
                    message=m.message,
                    level=m.level,
                    time=m.time if not remove_timing_info else None,
                    node=node,
                    space=space,
                    metadata=pydantic_dump(object, m.metadata),
                )

    def export_trace(self) -> ExportableTrace:
        """
        Export the trace into an easily serializable format.
        """
        with self.lock:
            return self.trace.export()

__init__

__init__(log_level: LogLevel = 'info')

Parameters:

Name Type Description Default
log_level LogLevel

The minimum severity level of messages to log.

'info'
Source code in src/delphyne/core/traces.py
586
587
588
589
590
591
592
593
594
595
596
597
def __init__(self, log_level: LogLevel = "info"):
    """
    Initialize an empty trace and an empty message log.

    Parameters:
        log_level: The minimum severity level of messages to log.
    """
    self.trace = Trace()
    self.messages: list[LogMessage] = []
    self.log_level: LogLevel = log_level

    # Different threads may be logging information or appending to
    # the trace in parallel.
    self.lock = threading.RLock()

global_node_id

global_node_id(node: GlobalNodePath) -> NodeId

Ensure that a node at a given reference is present in the trace and return the corresponding node identifier.

Source code in src/delphyne/core/traces.py
599
600
601
602
603
604
605
def global_node_id(self, node: refs.GlobalNodePath) -> refs.NodeId:
    """
    Ensure that a node at a given reference is present in the trace
    and return the corresponding node identifier.
    """
    with self.lock:
        return self.trace.convert_global_node_path(node)

trace_node

trace_node(node: GlobalNodePath) -> None

Ensure that a node at a given reference is present in the trace.

Unlike global_node_id, it does not return the associated node identifier.

See tracer_hook for registering a hook that automatically calls this method on all encountered nodes.

Source code in src/delphyne/core/traces.py
607
608
609
610
611
612
613
614
615
616
def trace_node(self, node: refs.GlobalNodePath) -> None:
    """
    Ensure that a node at a given reference is present in the trace.

    Unlike `global_node_id`, this method does not return the
    associated node identifier.

    See `tracer_hook` for registering a hook that automatically
    calls this method on all encountered nodes.
    """
    self.global_node_id(node)

trace_query

trace_query(query: AttachedQuery[Any]) -> None

Ensure that a query at a given reference is present in the trace, even if no answer is provided for it.

Source code in src/delphyne/core/traces.py
618
619
620
621
622
623
624
625
def trace_query(self, query: AttachedQuery[Any]) -> None:
    """
    Ensure that a query at a given reference is present in the
    trace, even if no answer is provided for it.
    """
    # Serialization is performed outside of the critical section.
    serialized = (query.query.query_name(), query.query.serialize_args())
    with self.lock:
        self.trace.register_query(query.ref, serialized)

trace_answer

trace_answer(space: GlobalSpacePath, answer: Answer) -> None

Ensure that a given query answer is present in the trace, even if it is not used to reach a node.

Source code in src/delphyne/core/traces.py
627
628
629
630
631
632
633
634
635
def trace_answer(
    self, space: refs.GlobalSpacePath, answer: refs.Answer
) -> None:
    """
    Ensure that a given query answer is present in the trace, even
    if it is not used to reach a node.
    """
    with self.lock:
        self.trace.convert_answer_ref((space, answer))

log

log(
    level: LogLevel,
    message: str,
    metadata: object | None = None,
    location: Location | None = None,
)

Log a message, with optional metadata and location information. The metadata must be exportable to JSON using Pydantic.

Source code in src/delphyne/core/traces.py
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
def log(
    self,
    level: LogLevel,
    message: str,
    metadata: object | None = None,
    location: Location | None = None,
):
    """
    Log a message, with optional metadata and location information.
    The metadata must be exportable to JSON using Pydantic.

    Messages whose severity is below `self.log_level` are dropped.
    """
    if not log_level_greater_or_equal(level, self.log_level):
        return
    # Capture the timestamp before possibly waiting on the lock.
    time = datetime.now()
    with self.lock:
        short_location = None
        if location is not None:
            short_location = self.trace.convert_location(location)
        self.messages.append(
            LogMessage(
                message=message,
                level=level,
                time=time,
                metadata=metadata,
                location=short_location,
            )
        )

export_log

export_log(*, remove_timing_info: bool = False) -> Iterable[ExportableLogMessage]

Export the log into an easily serializable format.

Source code in src/delphyne/core/traces.py
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def export_log(
    self, *, remove_timing_info: bool = False
) -> Iterable[ExportableLogMessage]:
    """
    Export the log into an easily serializable format.

    Note that this is a generator that holds the lock while being
    consumed.
    """
    with self.lock:
        for m in self.messages:
            node = None
            space = None
            if (loc := m.location) is not None:
                node = loc.node.id
                if loc.space is not None:
                    space = pprint.space_ref(loc.space)
            yield ExportableLogMessage(
                message=m.message,
                level=m.level,
                time=m.time if not remove_timing_info else None,
                node=node,
                space=space,
                metadata=pydantic_dump(object, m.metadata),
            )

export_trace

export_trace() -> ExportableTrace

Export the trace into an easily serializable format.

Source code in src/delphyne/core/traces.py
688
689
690
691
692
693
def export_trace(self) -> ExportableTrace:
    """
    Export the trace into an easily serializable format.

    Thread-safe: the export is performed while holding the lock.
    """
    with self.lock:
        return self.trace.export()