Search Algorithms and Utilities

Strategies

interact

interact(
    step: Callable[
        [AnswerPrefix, InteractStats], Opaque[P, Response[A | WrappedParseError, T]]
    ],
    process: Callable[[A, InteractStats], Opaque[P, B | Error]],
    tools: Mapping[type[T], Callable[[Any], Opaque[P, Any]]] | None = None,
    inner_policy_type: type[P] | None = None,
) -> Strategy[Branch, P, B]

A standard strategy for creating conversational agents.

A common pattern for interacting with LLMs is to have multi-message exchanges where the full conversation history is resent repeatedly. LLMs are also often allowed to request tool calls. This strategy implements this pattern. It is meant to be inlined into a wrapping strategy (since it is not decorated with strategy).

Parameters:

Name Type Description Default
step Callable[[AnswerPrefix, InteractStats], Opaque[P, Response[A | WrappedParseError, T]]]

A parametric opaque space, induced by a strategy or query that takes as an argument the current chat history (possibly empty) along with some statistics, and returns an answer to be processed. Oftentimes, this parametric opaque space is induced by a query with a special prefix field for receiving the chat history (see Query).

required
process Callable[[A, InteractStats], Opaque[P, B | Error]]

An opaque space induced by a query or strategy that is called on all model responses that are not tool calls, and which returns either a final response to be returned, or an error to be transmitted to the model as feedback (as an Error value with an absent or serializable meta field).

required
tools Mapping[type[T], Callable[[Any], Opaque[P, Any]]] | None

A mapping from supported tool interfaces to implementations. Tools themselves can be implemented using arbitrary strategies or queries, allowing the integration of horizontal and vertical LLM pipelines.

None
inner_policy_type type[P] | None

Ambient inner policy type. This information is not used at runtime but it can be provided to help type inference when necessary.

None
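
For illustration, here is a minimal sketch of a wrapping strategy that inlines `interact`. `AskModel` and `CheckAnswer` are hypothetical queries (not part of the library), the import convention `import delphyne as dp` is assumed, and the ambient inner policy is assumed to be a prompting policy:

```python
import delphyne as dp  # assumed import convention

# A minimal sketch, not a definitive usage: `AskModel` and
# `CheckAnswer` are hypothetical queries.
def solve(task: str) -> dp.Strategy[dp.Branch, dp.PromptingPolicy, str]:
    # `interact` is not decorated with `strategy`, so it is inlined
    # with `yield from` here rather than branched over.
    answer = yield from interact(
        step=lambda prefix, stats: AskModel(task, prefix=prefix).using(ambient_pp),
        process=lambda ans, stats: CheckAnswer(task, ans).using(ambient_pp),
    )
    return answer
```
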
Source code in src/delphyne/stdlib/search/interactive.py
def interact[P, A, B, T: md.AbstractTool[Any]](
    step: Callable[
        [dp.AnswerPrefix, InteractStats],
        Opaque[P, dq.Response[A | WrappedParseError, T]],
    ],
    process: Callable[[A, InteractStats], Opaque[P, B | dp.Error]],
    tools: Mapping[type[T], Callable[[Any], Opaque[P, Any]]] | None = None,
    inner_policy_type: type[P] | None = None,
) -> dp.Strategy[Branch, P, B]:
    """
    A standard strategy for creating conversational agents.

    A common pattern for interacting with LLMs is to have multi-message
    exchanges where the full conversation history is resent repeatedly.
    LLMs are also often allowed to request tool calls. This strategy
    implements this pattern. It is meant to be inlined into a wrapping
    strategy (since it is not decorated with `strategy`).

    Parameters:
        step: A parametric opaque space, induced by a strategy or query
            that takes as an argument the current chat history (possibly
            empty) along with some statistics, and returns an answer to
            be processed. Oftentimes, this parametric opaque space is
            induced by a query with a special `prefix` field for
            receiving the chat history (see `Query`).
        process: An opaque space induced by a query or strategy that is
            called on all model responses that are not tool calls, and
            which returns either a final response to be returned, or an
            error to be transmitted to the model as feedback (as an
            `Error` value with an absent or serializable `meta` field).
        tools: A mapping from supported tool interfaces to
            implementations. Tools themselves can be implemented using
            arbitrary strategies or queries, allowing the integration of
            horizontal and vertical LLM pipelines.
        inner_policy_type: Ambient inner policy type. This information
            is not used at runtime but it can be provided to help type
            inference when necessary.
    """

    prefix: dp.AnswerPrefix = []
    stats = InteractStats(num_rejected=0, num_tool_call_rounds=0)
    while True:
        resp = yield from branch(step(prefix, stats))
        prefix += [dp.OracleMessage("oracle", resp.answer)]
        match resp.parsed:
            case dq.FinalAnswer(a):
                if isinstance(a, WrappedParseError):
                    msg = dp.FeedbackMessage(
                        kind="feedback",
                        label=a.error.label,
                        description=a.error.description,
                        meta=a.error.meta,
                    )
                    stats.num_rejected += 1
                    prefix += [msg]
                else:
                    res = yield from branch(process(a, stats))
                    if isinstance(res, dp.Error):
                        msg = dp.FeedbackMessage(
                            kind="feedback",
                            label=res.label,
                            description=res.description,
                            meta=res.meta,
                        )
                        stats.num_rejected += 1
                        prefix += [msg]
                    else:
                        return res
            case dq.ToolRequests(tc):
                for i, t in enumerate(tc):
                    assert tools is not None
                    tres = yield from branch(tools[type(t)](t))
                    msg = dp.ToolResult(
                        "tool",
                        resp.answer.tool_calls[i],
                        t.render_result(tres),
                    )
                    prefix += [msg]
                stats.num_tool_call_rounds += 1

InteractStats dataclass

Statistics maintained by interact.

Attributes:

Name Type Description
num_rejected int

Number of answers that have been rejected so far, due to either parsing or processing errors.

num_tool_call_rounds int

Number of tool call rounds that have been requested by the LLM so far (a round consists of a single message that can contain several tool call requests).

Source code in src/delphyne/stdlib/search/interactive.py
@dataclass
class InteractStats:
    """
    Statistics maintained by `interact`.

    Attributes:
        num_rejected: Number of answers that have been rejected so far,
            due to either parsing or processing errors.
        num_tool_call_rounds: Number of tool call rounds that have been
            requested by the LLM so far (a round consists of a single
            message that can contain several tool call requests).
    """

    num_rejected: int
    num_tool_call_rounds: int

Policies

dfs

dfs(
    tree: Tree[Branch | Fail, P, T],
    env: PolicyEnv,
    policy: P,
    max_depth: int | None = None,
    max_branching: int | None = None,
) -> StreamGen[T]

The Standard Depth-First Search Algorithm.

Whenever a branching node is encountered, branching candidates are lazily enumerated and the corresponding children are recursively searched.

Parameters:

Name Type Description
max_depth optional

maximum number of branching nodes that can be traversed in a path to success.

max_branching optional

maximum number of children explored at each branching node.
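
As a usage sketch (following the `search_policy & inner_policy` composition used elsewhere on this page; `inner` is a hypothetical inner policy):

```python
# A DFS policy that explores at most 2 children per branching node
# and abandons any path containing more than 5 branching nodes.
policy = dfs(max_depth=5, max_branching=2) & inner
```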

Source code in src/delphyne/stdlib/search/dfs.py
@search_policy
def dfs[P, T](
    tree: Tree[Branch | Fail, P, T],
    env: PolicyEnv,
    policy: P,
    max_depth: int | None = None,
    max_branching: int | None = None,
) -> StreamGen[T]:
    """
    The Standard Depth-First Search Algorithm.

    Whenever a branching node is encountered, branching candidates are
    lazily enumerated and the corresponding children are recursively
    searched.

    Parameters:
        max_depth (optional): maximum number of branching nodes
            that can be traversed in a path to success.
        max_branching (optional): maximum number of children explored at
            each branching node.
    """
    assert max_branching is None or max_branching > 0
    match tree.node:
        case Success(x):
            yield Solution(x)
        case Fail():
            pass
        case Branch(cands):
            if max_depth is not None and max_depth <= 0:
                return
            cands = cands.stream(env, policy)
            if max_branching is not None:
                cands = cands.take(max_branching, strict=True)
            yield from cands.bind(
                lambda a: dfs(
                    max_depth=max_depth - 1 if max_depth is not None else None,
                    max_branching=max_branching,
                )(tree.child(a.tracked), env, policy).gen()
            ).gen()
        case _:
            unsupported_node(tree.node)

par_dfs

par_dfs(tree: Tree[Branch | Fail, P, T], env: PolicyEnv, policy: P) -> StreamGen[T]

Parallel Depth-First Search.

Whenever a branching node is encountered, all branching candidates are computed at once and the associated children are explored in parallel.
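
As a sketch of how the two variants compose (again with a hypothetical inner policy `inner`):

```python
sequential = dfs(max_branching=4) & inner  # children enumerated lazily
parallel = par_dfs() & inner  # candidates computed upfront, children searched in parallel
```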

Source code in src/delphyne/stdlib/search/dfs.py
@search_policy
def par_dfs[P, T](
    tree: Tree[Branch | Fail, P, T],
    env: PolicyEnv,
    policy: P,
) -> StreamGen[T]:
    """
    Parallel Depth-First Search.

    Whenever a branching node is encountered, all branching candidates
    are computed at once and the associated children are explored in
    parallel.
    """
    match tree.node:
        case Success(x):
            yield Solution(x)
        case Fail():
            pass
        case Branch(cands):
            cands = yield from cands.stream(env, policy).all()
            yield from Stream.parallel(
                [par_dfs()(tree.child(a.tracked), env, policy) for a in cands]
            ).gen()
        case _:
            unsupported_node(tree.node)

Combinators

sequence

sequence(
    policies: Iterable[PromptingPolicy], *, stop_on_reject: bool = True
) -> PromptingPolicy
sequence(
    policies: Iterable[SearchPolicy[N]], *, stop_on_reject: bool = True
) -> SearchPolicy[N]
sequence(
    policies: Iterable[Policy[N, P]], *, stop_on_reject: bool = True
) -> Policy[N, P]
sequence(policies: Iterable[_AnyPolicy], *, stop_on_reject: bool = True) -> _AnyPolicy

Try a list of policies, search policies, or prompting policies in sequence.

  • policies: An iterable of policies, search policies, or prompting policies to try in sequence.
  • stop_on_reject: If True, stop the sequence as soon as one policy sees all its resource requests denied. Note that this is necessary for termination when policies is an infinite iterator.
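
For instance, here is a sketch of iterative deepening built from an infinite sequence of DFS policies (budget management not shown):

```python
import itertools

# `stop_on_reject=True` (the default) is what guarantees termination
# here, since the iterator of policies is infinite.
deepening = sequence(dfs(max_depth=d) for d in itertools.count(1))
```
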
Source code in src/delphyne/stdlib/misc.py
def sequence(
    policies: Iterable[_AnyPolicy],
    *,
    stop_on_reject: bool = True,
) -> _AnyPolicy:
    """
    Try a list of policies, search policies, or prompting policies in
    sequence.

    Parameters:
        policies: An iterable of policies, search policies, or prompting
            policies to try in sequence.
        stop_on_reject: If `True`, stop the sequence as soon as one
            policy sees all its resource requests denied. Note that this
            is necessary for termination when `policies` is an infinite
            iterator.
    """

    it = iter(policies)
    first = next(it)
    if isinstance(first, PromptingPolicy):
        return sequence_prompting_policies(
            cast(Iterable[PromptingPolicy], policies),
            stop_on_reject=stop_on_reject,
        )
    elif isinstance(first, SearchPolicy):
        return sequence_search_policies(
            cast(Iterable[SearchPolicy[Any]], policies),
            stop_on_reject=stop_on_reject,
        )
    else:
        assert isinstance(first, Policy)
        return sequence_policies(
            cast(Iterable[Policy[Any, Any]], policies),
            stop_on_reject=stop_on_reject,
        )

or_else

or_else(main: SearchPolicy[N], other: SearchPolicy[N]) -> SearchPolicy[N]
or_else(main: Policy[N, P], other: Policy[N, P]) -> Policy[N, P]
or_else(main: _AnyPolicy, other: _AnyPolicy) -> _AnyPolicy

Take two policies, search policies, or prompting policies as arguments. Try the first one, and then the second one only if it fails (i.e., it does not produce any solution).
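
A sketch:

```python
# Prefer a cheap bounded search; fall back to an unbounded DFS only
# if the bounded one produces no solution.
policy = or_else(dfs(max_depth=2), dfs())
```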

Source code in src/delphyne/stdlib/misc.py
def or_else(main: _AnyPolicy, other: _AnyPolicy) -> _AnyPolicy:
    """
    Take two policies, search policies, or prompting policies as
    arguments. Try the first one, and then the second one only if it
    fails (i.e., it does not produce any solution).
    """
    if isinstance(main, PromptingPolicy):
        assert isinstance(other, PromptingPolicy)
        return prompting_policy_or_else(main, other)
    elif isinstance(main, SearchPolicy):
        assert isinstance(other, SearchPolicy)
        return search_policy_or_else(main, other)
    else:
        return policy_or_else(main, other)  # type: ignore

nofail

nofail(space: Opaque[P, A], *, default: B) -> Opaque[P, A | B]

Modify an opaque space so that branching over it can never fail.

If the stream associated with the opaque space gets exhausted and no solution is produced, the provided default value is used.

In demonstrations, the default value can be selected by using the #no_fail_default hint.
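
A sketch, to be read inside a strategy body (`RankCandidates` is a hypothetical query):

```python
# Branching over the wrapped space always succeeds: if the query's
# stream is exhausted without producing a value, `items[0]` is used.
best = yield from branch(
    nofail(RankCandidates(items).using(ambient_pp), default=items[0])
)
```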

Source code in src/delphyne/stdlib/misc.py
def nofail[P, A, B](space: Opaque[P, A], *, default: B) -> Opaque[P, A | B]:
    """
    Modify an opaque space so that branching over it can never fail.

    If the stream associated with the opaque space gets exhausted and no
    solution is produced, the provided default value is used.

    In demonstrations, the default value can be selected by using the
    `#no_fail_default` hint.
    """
    try_policy = dfs() @ elim_flag(NoFailFlag, "no_fail_try")
    def_policy = dfs() @ elim_flag(NoFailFlag, "no_fail_default")
    search_policy = or_else(try_policy, def_policy)
    return nofail_strategy(space, default=default).using(
        lambda p: search_policy & p
    )

iterate

iterate(
    next: Callable[[S | None], Opaque[P, tuple[T | None, S]]],
    transform_stream: Callable[[P], StreamTransformer | None] | None = None,
) -> Opaque[P, T]

Iteratively call a strategy or query, repeatedly feeding back the last call's output state into a new call and yielding values along the way.

A standard use case is to repeatedly call a query or strategy with a blacklist of previously generated values, so as to produce diverse success values.

Parameters:

Name Type Description Default
next Callable[[S | None], Opaque[P, tuple[T | None, S]]]

A parametric opaque space, induced by a query or strategy that takes a state as an input (or None initially) and outputs a new state, along with a generated value.

required
transform_stream Callable[[P], StreamTransformer | None] | None

An optional mapping from the inner policy to a stream transformer to be applied to the resulting stream of generated values.

None

Returns:

Type Description
Opaque[P, T]

An opaque space enumerating all generated values.
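
A sketch (`ProposeDistinct` is a hypothetical query):

```python
# The query takes the previous state (here, a blacklist of values
# generated so far, or `None` on the first call) and returns a
# `(new_value, new_state)` pair. The result is an opaque space
# enumerating all generated values.
diverse = iterate(
    lambda state: ProposeDistinct(problem, blacklist=state).using(ambient_pp)
)
```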

Source code in src/delphyne/stdlib/search/iteration.py
def iterate[P, S, T](
    next: Callable[[S | None], Opaque[P, tuple[T | None, S]]],
    transform_stream: Callable[[P], StreamTransformer | None] | None = None,
) -> Opaque[P, T]:
    """
    Iteratively call a strategy or query, repeatedly feeding back the
    last call's output state into a new call and yielding values along
    the way.

    A standard use case is to repeatedly call a query or strategy with a
    blacklist of previously generated values, so as to produce diverse
    success values.

    Arguments:
        next: A parametric opaque space, induced by a query or strategy
            that takes a state as an input (or `None` initially) and
            outputs a new state, along with a generated value.
        transform_stream: An optional mapping from the inner policy to a
            stream transformer to be applied to the resulting stream of
            generated values.

    Returns:
        An opaque space enumerating all generated values.
    """

    def iterate_policy(inner_policy: P):
        policy = _search_iteration()
        if transform_stream is not None:
            trans = transform_stream(inner_policy)
            if trans is not None:
                policy = trans @ policy
        return policy & inner_policy

    return _iterate(next).using(iterate_policy)

Opaque Spaces Sugar

just_dfs

just_dfs(policy: P) -> Policy[Branch | Fail, P]

Convenience shortcut to avoid passing lambdas to the get_policy argument of using, when using DFS in combination with the ambient inner policy.
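
A sketch (`sub` is a hypothetical sub-strategy):

```python
# The two calls below are equivalent; `just_dfs` avoids the lambda.
sub(x).using(lambda p: dfs() & p)
sub(x).using(just_dfs)
```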

Source code in src/delphyne/stdlib/misc.py
def just_dfs[P](policy: P) -> Policy[Branch | Fail, P]:
    """
    Convenience shortcut to avoid passing lambdas to the `get_policy`
    argument of `using`, when using DFS in combination with the ambient
    inner policy.
    """
    return dfs() & policy

just_compute

just_compute(policy: P) -> Policy[Compute, P]

Convenience shortcut to avoid passing lambdas to the get_policy argument of using, in the case of sub-strategies that only feature the Compute effect.

Source code in src/delphyne/stdlib/misc.py
def just_compute[P](policy: P) -> Policy[Compute, P]:
    """
    Convenience shortcut to avoid passing lambdas to the `get_policy`
    argument of `using`, in the case of sub-strategies that only feature
    the `Compute` effect.
    """
    return dfs() @ elim_compute() & policy

ambient_pp

ambient_pp(policy: PromptingPolicy) -> PromptingPolicy

Convenience shortcut to avoid passing lambdas to the get_policy argument of Query.using, when using the ambient inner policy as a prompting policy.
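
A sketch (`MyQuery` is a hypothetical query whose ambient inner policy is itself a prompting policy):

```python
# The two calls below are equivalent.
MyQuery(x).using(lambda p: p)
MyQuery(x).using(ambient_pp)
```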

Source code in src/delphyne/stdlib/misc.py
def ambient_pp(policy: PromptingPolicy) -> PromptingPolicy:
    """
    Convenience shortcut to avoid passing lambdas to the `get_policy`
    argument of `Query.using`, when using the ambient inner policy as a
    prompting policy.
    """
    return policy

ambient

ambient(policy: F) -> F

Convenience shortcut to avoid passing lambdas to the get_policy argument of Query.using, when using the ambient inner policy as a sub-policy (or as a sub-prompting policy).

Source code in src/delphyne/stdlib/misc.py
def ambient[F](policy: F) -> F:
    """
    Convenience shortcut to avoid passing lambdas to the `get_policy`
    argument of `Query.using`, when using the ambient inner policy as a
    sub-policy (or as a sub-prompting policy).
    """
    return policy

Universal Queries

UniversalQuery dataclass

Bases: Query[object]

A universal query, implicitly defined by the surrounding context of its call. See guess for more information.

Attributes:

Name Type Description
strategy str

Fully qualified name of the surrounding strategy (e.g., my_package.my_module.my_strategy).

expected_type str

A string rendition of the expected answer type.

tags Sequence[str]

Tags associated with the space induced by the query, which can be used to locate the exact location where the query is issued (the default tag takes the name of the variable that the query result is assigned to).

locals dict[str, object]

A dictionary that provides the values of a subset of local variables or expressions (as JSON values).

Experimental

This feature is experimental and subject to change.

Source code in src/delphyne/stdlib/universal_queries.py
@dataclass
class UniversalQuery(Query[object]):
    """
    A universal query, implicitly defined by the surrounding context of
    its call. See `guess` for more information.

    Attributes:
        strategy: Fully qualified name of the surrounding strategy
            (e.g., `my_package.my_module.my_strategy`).
        expected_type: A string rendition of the expected answer type.
        tags: Tags associated with the space induced by the query, which
            can be used to locate the exact location where the query is
            issued (the default tag takes the name of the variable that
            the query result is assigned to).
        locals: A dictionary that provides the values of a subset of
            local variables or expressions (as JSON values).

    !!! warning "Experimental"
        This feature is experimental and subject to change.
    """

    # TODO: add a `context` field where we store objects whose
    # documentation or source should be added to the prompt.

    strategy: str
    expected_type: str
    tags: Sequence[str]
    locals: dict[str, object]

    __parser__ = last_code_block.yaml

    @override
    def default_tags(self):
        return self.tags

    @property
    def strategy_source(self) -> str:
        """
        Return the source code of the strategy that contains this query.
        """
        strategy_obj = _load_from_qualified_name(self.strategy)
        assert callable(strategy_obj)
        return _source_code(strategy_obj)

strategy_source property

strategy_source: str

Return the source code of the strategy that contains this query.

guess

guess(
    annot: type[T], /, *, using: Sequence[object]
) -> Strategy[Branch | Fail, IPDict, T]
guess(
    annot: TypeAnnot[Any], /, *, using: Sequence[object]
) -> Strategy[Branch | Fail, IPDict, Any]
guess(
    annot: TypeAnnot[Any], /, *, using: Sequence[object]
) -> Strategy[Branch | Fail, IPDict, Any]

Attempt to guess a value of a given type, using the surrounding context of the call site along with the value of some local variables or expressions.

This function inspects the call stack to determine the context in which it is called and issues a UniversalQuery, with a tag corresponding to the name of the assigned variable. A failure node is issued if the oracle result cannot be parsed into the expected type. For example:

res = yield from guess(int, using=[x, y.summary()])

issues a UniversalQuery query tagged res, with attribute locals a dictionary with string keys "x" and "y.summary()".

Parameters:

Name Type Description
annot

The expected type of the value to be guessed.

using

A sequence of local variables or expressions whose value should be communicated to the oracle (a label for each expression is automatically generated using source information).

Note

Our use of an overloaded type should not be necessary anymore when TypeExpr is released with Python 3.14.

Experimental

This feature is experimental and subject to change.

Source code in src/delphyne/stdlib/universal_queries.py
def guess(
    annot: TypeAnnot[Any], /, *, using: Sequence[object]
) -> dp.Strategy[Branch | Fail, IPDict, Any]:
    """
    Attempt to guess a value of a given type, using the surrounding
    context of the call site along with the value of some local
    variables or expressions.

    This function inspects the call stack to determine the context in
    which it is called and issues a `UniversalQuery`, with a tag
    corresponding to the name of the assigned variable. A failure node is
    issued if the oracle result cannot be parsed into the expected type.
    For example:

    ```python
    res = yield from guess(int, using=[x, y.summary()])
    ```

    issues a `UniversalQuery` query tagged `res`, with attribute
    `locals` a dictionary with string keys `"x"` and `"y.summary()"`.

    Parameters:
        annot: The expected type of the value to be guessed.
        using: A sequence of local variables or expressions whose value
            should be communicated to the oracle (a label for each
            expression is automatically generated using source information).

    !!! note
        Our use of an overloaded type should not be necessary anymore
        when `TypeExpr` is released with Python 3.14.

    !!! warning "Experimental"
        This feature is experimental and subject to change.
    """

    # Extracting the name of the surrounding strategy
    strategy = surrounding_qualname(skip=1)
    assert strategy is not None
    strategy = _rename_main_module_in_qualified_name(strategy)

    # Extracting the name of the variable being assigned
    cur_instr_src = current_instruction_source(skip=1)
    ret_val_name = assigned_var_name(cur_instr_src)
    assert isinstance(ret_val_name, str)

    # Computing the 'locals' dictionary
    guess_args = call_argument_sources(cur_instr_src, guess)
    assert guess_args is not None
    _args, kwargs = guess_args
    using_args = _parse_list_of_ids(kwargs["using"])
    assert len(using) == len(using_args)
    locals = {k: pydantic_dump(type(v), v) for k, v in zip(using_args, using)}

    # Building the query
    query = UniversalQuery(strategy, str(annot), [ret_val_name], locals)

    ret = yield from branch(query.using(...))
    try:
        parsed = pydantic_load(annot, ret)
    except Exception as e:
        assert_never((yield from fail("parse_error", message=str(e))))
    return parsed
best_first_search

best_first_search(
    tree: Tree[Branch | Factor | Value | Fail, P, T],
    env: PolicyEnv,
    policy: P,
    child_confidence_prior: Callable[[int, int], float],
    max_depth: int | None = None,
) -> StreamGen[T]

Best First Search Algorithm.

Nodes can be branching nodes or factor nodes. Factor nodes feature a confidence score in the [0, 1] interval. The total confidence of any node in the tree is the product of all confidence factors found on the path from the root to this node. The algorithm stores all visited branching nodes in a priority queue. At every step, it picks the node with highest confidence and spends an atomic amount of effort trying to generate a new child. If it succeeds, the first descendant branching node is added to the tree and the algorithm continues.

Also, the total confidence of each branching node is multiplied by an additional penalty factor that depends on how many children have been generated already, using the child_confidence_prior argument. This argument is a function that takes as its first argument the depth of the current branching node (0 for the root, only incrementing when meeting other branching nodes) and as its second argument the number of children generated so far. It returns the penalty factor to be applied.

The max_depth parameter indicates the maximum depth a branch node can have. The root has depth 0 and only branch nodes count towards increasing the depth.
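
As a sketch of the prior's shape (the decay base is an arbitrary choice, not a library default):

```python
# Confidence in obtaining yet another useful child decays
# geometrically with the number of children already generated.
def geometric_prior(depth: int, num_children: int) -> float:
    return 0.5**num_children

search = best_first_search(
    child_confidence_prior=geometric_prior, max_depth=8
)
```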

Source code in src/delphyne/stdlib/search/bestfs.py
@search_policy
def best_first_search[P, T](
    tree: dp.Tree[Branch | Factor | Value | Fail, P, T],
    env: PolicyEnv,
    policy: P,
    child_confidence_prior: Callable[[int, int], float],
    max_depth: int | None = None,
) -> dp.StreamGen[T]:
    """
    Best First Search Algorithm.

    Nodes can be branching nodes or factor nodes. Factor nodes feature a
    confidence score in the [0, 1] interval. The total confidence of any
    node in the tree is the product of all confidence factors found on
    the path from the root to this node. The algorithm stores all
    visited branching nodes in a priority queue. At every step, it picks
    the node with highest confidence and spends an atomic amount of
    effort trying to generate a new child. If it succeeds, the first
    descendant branching node is added to the tree and the algorithm
    continues.

    Also, the total confidence of each branching node is multiplied by
    an additional penalty factor that depends on how many children have
    been generated already, using the `child_confidence_prior` argument.
    This argument is a function that takes as its first argument the
    depth of the current branching node (0 for the root, only
    incrementing when meeting other branching nodes) and as its second
    argument the number of children generated so far. It returns the
    penalty factor to be applied.

    The `max_depth` parameter indicates the maximum depth a branch node
    can have. The root has depth 0 and only branch nodes count towards
    increasing the depth.
    """
    # `counter` is used to assign ids that are used to solve ties in the
    # priority queue (the older element gets priority).
    counter = 0
    pqueue: list[_PriorityItem] = []  # a heap

    def push_fresh_node(
        tree: dp.Tree[Branch | Factor | Value | Fail, Any, Any],
        confidence: float,
        depth: int,
    ) -> dp.StreamGen[T]:
        match tree.node:
            case dp.Success():
                yield dp.Solution(tree.node.success)
            case Fail():
                pass
            case Factor() | Value():
                if isinstance(tree.node, Value):
                    penalty_fun = tree.node.value(policy)
                else:
                    penalty_fun = tree.node.factor(policy)
                # Evaluate metrics if a penalty function is provided
                if penalty_fun is not None:
                    eval_stream = tree.node.eval.stream(env, policy)
                    eval = yield from eval_stream.first()
                    # If we failed to evaluate the metrics, we give up.
                    if eval is None:
                        return
                    if isinstance(tree.node, Value):
                        confidence = penalty_fun(eval.tracked.value)
                    else:
                        confidence *= penalty_fun(eval.tracked.value)
                yield from push_fresh_node(tree.child(None), confidence, depth)
            case Branch():
                if max_depth is not None and depth > max_depth:
                    return
                state = _NodeState(
                    depth=depth,
                    children=[],
                    confidence=confidence,
                    stream=[tree.node.cands.stream(env, policy)],
                    node=tree.node,
                    tree=tree,
                    next_actions=[],
                )
                nonlocal counter
                counter += 1
                prior = child_confidence_prior(depth, 0)
                item_confidence = confidence * prior
                item = _PriorityItem(-item_confidence, counter, state)
                heapq.heappush(pqueue, item)
            case _:
                unsupported_node(tree.node)

    def reinsert_node(state: _NodeState) -> None:
        nonlocal counter
        counter += 1
        prior = child_confidence_prior(state.depth, len(state.children))
        item_confidence = state.confidence * prior
        item = _PriorityItem(-item_confidence, counter, state)
        heapq.heappush(pqueue, item)

    # Put the root into the queue.
    yield from push_fresh_node(tree, 1.0, 0)
    while pqueue:
        state = heapq.heappop(pqueue).node_state
        if not state.next_actions:
            if not state.stream[0]:
                # No more actions to take, we do not put the node back.
                continue
            generated, _, next = yield from state.stream[0].next()
            state.next_actions.extend([a.tracked for a in generated])
            state.stream[0] = next
        if state.next_actions:
            cand = state.next_actions.pop(0)
            child = state.tree.child(cand)
            state.children.append(child.ref)
            yield from push_fresh_node(child, 1, state.depth + 1)
        # We put the node back into the queue
        reinsert_node(state)

Abduction

Abduction dataclass

Bases: Node

Node for the singleton tree produced by abduction. See abduction for details.

An action is a successful proof of the main goal.

Source code in src/delphyne/stdlib/search/abduction.py
@dataclass
class Abduction(dp.Node):
    """
    Node for the singleton tree produced by `abduction`.
    See `abduction` for details.

    An action is a successful proof of the main goal.
    """

    prove: Callable[
        [Sequence[tuple[_Fact, _Proof]], _Fact | None],
        OpaqueSpace[Any, _Status],
    ]
    suggest: Callable[
        [_Feedback],
        OpaqueSpace[Any, Sequence[_Fact]],
    ]
    search_equivalent: Callable[
        [Sequence[_Fact], _Fact],
        OpaqueSpace[Any, _Fact | None],
    ]
    redundant: Callable[
        [Sequence[_Fact], _Fact],
        OpaqueSpace[Any, bool],
    ]

    def navigate(self) -> dp.Navigation:
        def aux(fact: dp.Tracked[_Fact] | None) -> dp.Navigation:
            # Take a fact as an argument and return a list of
            # (proved_fact, proof) pairs.
            res = yield self.prove([], fact)
            status, payload = res[0], res[1]
            if status.value == "proved":
                return [(fact, payload)]
            elif status.value == "disproved":
                return []
            else:
                assert status.value == "feedback"
                feedback = payload
                suggestions = yield self.suggest(feedback)
                proved: list[Any] = []
                for s in suggestions:
                    extra: Any = yield from aux(s)
                    proved.extend(extra)
                res = yield self.prove(proved, fact)
                status, payload = res[0], res[1]
                if status.value == "proved":
                    proved.append((fact, payload))
                return _remove_duplicates(proved, by=lambda x: drop_refs(x[0]))

        proved: Any = yield from aux(None)
        main_proof = _find_assoc(proved, None)
        if main_proof is None:
            raise dp.NavigationError(
                "No proof for the main goal was produced."
            )
        return main_proof

abduction

abduction(
    prove: Callable[
        [Sequence[tuple[Fact, Proof]], Fact | None],
        Opaque[P, AbductionStatus[Feedback, Proof]],
    ],
    suggest: Callable[[Feedback], Opaque[P, Sequence[Fact]]],
    search_equivalent: Callable[[Sequence[Fact], Fact], Opaque[P, Fact | None]],
    redundant: Callable[[Sequence[Fact], Fact], Opaque[P, bool]],
    inner_policy_type: type[P] | None = None,
) -> Strategy[Abduction, P, Proof]

Higher-order strategy for proving a fact via recursive abduction.

Parameters:

Name Type Description Default
prove Callable[[Sequence[tuple[Fact, Proof]], Fact | None], Opaque[P, AbductionStatus[Feedback, Proof]]]

take a sequence of already established facts as an argument along with a new fact, and attempt to prove this new fact. Three outcomes are possible: the fact is proved, the fact is disproved, or a list of suggestions is made of facts that might be helpful to prove first. None denotes the top-level goal to be proved.

required
suggest Callable[[Feedback], Opaque[P, Sequence[Fact]]]

take some feedback from the prove function and return a sequence of fact candidates that may be useful to prove before reattempting the original proof.

required
search_equivalent Callable[[Sequence[Fact], Fact], Opaque[P, Fact | None]]

take a collection of facts along with a new one, and return either the first fact of the list equivalent to the new fact or None. This is used to avoid spending search effort on proving equivalent facts.

required
redundant Callable[[Sequence[Fact], Fact], Opaque[P, bool]]

take a collection of established facts and decide whether they imply a new fact candidate. This is useful to avoid proving and accumulating redundant facts.

required

Returns:

Type Description
Strategy[Abduction, P, Proof]

a proof of the top-level goal.
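
A sketch of the wiring, with four hypothetical queries implementing the callbacks (policy details elided):

```python
proof = yield from abduction(
    prove=lambda proved, goal: TryProve(proved, goal).using(ambient),
    suggest=lambda fb: SuggestLemmas(fb).using(ambient),
    search_equivalent=lambda known, f: FindEquivalent(known, f).using(ambient),
    redundant=lambda known, f: IsRedundant(known, f).using(ambient),
)
```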

Source code in src/delphyne/stdlib/search/abduction.py
def abduction[Fact, Feedback, Proof, P](
    prove: Callable[
        [Sequence[tuple[Fact, Proof]], Fact | None],
        Opaque[P, AbductionStatus[Feedback, Proof]],
    ],
    suggest: Callable[
        [Feedback],
        Opaque[P, Sequence[Fact]],
    ],
    search_equivalent: Callable[
        [Sequence[Fact], Fact], Opaque[P, Fact | None]
    ],
    redundant: Callable[[Sequence[Fact], Fact], Opaque[P, bool]],
    inner_policy_type: type[P] | None = None,
) -> dp.Strategy[Abduction, P, Proof]:
    """
    Higher-order strategy for proving a fact via recursive abduction.

    Arguments:
      prove: take a sequence of already established facts as an
        argument along with a new fact, and attempt to prove this new
        fact. Three outcomes are possible: the fact is proved, the fact
        is disproved, or a list of suggestions is made of facts that
        might be helpful to prove first. `None` denotes the top-level
        goal to be proved.

      suggest: take some feedback from the `prove` function and return a
        sequence of fact candidates that may be useful to prove before
        reattempting the original proof.

      search_equivalent: take a collection of facts along with a new
        one, and return either the first fact of the list equivalent to
        the new fact or `None`. This is used to avoid spending search
        effort on proving equivalent facts.

      redundant: take a collection of established facts and decide
        whether they imply a new fact candidate. This is useful to avoid
        proving and accumulating redundant facts.

    Returns:
      a proof of the top-level goal.
    """
    res = yield spawn_node(
        Abduction,
        prove=prove,
        suggest=suggest,
        search_equivalent=search_equivalent,
        redundant=redundant,
    )
    return cast(Proof, res)

abduct_and_saturate

abduct_and_saturate(
    tree: Tree[Abduction, P, Proof],
    env: PolicyEnv,
    policy: P,
    max_rollout_depth: int = 3,
    scoring_function: ScoringFunction = _default_scoring_function,
    log_steps: LogLevel | None = None,
    max_raw_suggestions_per_step: int | None = None,
    max_reattempted_candidates_per_propagation_step: int | None = None,
    max_consecutive_propagation_steps: int | None = None,
) -> StreamGen[Proof]

A saturation-based, sequential policy for abduction trees.

This policy proceeds by saturation: it repeatedly grows a set of proved facts until the main goal is proved or some limit is reached.

It does so by repeatedly performing rollouts. Each rollout starts with the toplevel goal as a target, and attempts to prove this target assuming all facts in proved. If the target cannot be proved, suggestions for auxiliary facts to prove first are requested before another attempt is made. If still unsuccessful, one of the unproved suggestions is set as the new target and the rollout proceeds (up to some depth specified by max_rollout_depth).

The algorithm maintains four disjoint global sets of facts:

  • proved: facts that have been successfully proved
  • disproved: facts that have been disproved
  • redundant: facts that are implied by the conjunction of all facts from proved.
  • candidates: facts that have been suggested but do not belong to any of the three sets above.

Each step of a rollout proceeds as follows:

  • The current target is assumed to be a fact from the candidates set. Suggestions for new rollout targets are determined as follows (get_suggestions):
    • The suggest node function returns a list of candidates.
    • All suggestions are normalized using the search_equivalent node function (one call per suggestion).
    • Each normalized suggestion is added (add_candidate) to one of the proved, disproved, redundant, or candidates sets. At most one call to the prove and is_redundant node functions is made per suggestion.
    • Assuming the previous step results in at least one new fact being proved, all candidates from the candidates set are re-examined until saturation (saturate).
    • Remaining suggestions that are in candidates are potential targets for the next rollout step.
  • Assuming the current target is still not proved, the next rollout target is picked using the scoring_function parameter.

Parameters:

Name Type Description Default
max_rollout_depth int

The maximum depth of a rollout, as the maximal number of consecutive target goals that can be set (the first goal being the toplevel goal).

3
scoring_function ScoringFunction

Scoring function for choosing the next target goal at the end of each rollout step.

_default_scoring_function
log_steps LogLevel | None

If not None, log main steps of the algorithm at the provided severity level.

None
max_raw_suggestions_per_step int | None

Maximum number of suggestions from the suggest node function to consider at each rollout step. If more suggestions are available, the most frequent ones (under naive, syntactic equality) are chosen.

None
max_reattempted_candidates_per_propagation_step int | None

Maximum number of candidates that are reattempted at each propagation step. Candidates that have been proposed more frequently are selected in priority.

None
max_consecutive_propagation_steps int | None

Maximum number of propagation steps that are performed during a rollout step, or None if there is no limit.

None

Warning

Facts must be hashable.

Warning

By design, this policy tries to make as few calls to suggest as possible, since those typically involve LLM calls. However, by default, it can make a very large number of calls to prove, is_redundant and search_equivalent. This number can explode as the number of candidates increases (in particular, it can be quadratic in the number of candidates at each rollout step, due to saturation). Thus, we recommend setting proper limits using the hyperparameters whose names start with max_.

Note

No fact is attempted to be proved if it is redundant with already-proved facts. However, in the current implementation, the set of proved facts can still contain redundancy. For example, if x >= 0 is established before the stronger x > 0 is, the former won't be deleted.
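
A configuration sketch reflecting the recommendation above (the specific limits are illustrative, not defaults):

```python
search = abduct_and_saturate(
    max_rollout_depth=2,
    max_raw_suggestions_per_step=8,
    max_reattempted_candidates_per_propagation_step=16,
    max_consecutive_propagation_steps=4,
)
```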

Source code in src/delphyne/stdlib/search/abduction.py
@search_policy
def abduct_and_saturate[P, Proof](
    tree: dp.Tree[Abduction, P, Proof],
    env: PolicyEnv,
    policy: P,
    max_rollout_depth: int = 3,
    scoring_function: ScoringFunction = _default_scoring_function,
    log_steps: dp.LogLevel | None = None,
    max_raw_suggestions_per_step: int | None = None,
    max_reattempted_candidates_per_propagation_step: int | None = None,
    max_consecutive_propagation_steps: int | None = None,
) -> dp.StreamGen[Proof]:
    """
    A saturation-based, sequential policy for abduction trees.

    This policy proceeds by saturation: it repeatedly grows a set of
    proved facts until the main goal is proved or some limit is reached.

    It does so by repeatedly performing _rollouts_. Each rollout starts
    with the toplevel goal as a target, and attempts to prove this target
    assuming all facts in `proved`. If the target cannot be proved,
    suggestions for auxiliary facts to prove first are requested before
    another attempt is made. If still unsuccessful, one of the unproved
    suggestions is set as the new target and the rollout proceeds (up to
    some depth specified by `max_rollout_depth`).

    The algorithm maintains four disjoint global sets of facts:

    - `proved`: facts that have been successfully proved
    - `disproved`: facts that have been disproved
    - `redundant`: facts that are implied by the conjunction of all
      facts from `proved`.
    - `candidates`: facts that have been suggested but do not belong to
      any of the three sets above.

    Each step of a rollout proceeds as follows:

    - The current target is assumed to be a fact from the `candidates`
      set. Suggestions for new rollout targets are determined as follows
      (`get_suggestions`):
        - The `suggest` node function returns a list of candidates.
        - All suggestions are normalized using the `search_equivalent`
          node function (one call per suggestion).
        - Each normalized suggestion is added (`add_candidate`) to one
          of the `proved`, `disproved`, `redundant`, or `candidates`
          sets. At most one call to the `prove` and `is_redundant` node
          functions is made per suggestion.
        - Assuming the previous step results in at least one new fact
          being proved, all candidates from the `candidates` set are
          re-examined until saturation (`saturate`).
        - Remaining suggestions that are in `candidates` are potential
          targets for the next rollout step.
    - Assuming the current target is still not proved, the next rollout
      target is picked using the `scoring_function` parameter.

    Arguments:
        max_rollout_depth: The maximum depth of a rollout, as the
            maximal number of consecutive target goals that can be set
            (the first goal being the toplevel goal).
        scoring_function: Scoring function for choosing the next target
            goal at the end of each rollout step.
        log_steps: If not `None`, log main steps of the algorithm at the
            provided severity level.
        max_raw_suggestions_per_step: Maximum number of suggestions from
            the `suggest` node function to consider at each rollout
            step. If more suggestions are available, the most frequent
            ones (under naive, syntactic equality) are chosen.
        max_reattempted_candidates_per_propagation_step: Maximum number
            of candidates that are reattempted at each propagation step.
            Candidates that have been proposed more frequently are
            selected in priority.
        max_consecutive_propagation_steps: Maximum number of propagation
            steps that are performed during a rollout step, or `None` if
            there is no limit.

    !!! warning
        Facts must be hashable.

    !!! warning
        By design, this policy tries to make as few calls to `suggest`
        as possible, since those typically involve LLM calls. However,
        by default, it can make a very large number of calls to `prove`,
        `is_redundant` and `search_equivalent`. This number can explode
        as the number of candidates increases (in particular, it can be
        quadratic in the number of candidates at each rollout step, due
        to saturation). Thus, we recommend setting proper limits using
        the hyperparameters whose names start with `max_`.

    !!! note
        No fact is attempted to be proved if it is redundant with
        already-proved facts. However, in the current implementation,
        the set of proved facts can still contain redundancy. For
        example, if `x >= 0` is established before the stronger `x > 0`
        is, the former won't be deleted.
    """

    # TODO: stop the rollout if the current goal is proved.

    # Initialize tool statistics tracking
    call_stats = _CallStats()

    # Invariant: `candidates`, `proved`, `disproved` and `redundant` are
    # disjoint. Together, they form the set of "canonical facts".
    candidates: dict[_EFact, _CandInfo] = {}
    proved: dict[_EFact, _Proof] = {}
    disproved: set[_EFact] = set()
    # Facts that are implied by the conjunction of all proved facts.
    redundant: set[_EFact] = set()

    # It is easier to manipulate untracked facts and so we keep the
    # correspondence with tracked facts here.
    # Invariant: all canonical facts are included in `tracked`.
    tracked: dict[_EFact, _Tracked_EFact] = {None: None}

    # The `equivalent` dict maps a fact to its canonical equivalent
    # representative that is somewhere in `candidates`, `proved`,
    # `disproved` or `redundant`.
    equivalent: dict[_EFact, _EFact] = {}

    # Can a new fact make a candidate redundant? YES. So we should also
    # do this in `propagate`

    assert max_rollout_depth >= 1
    assert isinstance(tree.node, Abduction)
    node = tree.node

    def compute_fact_stats() -> _FactStats:
        return _FactStats(
            num_candidates=len(candidates),
            num_proved=len(proved),
            num_disproved=len(disproved),
            num_redundant=len(redundant),
        )

    def dbg(msg: str):
        if log_steps:
            stats = {
                "facts_stats": compute_fact_stats(),
                "call_stats": call_stats,
            }
            env.log(log_steps, msg, stats)

    def log_call_stats():
        env.info("abduct_and_saturate_call_stats", call_stats)

    def all_canonical() -> Sequence[_EFact]:
        return [*candidates, *proved, *disproved, *redundant]

    def is_redundant(f: _EFact) -> dp.StreamContext[bool]:
        if f is None:
            return False
        call_stats.is_redundant_calls += 1
        start_time = time.time()
        respace = node.redundant([tracked[o] for o in proved], tracked[f])
        res = yield from respace.stream(env, policy).first()
        call_stats.is_redundant_time_in_seconds += time.time() - start_time
        if res is None:
            raise _Abort()
        return res.tracked.value

    def add_candidate(c: _EFact) -> dp.StreamContext[None]:
        # Take a new fact and put it into either `proved`, `disproved`,
        # `candidates` or `redundant`. If a canonical fact is passed,
        # nothing is done.
        if c in all_canonical():
            return
        # We first make a redundancy check
        if (yield from is_redundant(c)):
            dbg(f"Redundant: {c}")
            redundant.add(c)
            return
        # If not redundant, we try and prove it
        call_stats.prove_calls += 1
        start_time = time.time()
        facts_list = [(tracked[f], p) for f, p in proved.items()]
        pstream = node.prove(facts_list, tracked[c]).stream(env, policy)
        res = yield from pstream.first()
        call_stats.prove_time_in_seconds += time.time() - start_time
        if res is None:
            raise _Abort()
        status, payload = res.tracked[0], res.tracked[1]
        if status.value == "disproved":
            disproved.add(c)
            dbg(f"Disproved: {c}")
            if c is None:
                raise _Abort()
        elif status.value == "proved":
            proved[c] = payload
            dbg(f"Proved: {c}")
            if c is None:
                raise _ProofFound()
        else:
            candidates[c] = _CandInfo(payload, 0, 0)

    def propagate() -> dp.StreamContext[Literal["updated", "not_updated"]]:
        # Go through each candidate and see if it is now provable
        # assuming all established facts.
        dbg("Propagating...")
        old_candidates = candidates.copy()
        # Determining which candidates to reattempt
        M = max_reattempted_candidates_per_propagation_step
        if M is None:
            to_reattempt = old_candidates
            candidates.clear()
        else:
            to_reattempt_list = list(old_candidates.items())
            to_reattempt_list.sort(key=lambda x: -x[1].num_proposed)
            to_reattempt = dict(to_reattempt_list[:M])
            for c in to_reattempt:
                del candidates[c]
        for c, i in to_reattempt.items():
            yield from add_candidate(c)
            if c in candidates:
                # Restore the counters if `c` is still a candidate
                candidates[c].num_proposed = i.num_proposed
                candidates[c].num_visited = i.num_visited
        return (
            "updated"
            if len(candidates) != len(old_candidates)
            else "not_updated"
        )

    def saturate() -> dp.StreamContext[None]:
        # Propagate facts until saturation
        i = 0
        m = max_consecutive_propagation_steps
        while (m is None or i < m) and (yield from propagate()) == "updated":
            i += 1

    def get_canonical(f: _EFact) -> dp.StreamContext[_EFact]:
        # The result is guaranteed to be in `tracked`
        if f in proved or f in disproved or f in candidates:
            # Case where f is a canonical fact
            return f
        assert f is not None
        if f in equivalent:
            # Case where an equivalent canonical fact is known already
            nf = equivalent[f]
            assert nf in all_canonical()
            return equivalent[f]
        # New fact whose equivalence must be tested
        prev = [tracked[o] for o in all_canonical() if o is not None]
        if not prev:
            # First fact: no need to make equivalence call
            return f
        call_stats.search_equivalent_calls += 1
        start_time = time.time()
        eqspace = node.search_equivalent(prev, tracked[f])
        res = yield from eqspace.stream(env, policy).first()
        call_stats.search_equivalent_time_in_seconds += (
            time.time() - start_time
        )
        if res is None:
            raise _Abort()
        res = res.tracked
        if res.value is None:
            return f
        elif res.value in all_canonical():
            equivalent[f] = res.value
            return res.value
        else:
            env.error("invalid_equivalent_call")
            return f

    def get_raw_suggestions(c: _EFact) -> dp.StreamContext[Sequence[_EFact]]:
        assert c in candidates
        sstream = node.suggest(candidates[c].feedback).stream(env, policy)
        res = yield from sstream.all()
        if not res:
            # If no suggestions are returned, we are out of budget and
            # abort so as to not call this again in a loop.
            raise _Abort()
        tracked_suggs = [s for r in res for s in r.tracked]
        M = max_raw_suggestions_per_step
        if M is not None and len(tracked_suggs) > M:
            counts: dict[_Fact, int] = defaultdict(int)
            for s in tracked_suggs:
                counts[s.value] += 1
            tracked_suggs.sort(key=lambda x: counts[x.value], reverse=True)
            tracked_suggs = tracked_suggs[:M]
        # Populate the `tracked` cache (this is the only place where new
        # facts can be created and so the only place where `tracked`
        # must be updated).
        suggs = [s.value for s in tracked_suggs]
        dbg(f"Suggestions: {suggs}")
        for s, ts in zip(suggs, tracked_suggs):
            if s not in tracked:
                tracked[s] = ts
        return suggs

    def get_suggestions(c: _EFact) -> dp.StreamContext[dict[_EFact, int]]:
        # Return a dict representing a multiset of suggestions
        assert c in candidates
        raw_suggs = yield from get_raw_suggestions(c)
        suggs: list[_EFact] = []
        for s in raw_suggs:
            suggs.append((yield from get_canonical(s)))
        len_proved_old = len(proved)
        for s in suggs:
            yield from add_candidate(s)
        if len_proved_old != len(proved):
            assert len(proved) > len_proved_old
            yield from saturate()
        suggs = [s for s in suggs if s in candidates]
        suggs_multiset: dict[_EFact, int] = {}
        for s in suggs:
            if s not in suggs_multiset:
                suggs_multiset[s] = 0
            suggs_multiset[s] += 1
        dbg(f"Filtered: {suggs_multiset}")
        return suggs_multiset

    try:
        yield from add_candidate(None)
        while True:
            cur: _EFact = None
            for _ in range(max_rollout_depth):
                dbg(f"Explore fact: {cur}")
                suggs = yield from get_suggestions(cur)
                if not suggs or cur in proved:
                    break
                n = sum(suggs.values())
                for s, k in suggs.items():
                    candidates[s].num_proposed += k / n
                infos = [candidates[c] for c in suggs]
                best = _argmax(
                    scoring_function(i.num_proposed, i.num_visited)
                    for i in infos
                )
                cur = list(suggs.keys())[best]
                candidates[cur].num_visited += 1
    except _Abort:
        log_call_stats()
        return
    except _ProofFound:
        log_call_stats()
        action = proved[None]
        child = tree.child(action)
        assert isinstance(child.node, dp.Success)
        yield dp.Solution(child.node.success)
        return

ScoringFunction

Bases: Protocol

A function for assigning a score to candidate facts to prove, so that the fact with the highest score is chosen next.
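
A sketch of a custom scoring function (an arbitrary heuristic, not the library default):

```python
def frequency_biased(num_proposed: float, num_visited: float) -> float:
    # Favor facts that are frequently suggested but rarely visited.
    return num_proposed / (1.0 + num_visited)
```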

Source code in src/delphyne/stdlib/search/abduction.py
class ScoringFunction(Protocol):
    """
    A function for assigning a score to candidate facts to prove, so
    that the fact with the highest score is chosen next.
    """

    def __call__(self, num_proposed: float, num_visited: float) -> float:
        """
        Arguments:
            num_proposed: Normalized number of times the fact was
                proposed by the `suggest` function. When the latter
                returns `n` suggestions, each suggestion's count is
                increased by `1/n`.
            num_visited: Number of times the fact was chosen as target
                in one step of a rollout.
        """
        ...

__call__

__call__(num_proposed: float, num_visited: float) -> float

Parameters:

Name Type Description Default
num_proposed float

Normalized number of times the fact was proposed by the suggest function. When the latter returns n suggestions, each suggestion's count is increased by 1/n.

required
num_visited float

Number of times the fact was chosen as target in one step of a rollout.

required
Source code in src/delphyne/stdlib/search/abduction.py
def __call__(self, num_proposed: float, num_visited: float) -> float:
    """
    Arguments:
        num_proposed: Normalized number of times the fact was
            proposed by the `suggest` function. When the latter
            returns `n` suggestions, each suggestion's count is
            increased by `1/n`.
        num_visited: Number of times the fact was chosen as target
            in one step of a rollout.
    """
    ...

_default_scoring_function

_default_scoring_function(num_proposed: float, num_visited: float) -> float

The default scoring function for fact candidates.

See ScoringFunction for details.

Source code in src/delphyne/stdlib/search/abduction.py
def _default_scoring_function(
    num_proposed: float, num_visited: float
) -> float:
    """
    The default scoring function for fact candidates.

    See `ScoringFunction` for details.
    """
    return -(num_visited / max(1, math.sqrt(num_proposed)))