Skip to content

Policy Environments

Main Class

PolicyEnv

The global environment accessible to policies.

It can be used for:

  • Fetching prompts, data, and examples.
  • Caching LLM requests.
  • Tracing nodes, queries, answers, and logging information.

Attributes:

Name Type Description
cache

The (optional) request cache.

data_manager

The data manager.

templates

The prompt templates manager.

tracer

The tracer, which can also be used for logging.

examples

The example database.

log_long_computations

See constructor.

random

A random number generator.

Source code in src/delphyne/stdlib/environments.py
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
class PolicyEnv:
    """
    The global environment accessible to policies.

    It can be used for:

    - Fetching prompts, data, and examples.
    - Caching LLM requests.
    - Tracing nodes, queries, answers, and logging information.

    Attributes:
        cache: The (optional) request cache.
        data_manager: The data manager.
        templates: The prompt templates manager.
        tracer: The tracer, which can also be used for logging.
        examples: The example database.
        log_long_computations: See constructor.
        random: A random number generator.
    """

    def __init__(
        self,
        *,
        object_loader: ObjectLoader | None = None,
        prompt_dirs: Sequence[Path] = (),
        demonstration_files: Sequence[Path] = (),
        data_dirs: Sequence[Path] = (),
        cache: md.LLMCache | None = None,
        embeddings_cache: em.EmbeddingsCache | None = None,
        global_embeddings_cache_file: Path | None = None,
        override_answers: dp.AnswerDatabase | None = None,
        log_level: dp.LogLevel = "info",
        log_long_computations: tuple[dp.LogLevel, float] | None = None,
        random_seed: int = 0,
    ):
        """
        Args:
            object_loader: An object loader. This is useful in
                particular for loading query objects from their
                serialized representation.
            prompt_dirs: A sequence of directories where Jinja prompt
                templates can be found.
            demonstration_files: A sequence of paths to demonstration
                files (with or without extension `.demo.yaml`), to
                create an example database from.
            data_dirs: A sequence of directories where data files can be
                found.
            cache: A request cache, or `None` to disable caching.
            embeddings_cache: An embeddings cache, or `None` to disable
                embeddings caching.
            global_embeddings_cache_file: Global cache file that stores
                common embeddings (e.g. embeddings of examples).
            override_answers: If provided, a database of answers that
                must be used to override LLM calls whenever possible.
                Individual prompting policies such as `few_shot` are
                responsible for consulting this global database using
                the `overriden_answer` method.
            log_level: The minimum log level to record. Messages with a
                lower level will be ignored.
            log_long_computations: if set, log computations taking more
                than the given number of seconds at the given severity
                level. This setting can be locally overridden by
                `elim_compute`.
            random_seed: The seed with which to initialize the random
                number generator.
        """
        # Order matters for the first three assignments: the templates
        # manager needs the data manager, and the example database
        # needs the templates manager.
        self.data_manager = DataManager(data_dirs)
        self.templates = TemplatesManager(prompt_dirs, self.data_manager)
        self.examples = ExampleDatabase(
            global_embeddings_cache_file=global_embeddings_cache_file,
            templates_manager=self.templates,
            object_loader=object_loader,
        )
        self.object_loader = object_loader
        self.tracer = dp.Tracer(log_level=log_level)
        self.log_long_computations = log_long_computations
        self.cache = cache
        self.embeddings_cache = embeddings_cache
        self.override_answers = override_answers
        self.random = random.Random(random_seed)
        # Populate the example database from the given demonstrations.
        for path in demonstration_files:
            for demo in loaders.load_demo_file(path):
                self.examples.add_demonstration(demo)

    def overriden_answer(
        self, query: dp.AbstractQuery[Any]
    ) -> dp.Answer | None:
        """
        Attempt to fetch an answer from the override database and return
        it if it exists, while logging the event.

        Returns `None` if no override database was provided or if it
        contains no answer for the given query.
        """

        if self.override_answers is None:
            return None
        serialized_query = dp.SerializedQuery.make(query)
        ret = self.override_answers.fetch(serialized_query)
        if ret is not None:
            meta = {
                "source": ad.pp_located_answer_source(ret.source),
                "query_name": query.query_name(),
                "query_args": query.serialize_args(),
                "answer": ret.answer,
            }
            self.info("llm_override", meta)
            return ret.answer
        # Implicitly returns None when no matching answer is found.

    def log(
        self,
        level: dp.LogLevel,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
        related: Sequence[dp.LogMessageId | None] = (),
    ) -> dp.LogMessageId | None:
        """
        Log a message.

        Arguments:
            level: The severity level of the message.
            message: The message to log.
            metadata: Additional metadata to log, as a dictionary of JSON
                values.
            loc: Tree or attached query that the message is about, if
                relevant.
            related: Identifiers of related log messages, forwarded to
                the tracer.
        """
        location = loc.ref if loc is not None else None
        return self.tracer.log(
            level, message, metadata, location=location, related=related
        )

    def trace(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
        related: Sequence[dp.LogMessageId | None] = (),
    ) -> dp.LogMessageId | None:
        """
        Log a message with "trace" severity level.

        See `log` method for argument documentation.
        """
        return self.log("trace", message, metadata, loc=loc, related=related)

    def debug(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
        related: Sequence[dp.LogMessageId | None] = (),
    ) -> dp.LogMessageId | None:
        """
        Log a message with "debug" severity level.

        See `log` method for argument documentation.
        """
        return self.log("debug", message, metadata, loc=loc, related=related)

    def info(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
        related: Sequence[dp.LogMessageId | None] = (),
    ) -> dp.LogMessageId | None:
        """
        Log a message with "info" severity level.

        See `log` method for argument documentation.
        """
        return self.log("info", message, metadata, loc=loc, related=related)

    def warn(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
        related: Sequence[dp.LogMessageId | None] = (),
    ) -> dp.LogMessageId | None:
        """
        Log a message with "warn" severity level.

        See `log` method for argument documentation.
        """
        return self.log("warn", message, metadata, loc=loc, related=related)

    def error(
        self,
        message: str,
        metadata: object | None = None,
        *,
        loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
        related: Sequence[dp.LogMessageId | None] = (),
    ) -> dp.LogMessageId | None:
        """
        Log a message with "error" severity level.

        See `log` method for argument documentation.
        """
        return self.log("error", message, metadata, loc=loc, related=related)

__init__

__init__(
    *,
    object_loader: ObjectLoader | None = None,
    prompt_dirs: Sequence[Path] = (),
    demonstration_files: Sequence[Path] = (),
    data_dirs: Sequence[Path] = (),
    cache: LLMCache | None = None,
    embeddings_cache: EmbeddingsCache | None = None,
    global_embeddings_cache_file: Path | None = None,
    override_answers: AnswerDatabase | None = None,
    log_level: LogLevel = "info",
    log_long_computations: tuple[LogLevel, float] | None = None,
    random_seed: int = 0,
)

Parameters:

Name Type Description Default
object_loader ObjectLoader | None

An object loader. This is useful in particular for loading query objects from their serialized representation.

None
prompt_dirs Sequence[Path]

A sequence of directories where Jinja prompt templates can be found.

()
demonstration_files Sequence[Path]

A sequence of paths to demonstration files (with or without extension .demo.yaml), to create an example database from.

()
data_dirs Sequence[Path]

A sequence of directories where data files can be found.

()
cache LLMCache | None

A request cache, or None to disable caching.

None
embeddings_cache EmbeddingsCache | None

An embeddings cache, or None to disable embeddings caching.

None
global_embeddings_cache_file Path | None

Global cache file that stores common embeddings (e.g. embeddings of examples).

None
override_answers AnswerDatabase | None

If provided, a database of answers that must be used to override LLM calls whenever possible. Individual prompting policies such as few_shot are responsible for consulting this global database using the overriden_answer method.

None
log_level LogLevel

The minimum log level to record. Messages with a lower level will be ignored.

'info'
log_long_computations tuple[LogLevel, float] | None

if set, log computations taking more than the given number of seconds at the given severity level. This setting can be locally overridden by elim_compute.

None
random_seed int

The seed with which to initialize the random number generator.

0
Source code in src/delphyne/stdlib/environments.py
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
def __init__(
    self,
    *,
    object_loader: ObjectLoader | None = None,
    prompt_dirs: Sequence[Path] = (),
    demonstration_files: Sequence[Path] = (),
    data_dirs: Sequence[Path] = (),
    cache: md.LLMCache | None = None,
    embeddings_cache: em.EmbeddingsCache | None = None,
    global_embeddings_cache_file: Path | None = None,
    override_answers: dp.AnswerDatabase | None = None,
    log_level: dp.LogLevel = "info",
    log_long_computations: tuple[dp.LogLevel, float] | None = None,
    random_seed: int = 0,
):
    """
    Args:
        object_loader: An object loader. This is useful in
            particular for loading query objects from their
            serialized representation.
        prompt_dirs: A sequence of directories where Jinja prompt
            templates can be found.
        demonstration_files: A sequence of paths to demonstration
            files (with or without extension `.demo.yaml`), to
            create an example database from.
        data_dirs: A sequence of directories where data files can be
            found.
        cache: A request cache, or `None` to disable caching.
        embeddings_cache: An embeddings cache, or `None` to disable
            embeddings caching.
        global_embeddings_cache_file: Global cache file that stores
            common embeddings (e.g. embeddings of examples).
        override_answers: If provided, a database of answers that
            must be used to override LLM calls whenever possible.
            Individual prompting policies such as `few_shot` are
            responsible for consulting this global database using
            the `overriden_answer` method.
        log_level: The minimum log level to record. Messages with a
            lower level will be ignored.
        log_long_computations: if set, log computations taking more
            than the given number of seconds at the given severity
            level. This setting can be locally overridden by
            `elim_compute`.
        random_seed: The seed with which to initialize the random
            number generator.
    """
    # Order matters: templates depend on the data manager, and the
    # example database depends on the templates manager.
    self.data_manager = DataManager(data_dirs)
    self.templates = TemplatesManager(prompt_dirs, self.data_manager)
    self.examples = ExampleDatabase(
        global_embeddings_cache_file=global_embeddings_cache_file,
        templates_manager=self.templates,
        object_loader=object_loader,
    )
    self.object_loader = object_loader
    self.tracer = dp.Tracer(log_level=log_level)
    self.log_long_computations = log_long_computations
    self.cache = cache
    self.embeddings_cache = embeddings_cache
    self.override_answers = override_answers
    self.random = random.Random(random_seed)
    # Populate the example database from the given demonstrations.
    for path in demonstration_files:
        for demo in loaders.load_demo_file(path):
            self.examples.add_demonstration(demo)

overriden_answer

overriden_answer(query: AbstractQuery[Any]) -> Answer | None

Attempt to fetch an answer from the override database and return it if it exists, while logging the event.

Source code in src/delphyne/stdlib/environments.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
def overriden_answer(
    self, query: dp.AbstractQuery[Any]
) -> dp.Answer | None:
    """
    Look up an answer for `query` in the override database.

    If a matching answer is found, the event is logged (at "info"
    level) and the answer is returned. Otherwise, or when no override
    database was configured, `None` is returned.
    """
    database = self.override_answers
    if database is None:
        return None
    found = database.fetch(dp.SerializedQuery.make(query))
    if found is None:
        return None
    self.info(
        "llm_override",
        {
            "source": ad.pp_located_answer_source(found.source),
            "query_name": query.query_name(),
            "query_args": query.serialize_args(),
            "answer": found.answer,
        },
    )
    return found.answer

log

log(
    level: LogLevel,
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
    related: Sequence[LogMessageId | None] = (),
) -> LogMessageId | None

Log a message.

Parameters:

Name Type Description Default
level LogLevel

The severity level of the message.

required
message str

The message to log.

required
metadata object | None

Additional metadata to log, as a dictionary of JSON values.

None
loc Tree[Any, Any, Any] | AttachedQuery[Any] | None

Tree or attached query that the message is about, if relevant.

None
Source code in src/delphyne/stdlib/environments.py
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
def log(
    self,
    level: dp.LogLevel,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    related: Sequence[dp.LogMessageId | None] = (),
) -> dp.LogMessageId | None:
    """
    Log a message.

    Arguments:
        level: The severity level of the message.
        message: The message to log.
        metadata: Additional metadata to log, as a dictionary of JSON
            values.
        loc: Tree or attached query that the message is about, if
            relevant.
        related: Identifiers of related log messages, forwarded to
            the tracer.

    Returns the identifier of the logged message, as produced by the
    tracer (or `None` if the tracer does not produce one).
    """
    # Only the reference of the tree/query is recorded, not the
    # object itself.
    location = loc.ref if loc is not None else None
    return self.tracer.log(
        level, message, metadata, location=location, related=related
    )

trace

trace(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
    related: Sequence[LogMessageId | None] = (),
) -> LogMessageId | None

Log a message with "trace" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
643
644
645
646
647
648
649
650
651
652
653
654
655
656
def trace(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    related: Sequence[dp.LogMessageId | None] = (),
) -> dp.LogMessageId | None:
    """
    Log a message with "trace" severity level.

    Thin wrapper around `log`; see `log` for argument documentation.
    """
    return self.log("trace", message, metadata, loc=loc, related=related)

debug

debug(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
    related: Sequence[LogMessageId | None] = (),
) -> LogMessageId | None

Log a message with "debug" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
658
659
660
661
662
663
664
665
666
667
668
669
670
671
def debug(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    related: Sequence[dp.LogMessageId | None] = (),
) -> dp.LogMessageId | None:
    """
    Log a message with "debug" severity level.

    Thin wrapper around `log`; see `log` for argument documentation.
    """
    return self.log("debug", message, metadata, loc=loc, related=related)

info

info(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
    related: Sequence[LogMessageId | None] = (),
) -> LogMessageId | None

Log a message with "info" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def info(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    related: Sequence[dp.LogMessageId | None] = (),
) -> dp.LogMessageId | None:
    """
    Log a message with "info" severity level.

    Thin wrapper around `log`; see `log` for argument documentation.
    """
    return self.log("info", message, metadata, loc=loc, related=related)

warn

warn(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
    related: Sequence[LogMessageId | None] = (),
) -> LogMessageId | None

Log a message with "warn" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
688
689
690
691
692
693
694
695
696
697
698
699
700
701
def warn(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    related: Sequence[dp.LogMessageId | None] = (),
) -> dp.LogMessageId | None:
    """
    Log a message with "warn" severity level.

    Thin wrapper around `log`; see `log` for argument documentation.
    """
    return self.log("warn", message, metadata, loc=loc, related=related)

error

error(
    message: str,
    metadata: object | None = None,
    *,
    loc: Tree[Any, Any, Any] | AttachedQuery[Any] | None = None,
    related: Sequence[LogMessageId | None] = (),
) -> LogMessageId | None

Log a message with "error" severity level.

See log method.

Source code in src/delphyne/stdlib/environments.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
def error(
    self,
    message: str,
    metadata: object | None = None,
    *,
    loc: dp.Tree[Any, Any, Any] | dp.AttachedQuery[Any] | None = None,
    related: Sequence[dp.LogMessageId | None] = (),
) -> dp.LogMessageId | None:
    """
    Log a message with "error" severity level.

    Thin wrapper around `log`; see `log` for argument documentation.
    """
    return self.log("error", message, metadata, loc=loc, related=related)

InvalidDemoFile dataclass

Bases: Exception

Exception raised when a demonstration file could not be parsed.

Source code in src/delphyne/stdlib/answer_loaders.py
120
121
122
123
124
125
126
127
@dataclass
class InvalidDemoFile(Exception):
    """
    Exception raised when a demonstration file could not be parsed.

    Attributes:
        file: Path of the demonstration file that failed to parse.
        exn: The underlying exception raised during parsing.
    """

    file: Path
    exn: Exception

Example Database

Example dataclass

An example, usable for few-shot prompting.

Attributes:

Name Type Description
query AbstractQuery[Any]

The corresponding query.

answer Answer

The answer to the query.

tags Sequence[str]

A sequence of tags associated with the example, which policies can use to select appropriate examples.

Source code in src/delphyne/stdlib/environments.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@dataclass(kw_only=True)
class Example:
    """
    An example, usable for few-shot prompting.

    Attributes:
        query: The corresponding query.
        answer: The answer to the query.
        tags: A sequence of tags associated with the example, which
            policies can use to select appropriate examples.
        meta: Additional metadata associated with the example (taken
            from the demonstration answer it originates from).
    """

    query: dp.AbstractQuery[Any]
    answer: dp.Answer
    tags: Sequence[str]
    meta: dict[str, Any]

ExampleDatabase

A simple example database.

Source code in src/delphyne/stdlib/environments.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
class ExampleDatabase:
    """
    A simple example database.

    Examples are indexed by query name. Query and example embeddings,
    along with example similarity matrices, are computed lazily and
    cached in memory (and persisted in a global cache file).
    """

    # TODO: add provenance info for better error messages.

    def __init__(
        self,
        *,
        object_loader: ObjectLoader | None,
        global_embeddings_cache_file: Path | None = None,
        templates_manager: dp.AbstractTemplatesManager | None = None,
    ):
        """
        Arguments:
            object_loader: An object loader for loading query objects.
            global_embeddings_cache_file: Global cache file that stores
                common embeddings (e.g. embeddings of examples).
            templates_manager: A templates manager, necessary when using
                embeddings.
        """
        self._embeddings_cache_file = global_embeddings_cache_file
        self.object_loader = object_loader
        self._templates_manager = templates_manager
        self._examples: dict[_QueryName, list[Example]] = defaultdict(list)

        # For both `_query_embeddings` and `_example_embeddings`, we
        # store `None` if the embeddings were computed but there were
        # zero examples. This is the equivalent of a numpy array with
        # zero lines.

        # Embeddings for queries, classified by type
        self._query_embeddings: dict[
            _EmbeddingsBucket, NDArray[np.float32] | None
        ] = {}
        # Embeddings for full examples, classified by type
        self._example_embeddings: dict[
            _EmbeddingsBucket, NDArray[np.float32] | None
        ] = {}
        # Similarity matrix
        self._example_similarity_matrix: dict[
            _EmbeddingsBucket, NDArray[np.float32] | None
        ] = {}

    def add_demonstration(self, demo: dp.Demo):
        """
        Add all examples from a demonstration to the database.
        """
        if isinstance(demo, dp.QueryDemo):
            self._add_query_demonstration(demo)
        else:
            assert isinstance(demo, dp.StrategyDemo)
            for q in demo.queries:
                self._add_query_demonstration(q)
        # All cached embeddings and similarity matrices are invalidated
        # since new examples were added (they are lazily recomputed on
        # demand). Previously, only `_query_embeddings` was cleared,
        # which could leave `_example_embeddings` and
        # `_example_similarity_matrix` stale and out of sync with the
        # example lists.
        self._query_embeddings.clear()
        self._example_embeddings.clear()
        self._example_similarity_matrix.clear()

    def _add_query_demonstration(self, demo: dp.QueryDemo):
        """
        Add all examples from a standalone query demonstration to the
        database.
        """
        if not demo.answers:
            return
        if (ex := demo.answers[0].example) is not None and not ex:
            # If the user explicitly asked not to include the example.
            # TODO: What if the user asked to include several answers?
            # Right now, we only allow the first one to be added.
            return
        demo_answer = demo.answers[0]
        answer = dm.translate_answer(demo_answer)
        if self.object_loader is None:
            raise ValueError(
                "ExampleDatabase was not provided with an ObjectLoader."
            )
        query = self.object_loader.load_query(demo.query, demo.args)
        example = Example(
            query=query,
            answer=answer,
            tags=demo_answer.tags,
            meta=demo_answer.meta or {},
        )
        self._examples[demo.query].append(example)

    def examples_for(self, query_name: str) -> Sequence[Example]:
        """
        Return all examples registered for a given query name (an empty
        sequence if there are none).
        """
        return self._examples[query_name]

    ### Useful accessors

    @property
    def templates_manager(self) -> dp.AbstractTemplatesManager:
        # Raises if no templates manager was passed to the constructor.
        if self._templates_manager is None:
            raise ValueError(
                "ExampleDatabase.templates_manager was not provided."
            )
        return self._templates_manager

    @property
    def global_embeddings_cache_file(self) -> Path:
        # Raises if no cache file was passed to the constructor.
        if self._embeddings_cache_file is None:
            raise ValueError(
                "ExampleDatabase.global_embeddings_cache_file was not provided."
            )
        return self._embeddings_cache_file

    ### Loading embeddings

    def query_embedding_text(self, query: dp.AbstractQuery[Any]) -> str:
        # Text used to embed a query.
        return _query_embedding_text(self.templates_manager, query)

    def example_embedding_text(self, example: Example) -> str:
        # Text used to embed a full example (query plus answer).
        return _example_embedding_text(self.templates_manager, example)

    def fetch_query_embeddings(
        self,
        name: _QueryName,
        model: _EmbeddingModelName,
    ) -> NDArray[np.float32] | None:
        """
        Obtain the query embeddings for all examples of a given type.

        If the embeddings are not loaded yet, they are loaded from
        cache or computed on the fly. Returns `None` when there are no
        examples for the given query name.
        """
        key = (name, model)
        if key not in self._query_embeddings:
            embs = self._load_embeddings(
                model,
                self.examples_for(name),
                lambda e: self.query_embedding_text(e.query),
            )
            self._query_embeddings[key] = embs
        return self._query_embeddings[key]

    def fetch_example_embeddings(
        self,
        name: _QueryName,
        model: _EmbeddingModelName,
    ) -> NDArray[np.float32] | None:
        """
        Obtain the embeddings of all examples of a given type.

        If the embeddings are not loaded yet, they are loaded from
        cache or computed on the fly. Returns `None` when there are no
        examples for the given query name.
        """
        key = (name, model)
        if key not in self._example_embeddings:
            embs = self._load_embeddings(
                model,
                self.examples_for(name),
                self.example_embedding_text,
            )
            self._example_embeddings[key] = embs
        return self._example_embeddings[key]

    def fetch_example_similarity_matrix(
        self,
        name: _QueryName,
        model: _EmbeddingModelName,
    ) -> NDArray[np.float32] | None:
        """
        Obtain the similarity matrix of all examples of a given type.

        If the similarity matrix is not loaded yet, it is computed on
        the fly from the example embeddings. Returns `None` when there
        are no examples for the given query name.
        """
        key = (name, model)
        if key not in self._example_similarity_matrix:
            embs = self.fetch_example_embeddings(name, model)
            if embs is None:
                sim_matrix = None
            else:
                # Compute cosine similarity matrix. Norms are clipped
                # away from zero to avoid division by zero.
                norms = np.linalg.norm(embs, axis=1, keepdims=True)
                normalized_embs = embs / np.clip(
                    norms, a_min=1e-10, a_max=None
                )
                sim_matrix = normalized_embs @ normalized_embs.T
            self._example_similarity_matrix[key] = sim_matrix
        return self._example_similarity_matrix[key]

    def _load_embeddings(
        self,
        model_name: _EmbeddingModelName,
        examples: Sequence[Example],
        embed_fun: Callable[[Example], str],
    ) -> NDArray[np.float32] | None:
        """
        Get embeddings for all examples of a given query type.

        This method takes a global file lock so as to avoid concurrent
        accesses to the embeddings cache file.
        """

        import filelock

        # Note: on Unix systems, the lockfile may not be automatically
        # deleted. See https://stackoverflow.com/questions/58098634/

        model = em.standard_openai_embedding_model(model_name)
        to_embed = [embed_fun(e) for e in examples]
        cache_file = self.global_embeddings_cache_file
        lock_file = _embeddings_cache_lockfile(
            self.global_embeddings_cache_file
        )
        with filelock.FileLock(lock_file):
            with em.load_embeddings_cache(cache_file, "read_write") as cache:
                res = model.embed(to_embed, cache)
        if not res:
            return None
        embeddings = np.array([r.embedding for r in res], dtype=np.float32)
        # We ignore spending for the global cache.
        return embeddings

__init__

__init__(
    *,
    object_loader: ObjectLoader | None,
    global_embeddings_cache_file: Path | None = None,
    templates_manager: AbstractTemplatesManager | None = None,
)

Parameters:

Name Type Description Default
object_loader ObjectLoader | None

An object loader for loading query objects.

required
global_embeddings_cache_file Path | None

Global cache file that stores common embeddings (e.g. embeddings of examples).

None
templates_manager AbstractTemplatesManager | None

A templates manager, necessary when using embeddings.

None
Source code in src/delphyne/stdlib/environments.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def __init__(
    self,
    *,
    object_loader: ObjectLoader | None,
    global_embeddings_cache_file: Path | None = None,
    templates_manager: dp.AbstractTemplatesManager | None = None,
):
    """
    Arguments:
        object_loader: An object loader for loading query objects.
        global_embeddings_cache_file: Global cache file that stores
            common embeddings (e.g. embeddings of examples).
        templates_manager: A templates manager, necessary when using
            embeddings.
    """
    # NOTE(review): stored under a different name than the parameter;
    # `_load_embeddings` reads `self.global_embeddings_cache_file`, so
    # presumably a property with that name exposes this — confirm.
    self._embeddings_cache_file = global_embeddings_cache_file
    self.object_loader = object_loader
    self._templates_manager = templates_manager
    # Examples grouped by query name; unknown names yield empty lists.
    self._examples: dict[_QueryName, list[Example]] = defaultdict(list)

    # For both `_query_embeddings` and `_example_embeddings`, we
    # store `None` if the embeddings were computed but there were
    # zero examples. This is the equivalent of a numpy array with
    # zero lines.

    # Embeddings for queries, classified by type
    self._query_embeddings: dict[
        _EmbeddingsBucket, NDArray[np.float32] | None
    ] = {}
    # Embeddings for full examples, classified by type
    self._example_embeddings: dict[
        _EmbeddingsBucket, NDArray[np.float32] | None
    ] = {}
    # Similarity matrix
    self._example_similarity_matrix: dict[
        _EmbeddingsBucket, NDArray[np.float32] | None
    ] = {}

add_demonstration

add_demonstration(demo: Demo)

Add all examples from a demonstration to the database.

Source code in src/delphyne/stdlib/environments.py
175
176
177
178
179
180
181
182
183
184
185
186
def add_demonstration(self, demo: dp.Demo):
    """
    Add all examples from a demonstration to the database.

    Accepts either a single query demonstration or a strategy
    demonstration, in which case all of its query demonstrations
    are added.
    """
    if isinstance(demo, dp.QueryDemo):
        self._add_query_demonstration(demo)
    else:
        assert isinstance(demo, dp.StrategyDemo)
        for q in demo.queries:
            self._add_query_demonstration(q)
    # All embedding-derived caches are invalidated since new examples
    # were added. Clearing only the query-embedding cache (as before)
    # would leave stale example embeddings and similarity matrices,
    # whose rows must stay in sync with the example lists.
    self._query_embeddings.clear()
    self._example_embeddings.clear()
    self._example_similarity_matrix.clear()

examples_for

examples_for(query_name: str) -> Sequence[Example]

Obtain all examples registered for a given query name.

Source code in src/delphyne/stdlib/environments.py
215
216
217
218
219
def examples_for(self, query_name: str) -> Sequence[Example]:
    """
    Return all examples registered for a given query name.

    Returns an empty sequence when no example is registered under
    `query_name` (the underlying mapping is a `defaultdict`).
    """
    return self._examples[query_name]

fetch_query_embeddings

fetch_query_embeddings(
    name: _QueryName, model: _EmbeddingModelName
) -> NDArray[float32] | None

Obtain the query embeddings for all examples of a given type.

If the embeddings are not loaded yet, they are loaded from cache or computed on the fly.

Source code in src/delphyne/stdlib/environments.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def fetch_query_embeddings(
    self,
    name: _QueryName,
    model: _EmbeddingModelName,
) -> NDArray[np.float32] | None:
    """
    Obtain the query embeddings for all examples of a given type.

    Results are memoized per (query name, model) pair: the first
    request loads the embeddings from cache or computes them on the
    fly, and later requests reuse the stored value.
    """
    bucket = (name, model)
    cache = self._query_embeddings
    if bucket not in cache:
        cache[bucket] = self._load_embeddings(
            model,
            self.examples_for(name),
            lambda e: self.query_embedding_text(e.query),
        )
    return cache[bucket]

fetch_example_embeddings

fetch_example_embeddings(
    name: _QueryName, model: _EmbeddingModelName
) -> NDArray[float32] | None

Obtain the embeddings of all examples of a given type.

If the embeddings are not loaded yet, they are loaded from cache or computed on the fly.

Source code in src/delphyne/stdlib/environments.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def fetch_example_embeddings(
    self,
    name: _QueryName,
    model: _EmbeddingModelName,
) -> NDArray[np.float32] | None:
    """
    Obtain the embeddings of all examples of a given type.

    Memoized per (query name, model) pair: embeddings are loaded from
    cache or computed on the fly the first time and reused afterwards.
    """
    bucket = (name, model)
    if bucket in self._example_embeddings:
        return self._example_embeddings[bucket]
    embeddings = self._load_embeddings(
        model,
        self.examples_for(name),
        self.example_embedding_text,
    )
    self._example_embeddings[bucket] = embeddings
    return embeddings

fetch_example_similarity_matrix

fetch_example_similarity_matrix(
    name: _QueryName, model: _EmbeddingModelName
) -> NDArray[float32] | None

Obtain the similarity matrix of all examples of a given type.

If the similarity matrix is not loaded yet, it is computed on the fly from the example embeddings.

Source code in src/delphyne/stdlib/environments.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def fetch_example_similarity_matrix(
    self,
    name: _QueryName,
    model: _EmbeddingModelName,
) -> NDArray[np.float32] | None:
    """
    Obtain the similarity matrix of all examples of a given type.

    The matrix of pairwise cosine similarities is computed on first
    request from the example embeddings and memoized afterwards.
    """
    bucket = (name, model)
    if bucket not in self._example_similarity_matrix:
        embs = self.fetch_example_embeddings(name, model)
        if embs is None:
            self._example_similarity_matrix[bucket] = None
        else:
            # Cosine similarity: normalize each row to unit length,
            # then take the Gram matrix. Norms are clipped away from
            # zero so degenerate (all-zero) embeddings do not cause a
            # division by zero.
            norms = np.linalg.norm(embs, axis=1, keepdims=True)
            unit = embs / np.clip(norms, a_min=1e-10, a_max=None)
            self._example_similarity_matrix[bucket] = unit @ unit.T
    return self._example_similarity_matrix[bucket]

SerializedQuery dataclass

A hashable representation of a query.

Attributes:

Name Type Description
name str

The name of the query.

args_str str

The serialized arguments of the query, as a canonical JSON string. Object keys are sorted so that equality is defined modulo key order.

Source code in src/delphyne/core/answer_databases.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@dataclass(frozen=True)
class SerializedQuery:
    """
    A hashable representation of a query.

    Attributes:
        name: The name of the query.
        args: The serialized arguments of the query, as a canonical JSON
            string. Object keys are sorted so that equality is defined
            modulo key order.
    """

    name: str
    args_str: str

    @staticmethod
    def _dump_json(obj: Any) -> str:
        return json.dumps(obj, sort_keys=True)

    @staticmethod
    def make(query: AbstractQuery[Any]) -> "SerializedQuery":
        args = SerializedQuery._dump_json(query.serialize_args())
        return SerializedQuery(query.query_name(), args)

    @staticmethod
    def from_json(name: str, args: dict[str, Any]) -> "SerializedQuery":
        return SerializedQuery(name, SerializedQuery._dump_json(args))

    @property
    def args_dict(self):
        return json.loads(self.args_str)

    def parse[T: AbstractQuery[Any]](self, type: type[T]) -> T:
        return type.parse_instance(self.args_dict)

Data and Templates Managers

DataManager

Utility class for loading and accessing external data.

Attributes:

Name Type Description
data

A dictionary containing all loaded data files. Each file corresponds to a key in the dictionary (stripped of the extension).

Source code in src/delphyne/stdlib/environments.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class DataManager:
    """
    Utility class for loading and accessing external data.

    Attributes:
        data: A dictionary holding the content of every loaded data
            file, keyed by file name with the extension stripped.
    """

    def __init__(self, data_dirs: Sequence[Path]):
        """
        Collect every file with extension `*.data.yaml` found in
        `data_dirs`, parse them all, and store the result in a single
        dictionary. An error is raised if two files share a name.
        """
        self.data = _load_data(data_dirs)

__init__

__init__(data_dirs: Sequence[Path])

Find all files with extension *.data.yaml in the data_dirs, parse them and save everything in a big dict. If two files have the same name, raise an error.

Source code in src/delphyne/stdlib/environments.py
44
45
46
47
48
49
50
def __init__(self, data_dirs: Sequence[Path]):
    """
    Find all files with extension `*.data.yaml` in the `data_dirs`,
    parse them and save everything in a big dict. If two files have
    the same name, raise an error.
    """
    # All parsed files end up in a single flat dictionary, keyed by
    # file name (extension stripped).
    self.data = _load_data(data_dirs)

TemplatesManager

Bases: AbstractTemplatesManager

A class for managing Jinja prompt templates.

Templates are configured with the trim_blocks and lstrip_blocks options set to True (no newlines are inserted after blocks and indentation can be used within blocks without affecting the output). The keep_trailing_newline option is set to False so trailing new lines at the end of template files are ignored.

Templates are first searched in the provided prompt folders and then in the standard library (delphyne.stdlib.templates). For example, to show standard formatting instructions, you can include the following in your instance prompts:

{% include 'stdlib/format.jinja' %}

All templates automatically have access to the following global objects:

  • A yaml filter for converting an object into a YAML string.
  • A json filter for converting an object into a JSON string.
  • A fail function that takes an error message as an argument and raises an exception on Python side.
  • A data dictionary containing all loaded data files.
Source code in src/delphyne/stdlib/environments.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
class TemplatesManager(dp.AbstractTemplatesManager):
    """
    A class for managing Jinja prompt templates.

    Templates are configured with the `trim_blocks` and `lstrip_blocks`
    options set to `True` (no newlines are inserted after blocks and
    indentation can be used within blocks without affecting the output).
    The `keep_trailing_newline` option is set to `False` so trailing new
    lines at the end of template files are ignored.

    Templates are first searched in the provided prompt folders and then
    in the standard library (`delphyne.stdlib.templates`). For example,
    to show standard formatting instructions, you can include the
    following in your instance prompts:

    ```jinja
    {% include 'stdlib/format.jinja' %}
    ```

    All templates automatically have access to the following global
    objects:

    - A `yaml` filter for converting an object into a YAML string.
    - A `json` filter for converting an object into a JSON string.
    - A `fail` function that takes an error message as an argument and
      raises an exception on Python side.
    - A `data` dictionary containing all loaded data files.
    """

    def __init__(self, prompt_dirs: Sequence[Path], data_manager: DataManager):
        """
        Args:
            prompt_dirs: A sequence of directories where Jinja prompt
                templates can be found.
            data_manager: The data manager providing the loaded data
                files that templates access via the `data` global.
        """
        self.prompt_folders = prompt_dirs
        self.data_manager = data_manager
        # User-provided folders are searched first; the templates
        # bundled with `delphyne.stdlib` act as a fallback.
        loader = jinja2.ChoiceLoader(
            [
                jinja2.FileSystemLoader(self.prompt_folders),
                jinja2.PackageLoader("delphyne.stdlib"),
            ]
        )
        self.env = jinja2.Environment(
            loader=loader,
            trim_blocks=True,
            lstrip_blocks=True,
            keep_trailing_newline=False,
        )
        self.env.filters["yaml"] = dump_yaml_object
        self.env.filters["json"] = _dump_json_object
        self.env.globals["fail"] = _fail_from_template  # type: ignore

    @override
    def prompt(
        self,
        *,
        query_name: str,
        prompt_kind: Literal["system", "instance"] | str,
        template_args: dict[str, Any],
        default_template: str | None = None,
    ) -> str:
        """
        Render the prompt template associated with a query.

        Raises:
            TemplateFileMissing: if no template file exists and no
                default template is provided.
            TemplateError: if rendering fails (failing include,
                undefined variable, or syntax error).
        """
        suffix = "." + prompt_kind
        template_name = f"{query_name}{suffix}{JINJA_EXTENSION}"
        try:
            template = self.env.get_template(template_name)
        except jinja2.TemplateNotFound:
            if default_template is not None:
                template = self.env.from_string(default_template)
            else:
                raise dp.TemplateFileMissing(template_name)
        assert "data" not in template_args
        # Merge into a fresh dict rather than using `|=`, so that the
        # caller's `template_args` dictionary is not mutated.
        args = template_args | {"data": self.data_manager.data}
        try:
            return template.render(args)
        except (
            jinja2.TemplateNotFound,
            jinja2.UndefinedError,
            jinja2.TemplateSyntaxError,
        ) as e:
            raise dp.TemplateError(template_name, e)

__init__

__init__(prompt_dirs: Sequence[Path], data_manager: DataManager)

Parameters:

Name Type Description Default
prompt_dirs Sequence[Path]

A sequence of directories where Jinja prompt templates can be found.

required
data_manager DataManager

The data manager providing the loaded data files that templates access via the `data` global.

required
Source code in src/delphyne/stdlib/environments.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
def __init__(self, prompt_dirs: Sequence[Path], data_manager: DataManager):
    """
    Args:
        prompt_dirs: A sequence of directories where Jinja prompt
            templates can be found.
        data_manager: The data manager providing the loaded data
            files that templates access via the `data` global.
    """
    self.prompt_folders = prompt_dirs
    self.data_manager = data_manager
    # User-provided folders are searched before the templates bundled
    # with `delphyne.stdlib`.
    loader = jinja2.ChoiceLoader(
        [
            jinja2.FileSystemLoader(self.prompt_folders),
            jinja2.PackageLoader("delphyne.stdlib"),
        ]
    )
    self.env = jinja2.Environment(
        loader=loader,
        trim_blocks=True,
        lstrip_blocks=True,
        keep_trailing_newline=False,
    )
    self.env.filters["yaml"] = dump_yaml_object
    self.env.filters["json"] = _dump_json_object
    self.env.globals["fail"] = _fail_from_template  # type: ignore

TemplateFileMissing dataclass

Bases: Exception

Exception raised when a template file is missing.

This exception should only be raised when a top-level template file is missing. If an include statement fails within a template, a TemplateError exception should be raised instead.

Source code in src/delphyne/core/queries.py
144
145
146
147
148
149
150
151
152
153
154
@dataclass
class TemplateFileMissing(Exception):
    """
    Exception raised when a template file is missing.

    Raised only when a *top-level* template file is missing; a failing
    `include` statement within a template should raise `TemplateError`
    instead.
    """

    # Name of the missing template file.
    file: str

Tracer

Tracer

A mutable trace along with a mutable list of log messages.

Both components are protected by a lock to ensure thread-safety (some policies spawn multiple concurrent threads).

Attributes:

Name Type Description
trace

A mutable trace.

messages list[LogMessage]

A mutable list of log messages.

lock

A reentrant lock protecting access to the trace and log. The lock is publicly exposed so that threads can log several successive messages without other threads interleaving new messages in between.

Source code in src/delphyne/core/traces.py
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
class Tracer:
    """
    A mutable trace along with a mutable list of log messages.

    Both components are protected by a lock to ensure thread-safety
    (some policies spawn multiple concurrent threads).

    Attributes:
        trace: A mutable trace.
        messages: A mutable list of log messages.
        lock: A reentrant lock protecting access to the trace and log.
            The lock is publicly exposed so that threads can log several
            successive messages without other threads interleaving new
            messages in between.
    """

    # TODO: there are cleaner ways to achieve good message order beyond
    # exposing the lock.

    def __init__(self, log_level: LogLevel = "info"):
        """
        Parameters:
            log_level: The minimum severity level of messages to log.
        """
        self.trace = Trace()
        self.messages: list[LogMessage] = []
        self.log_level: LogLevel = log_level

        # Different threads may be logging information or appending to
        # the trace in parallel.
        self.lock = threading.RLock()

    def global_node_id(self, node: refs.GlobalNodeRef) -> irefs.NodeId:
        """
        Ensure that a node at a given reference is present in the trace
        and return the corresponding node identifier.
        """
        with self.lock:
            return self.trace.convert_global_node_ref(node)

    def trace_node(self, node: refs.GlobalNodeRef) -> None:
        """
        Ensure that a node at a given reference is present in the trace.

        Nothing is returned; use `global_node_id` to also obtain the
        associated node identifier.

        See `tracer_hook` for registering a hook that automatically
        calls this method on all encountered nodes.
        """
        self.global_node_id(node)

    def trace_query(self, query: AttachedQuery[Any]) -> None:
        """
        Ensure that a query at a given reference is present in the
        trace, even if no answer is provided for it.
        """
        with self.lock:
            self.trace.convert_global_space_path(query.ref)

    def trace_answer(
        self, space: refs.GlobalSpacePath, answer: refs.Answer
    ) -> None:
        """
        Ensure that a given query answer is present in the trace, even
        if it is not used to reach a node.
        """
        with self.lock:
            self.trace.convert_answer_ref((space, answer))

    def log(
        self,
        level: LogLevel,
        message: str,
        metadata: object | None = None,
        *,
        location: Location | None = None,
        related: Sequence[LogMessageId | None] = (),
    ) -> LogMessageId | None:
        """
        Log a message, with optional metadata and location information.
        The metadata must be exportable to JSON using Pydantic.

        Returns the identifier of the new message, or `None` when the
        message severity is below `log_level` (nothing is recorded).
        """
        if not log_level_greater_or_equal(level, self.log_level):
            return None
        time = datetime.now()
        with self.lock:
            # Message identifiers are consecutive indices into
            # `messages`, assigned under the lock.
            id = len(self.messages)
            short_location = None
            if location is not None:
                short_location = self.trace.convert_location(location)
            self.messages.append(
                LogMessage(
                    message=message,
                    level=level,
                    time=time,
                    metadata=metadata,
                    location=short_location,
                    message_id=id,
                    # `None` entries (messages that were filtered out)
                    # are dropped from the related-message list.
                    related=[r for r in related if r is not None],
                )
            )
            return id

    def export_log(
        self, *, remove_timing_info: bool = False
    ) -> Iterable[ExportableLogMessage]:
        """
        Export the log into an easily serializable format.
        """
        with self.lock:
            for m in self.messages:
                # A location is either a node id or a space id; export
                # whichever one the message carries.
                node = None
                space = None
                if isinstance(m.location, irefs.NodeId):
                    node = m.location.id
                if isinstance(m.location, irefs.SpaceId):
                    space = m.location.id
                yield ExportableLogMessage(
                    message=m.message,
                    level=m.level,
                    time=m.time if not remove_timing_info else None,
                    node=node,
                    space=space,
                    metadata=pydantic_dump(object, m.metadata),
                    message_id=m.message_id,
                    related=tuple(m.related),
                )

    def export_trace(self) -> ExportableTrace:
        """
        Export the trace into an easily serializable format.
        """
        with self.lock:
            return self.trace.export()

__init__

__init__(log_level: LogLevel = 'info')

Parameters:

Name Type Description Default
log_level LogLevel

The minimum severity level of messages to log.

'info'
Source code in src/delphyne/core/traces.py
659
660
661
662
663
664
665
666
667
668
669
670
def __init__(self, log_level: LogLevel = "info"):
    """
    Parameters:
        log_level: The minimum severity level of messages to log.
    """
    self.trace = Trace()
    self.messages: list[LogMessage] = []
    # Messages below this severity are dropped by `log`.
    self.log_level: LogLevel = log_level

    # Different threads may be logging information or appending to
    # the trace in parallel.
    self.lock = threading.RLock()

global_node_id

global_node_id(node: GlobalNodeRef) -> NodeId

Ensure that a node at a given reference is present in the trace and return the corresponding node identifier.

Source code in src/delphyne/core/traces.py
672
673
674
675
676
677
678
def global_node_id(self, node: refs.GlobalNodeRef) -> irefs.NodeId:
    """
    Ensure that a node at a given reference is present in the trace
    and return the corresponding node identifier.
    """
    with self.lock:
        return self.trace.convert_global_node_ref(node)

trace_node

trace_node(node: GlobalNodeRef) -> None

Ensure that a node at a given reference is present in the trace.

Nothing is returned; use global_node_id to also obtain the associated node identifier.

See tracer_hook for registering a hook that automatically calls this method on all encountered nodes.

Source code in src/delphyne/core/traces.py
680
681
682
683
684
685
686
687
688
689
def trace_node(self, node: refs.GlobalNodeRef) -> None:
    """
    Ensure that a node at a given reference is present in the trace.

    Nothing is returned; use `global_node_id` to also obtain the
    associated node identifier.

    See `tracer_hook` for registering a hook that automatically
    calls this method on all encountered nodes.
    """
    self.global_node_id(node)

trace_query

trace_query(query: AttachedQuery[Any]) -> None

Ensure that a query at a given reference is present in the trace, even if no answer is provided for it.

Source code in src/delphyne/core/traces.py
691
692
693
694
695
696
697
def trace_query(self, query: AttachedQuery[Any]) -> None:
    """
    Ensure that a query at a given reference is present in the
    trace, even if no answer is provided for it.
    """
    # Conversion mutates the shared trace, hence the lock.
    with self.lock:
        self.trace.convert_global_space_path(query.ref)

trace_answer

trace_answer(space: GlobalSpacePath, answer: Answer) -> None

Ensure that a given query answer is present in the trace, even if it is not used to reach a node.

Source code in src/delphyne/core/traces.py
699
700
701
702
703
704
705
706
707
def trace_answer(
    self, space: refs.GlobalSpacePath, answer: refs.Answer
) -> None:
    """
    Ensure that a given query answer is present in the trace, even
    if it is not used to reach a node.
    """
    with self.lock:
        self.trace.convert_answer_ref((space, answer))

log

log(
    level: LogLevel,
    message: str,
    metadata: object | None = None,
    *,
    location: Location | None = None,
    related: Sequence[LogMessageId | None] = (),
) -> LogMessageId | None

Log a message, with optional metadata and location information. The metadata must be exportable to JSON using Pydantic.

Source code in src/delphyne/core/traces.py
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
def log(
    self,
    level: LogLevel,
    message: str,
    metadata: object | None = None,
    *,
    location: Location | None = None,
    related: Sequence[LogMessageId | None] = (),
) -> LogMessageId | None:
    """
    Log a message, with optional metadata and location information.
    The metadata must be exportable to JSON using Pydantic.

    Returns the identifier of the new message, or `None` when the
    message severity is below `log_level` (nothing is recorded).
    """
    if not log_level_greater_or_equal(level, self.log_level):
        return None
    time = datetime.now()
    with self.lock:
        # Message identifiers are consecutive indices into
        # `messages`, assigned under the lock.
        id = len(self.messages)
        short_location = None
        if location is not None:
            short_location = self.trace.convert_location(location)
        self.messages.append(
            LogMessage(
                message=message,
                level=level,
                time=time,
                metadata=metadata,
                location=short_location,
                message_id=id,
                # `None` entries (messages that were filtered out)
                # are dropped from the related-message list.
                related=[r for r in related if r is not None],
            )
        )
        return id

export_log

export_log(*, remove_timing_info: bool = False) -> Iterable[ExportableLogMessage]

Export the log into an easily serializable format.

Source code in src/delphyne/core/traces.py
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
def export_log(
    self, *, remove_timing_info: bool = False
) -> Iterable[ExportableLogMessage]:
    """
    Export the log into an easily serializable format.
    """
    with self.lock:
        for m in self.messages:
            # A location is either a node id or a space id; export
            # whichever one the message carries.
            node = None
            space = None
            if isinstance(m.location, irefs.NodeId):
                node = m.location.id
            if isinstance(m.location, irefs.SpaceId):
                space = m.location.id
            yield ExportableLogMessage(
                message=m.message,
                level=m.level,
                time=m.time if not remove_timing_info else None,
                node=node,
                space=space,
                metadata=pydantic_dump(object, m.metadata),
                message_id=m.message_id,
                related=tuple(m.related),
            )

export_trace

export_trace() -> ExportableTrace

Export the trace into an easily serializable format.

Source code in src/delphyne/core/traces.py
768
769
770
771
772
773
def export_trace(self) -> ExportableTrace:
    """
    Export the trace into an easily serializable format.
    """
    # Snapshot is taken under the lock for thread-safety.
    with self.lock:
        return self.trace.export()