
Search Algorithms and Utilities

Strategies

interact

interact(
    step: Callable[
        [AnswerPrefix, InteractStats], Opaque[P, Response[A | WrappedParseError, T]]
    ],
    process: Callable[[A, InteractStats], Opaque[P, B | Error]],
    tools: Mapping[type[T], Callable[[Any], Opaque[P, Any]]] | None = None,
    inner_policy_type: type[P] | None = None,
) -> Strategy[Branch, P, B]

A standard strategy for creating conversational agents.

A common pattern for interacting with LLMs is to have multi-message exchanges where the full conversation history is resent repeatedly. LLMs are also often allowed to request tool calls. This strategy implements this pattern. It is meant to be inlined into a wrapping strategy (since it is not decorated with strategy).

Attributes:

  • step: a parametric opaque space, induced by a strategy or query that takes as an argument the current chat history (possibly empty) along with some statistics, and returns an answer to be processed. Oftentimes, this parametric opaque space is induced by a query with a special prefix field for receiving the chat history (see Query).
  • process: an opaque space induced by a query or strategy that is called on all model responses that are not tool calls, and which returns either a final response to be returned, or an error to be transmitted to the model as feedback (as an Error value with an absent or serializable meta field).
  • tools: a mapping from supported tool interfaces to implementations. Tools themselves can be implemented using arbitrary strategies or queries, allowing the integration of horizontal and vertical LLM pipelines.
  • inner_policy_type: Ambient inner policy type. This information is not used at runtime but it can be provided to help type inference when necessary.

Source code in src/delphyne/stdlib/search/interactive.py
def interact[P, A, B, T: md.AbstractTool[Any]](
    step: Callable[
        [dp.AnswerPrefix, InteractStats],
        Opaque[P, dq.Response[A | WrappedParseError, T]],
    ],
    process: Callable[[A, InteractStats], Opaque[P, B | dp.Error]],
    tools: Mapping[type[T], Callable[[Any], Opaque[P, Any]]] | None = None,
    inner_policy_type: type[P] | None = None,
) -> dp.Strategy[Branch, P, B]:
    """
    A standard strategy for creating conversational agents.

    A common pattern for interacting with LLMs is to have multi-message
    exchanges where the full conversation history is resent repeatedly.
    LLMs are also often allowed to request tool calls. This strategy
    implements this pattern. It is meant to be inlined into a wrapping
    strategy (since it is not decorated with `strategy`).

    Attributes:
        step: a parametric opaque space, induced by a strategy or query
            that takes as an argument the current chat history (possibly
            empty) along with some statistics, and returns an answer to
            be processed. Oftentimes, this parametric opaque space is
            induced by a query with a special `prefix` field for
            receiving the chat history (see `Query`).
        process: an opaque space induced by a query or strategy that is
            called on all model responses that are not tool calls, and
            which returns either a final response to be returned, or an
            error to be transmitted to the model as feedback (as an
            `Error` value with an absent or serializable `meta` field).
        tools: a mapping from supported tool interfaces to
            implementations. Tools themselves can be implemented using
            arbitrary strategies or queries, allowing the integration of
            horizontal and vertical LLM pipelines.
        inner_policy_type: Ambient inner policy type. This information
            is not used at runtime but it can be provided to help type
            inference when necessary.
    """

    prefix: dp.AnswerPrefix = []
    stats = InteractStats(num_rejected=0, num_tool_call_rounds=0)
    while True:
        resp = yield from branch(step(prefix, stats))
        prefix += [dp.OracleMessage("oracle", resp.answer)]
        match resp.parsed:
            case dq.FinalAnswer(a):
                if isinstance(a, WrappedParseError):
                    msg = dp.FeedbackMessage(
                        kind="feedback",
                        label=a.error.label,
                        description=a.error.description,
                        meta=a.error.meta,
                    )
                    stats.num_rejected += 1
                    prefix += [msg]
                else:
                    res = yield from branch(process(a, stats))
                    if isinstance(res, dp.Error):
                        msg = dp.FeedbackMessage(
                            kind="feedback",
                            label=res.label,
                            description=res.description,
                            meta=res.meta,
                        )
                        stats.num_rejected += 1
                        prefix += [msg]
                    else:
                        return res
            case dq.ToolRequests(tc):
                for i, t in enumerate(tc):
                    assert tools is not None
                    tres = yield from branch(tools[type(t)](t))
                    msg = dp.ToolResult(
                        "tool",
                        resp.answer.tool_calls[i],
                        t.render_result(tres),
                    )
                    prefix += [msg]
                stats.num_tool_call_rounds += 1

Policies

dfs

dfs(
    tree: Tree[Branch | Fail, P, T],
    env: PolicyEnv,
    policy: P,
    max_depth: int | None = None,
    max_branching: int | None = None,
) -> StreamGen[T]

The Standard Depth-First Search Algorithm.

Whenever a branching node is encountered, branching candidates are lazily enumerated and the corresponding child recursively searched.

Attributes:

  • max_depth (optional): maximum number of branching nodes that can be traversed in a path to success.
  • max_branching (optional): maximum number of children explored at each branching node.

Source code in src/delphyne/stdlib/search/dfs.py
@search_policy
def dfs[P, T](
    tree: Tree[Branch | Fail, P, T],
    env: PolicyEnv,
    policy: P,
    max_depth: int | None = None,
    max_branching: int | None = None,
) -> StreamGen[T]:
    """
    The Standard Depth-First Search Algorithm.

    Whenever a branching node is encountered, branching candidates are
    lazily enumerated and the corresponding child recursively searched.

    Attributes:
        max_depth (optional): maximum number of branching nodes
            that can be traversed in a path to success.
        max_branching (optional): maximum number of children explored at
            each branching node.
    """
    assert max_branching is None or max_branching > 0
    match tree.node:
        case Success(x):
            yield Solution(x)
        case Fail():
            pass
        case Branch(cands):
            if max_depth is not None and max_depth <= 0:
                return
            cands = cands.stream(env, policy)
            if max_branching is not None:
                cands = cands.take(max_branching, strict=True)
            yield from cands.bind(
                lambda a: dfs(
                    max_depth=max_depth - 1 if max_depth is not None else None,
                    max_branching=max_branching,
                )(tree.child(a.tracked), env, policy).gen()
            ).gen()
        case _:
            unsupported_node(tree.node)

par_dfs

par_dfs(tree: Tree[Branch | Fail, P, T], env: PolicyEnv, policy: P) -> StreamGen[T]

Parallel Depth-First Search.

Whenever a branching node is encountered, all branching candidates are computed at once and the associated children are explored in parallel.

Source code in src/delphyne/stdlib/search/dfs.py
@search_policy
def par_dfs[P, T](
    tree: Tree[Branch | Fail, P, T],
    env: PolicyEnv,
    policy: P,
) -> StreamGen[T]:
    """
    Parallel Depth-First Search.

    Whenever a branching node is encountered, all branching candidates
    are computed at once and the associated children are explored in
    parallel.
    """
    match tree.node:
        case Success(x):
            yield Solution(x)
        case Fail():
            pass
        case Branch(cands):
            cands = yield from cands.stream(env, policy).all()
            yield from Stream.parallel(
                [par_dfs()(tree.child(a.tracked), env, policy) for a in cands]
            ).gen()
        case _:
            unsupported_node(tree.node)

Combinators

sequence

sequence(
    policies: Iterable[PromptingPolicy], *, stop_on_reject: bool = True
) -> PromptingPolicy
sequence(
    policies: Iterable[SearchPolicy[N]], *, stop_on_reject: bool = True
) -> SearchPolicy[N]
sequence(
    policies: Iterable[Policy[N, P]], *, stop_on_reject: bool = True
) -> Policy[N, P]
sequence(policies: Iterable[_AnyPolicy], *, stop_on_reject: bool = True) -> _AnyPolicy

Try a list of policies, search policies, or prompting policies in sequence.

  • policies: An iterable of policies, search policies, or prompting policies to try in sequence.
  • stop_on_reject: If True, stop the sequence as soon as one policy sees all its resource requests denied. Note that this is necessary for termination when policies is an infinite iterator.
Source code in src/delphyne/stdlib/misc.py
def sequence(
    policies: Iterable[_AnyPolicy],
    *,
    stop_on_reject: bool = True,
) -> _AnyPolicy:
    """
    Try a list of policies, search policies, or prompting policies in
    sequence.

    Attributes:
    - policies: An iterable of policies, search policies, or prompting
          policies to try in sequence.
    - stop_on_reject: If True, stop the sequence as soon as one policy
          sees all its resource requests denied. Note that this is
          necessary for termination when `policies` is an infinite
          iterator.
    """

    it = iter(policies)
    first = next(it)
    if isinstance(first, PromptingPolicy):
        return sequence_prompting_policies(
            cast(Iterable[PromptingPolicy], policies),
            stop_on_reject=stop_on_reject,
        )
    elif isinstance(first, SearchPolicy):
        return sequence_search_policies(
            cast(Iterable[SearchPolicy[Any]], policies),
            stop_on_reject=stop_on_reject,
        )
    else:
        assert isinstance(first, Policy)
        return sequence_policies(
            cast(Iterable[Policy[Any, Any]], policies),
            stop_on_reject=stop_on_reject,
        )

or_else

or_else(main: SearchPolicy[N], other: SearchPolicy[N]) -> SearchPolicy[N]
or_else(main: Policy[N, P], other: Policy[N, P]) -> Policy[N, P]
or_else(main: _AnyPolicy, other: _AnyPolicy) -> _AnyPolicy

Take two policies, search policies, or prompting policies as arguments. Try the first one, and then the second one only if it fails (i.e., it does not produce any solution).

Source code in src/delphyne/stdlib/misc.py
def or_else(main: _AnyPolicy, other: _AnyPolicy) -> _AnyPolicy:
    """
    Take two policies, search policies, or prompting policies as
    arguments. Try the first one, and then the second one only if it
    fails (i.e., it does not produce any solution).
    """
    if isinstance(main, PromptingPolicy):
        assert isinstance(other, PromptingPolicy)
        return prompting_policy_or_else(main, other)
    elif isinstance(main, SearchPolicy):
        assert isinstance(other, SearchPolicy)
        return search_policy_or_else(main, other)
    else:
        return policy_or_else(main, other)  # type: ignore

nofail

nofail(space: Opaque[P, A], *, default: B) -> Opaque[P, A | B]

Modify an opaque space so that branching over it can never fail.

If the stream associated with the opaque space gets exhausted and no solution is produced, the provided default value is used.

In demonstrations, the default value can be selected by using the #no_fail_default hint.

Source code in src/delphyne/stdlib/misc.py
def nofail[P, A, B](space: Opaque[P, A], *, default: B) -> Opaque[P, A | B]:
    """
    Modify an opaque space so that branching over it can never fail.

    If the stream associated with the opaque space gets exhausted and no
    solution is produced, the provided default value is used.

    In demonstrations, the default value can be selected by using the
    `#no_fail_default` hint.
    """
    try_policy = dfs() @ elim_flag(NoFailFlag, "no_fail_try")
    def_policy = dfs() @ elim_flag(NoFailFlag, "no_fail_default")
    search_policy = or_else(try_policy, def_policy)
    return nofail_strategy(space, default=default).using(
        lambda p: search_policy & p
    )

iterate

iterate(
    next: Callable[[S | None], Opaque[P, tuple[T | None, S]]],
    transform_stream: Callable[[P], StreamTransformer | None] | None = None,
) -> Opaque[P, T]

Iteratively call a strategy or query, repeatedly feeding back the last call's output state into a new call and yielding values along the way.

A standard use case is to repeatedly call a query or strategy with a blacklist of previously generated values, so as to produce diverse success values.

Parameters:

  • next: A parametric opaque space, induced by a query or strategy that takes a state as an input (or None initially) and outputs a new state, along with a generated value (required).
  • transform_stream: An optional mapping from the inner policy to a stream transformer to be applied to the resulting stream of generated values (defaults to None).

Returns:

  An opaque space enumerating all generated values (of type Opaque[P, T]).

Source code in src/delphyne/stdlib/search/iteration.py
def iterate[P, S, T](
    next: Callable[[S | None], Opaque[P, tuple[T | None, S]]],
    transform_stream: Callable[[P], StreamTransformer | None] | None = None,
) -> Opaque[P, T]:
    """
    Iteratively call a strategy or query, repeatedly feeding back the
    last call's output state into a new call and yielding values along
    the way.

    A standard use case is to repeatedly call a query or strategy with a
    blacklist of previously generated values, so as to produce diverse
    success values.

    Arguments:
        next: A parametric opaque space, induced by a query or strategy
            that takes a state as an input (or `None` initially) and
            outputs a new state, along with a generated value.
        transform_stream: An optional mapping from the inner policy to a
            stream transformer to be applied to the resulting stream of
            generated values.

    Returns:
        An opaque space enumerating all generated values.
    """

    def iterate_policy(inner_policy: P):
        policy = _search_iteration()
        if transform_stream is not None:
            trans = transform_stream(inner_policy)
            if trans is not None:
                policy = trans @ policy
        return policy & inner_policy

    return _iterate(next).using(iterate_policy)

Opaque Spaces Sugar

just_dfs

just_dfs(policy: P) -> Policy[Branch | Fail, P]

Convenience shortcut to avoid passing lambdas to the get_policy argument of using, when using DFS in combination with the ambient inner policy.

Source code in src/delphyne/stdlib/misc.py
def just_dfs[P](policy: P) -> Policy[Branch | Fail, P]:
    """
    Convenience shortcut to avoid passing lambdas to the `get_policy`
    argument of `using`, when using DFS in combination with the ambient
    inner policy.
    """
    return dfs() & policy

just_compute

just_compute(policy: P) -> Policy[Compute, P]

Convenience shortcut to avoid passing lambdas to the get_policy argument of using, in the case of sub-strategies that only feature the Compute effect.

Source code in src/delphyne/stdlib/misc.py
def just_compute[P](policy: P) -> Policy[Compute, P]:
    """
    Convenience shortcut to avoid passing lambdas to the `get_policy`
    argument of `using`, in the case of sub-strategies that only feature
    the `Compute` effect.
    """
    return dfs() @ elim_compute() & policy

ambient_pp

ambient_pp(policy: PromptingPolicy) -> PromptingPolicy

Convenience shortcut to avoid passing lambdas to the get_policy argument of Query.using, when using the ambient inner policy as a prompting policy.

Source code in src/delphyne/stdlib/misc.py
def ambient_pp(policy: PromptingPolicy) -> PromptingPolicy:
    """
    Convenience shortcut to avoid passing lambdas to the `get_policy`
    argument of `Query.using`, when using the ambient inner policy as a
    prompting policy.
    """
    return policy

ambient

ambient(policy: F) -> F

Convenience shortcut to avoid passing lambdas to the get_policy argument of Query.using, when using the ambient inner policy as a sub-policy (or as a sub-prompting policy).

Source code in src/delphyne/stdlib/misc.py
def ambient[F](policy: F) -> F:
    """
    Convenience shortcut to avoid passing lambdas to the `get_policy`
    argument of `Query.using`, when using the ambient inner policy as a
    sub-policy (or as a sub-prompting policy).
    """
    return policy

Universal Queries

UniversalQuery dataclass

Bases: Query[object]

A universal query, implicitly defined by the surrounding context of its call. See guess for more information.

Attributes:

  • strategy (str): Fully qualified name of the surrounding strategy (e.g., my_package.my_module.my_strategy).
  • expected_type (str): A string rendition of the expected answer type.
  • tags (Sequence[str]): Tags associated with the space induced by the query, which can be used to locate the exact location where the query is issued (the default tag takes the name of the variable that the query result is assigned to).
  • locals (dict[str, object]): A dictionary that provides the values of a subset of local variables or expressions (as JSON values).

Experimental

This feature is experimental and subject to change.

Source code in src/delphyne/stdlib/universal_queries.py
@dataclass
class UniversalQuery(Query[object]):
    """
    A universal query, implicitly defined by the surrounding context of
    its call. See `guess` for more information.

    Attributes:
        strategy: Fully qualified name of the surrounding strategy
            (e.g., `my_package.my_module.my_strategy`).
        expected_type: A string rendition of the expected answer type.
        tags: Tags associated with the space induced by the query, which
            can be used to locate the exact location where the query is
            issued (the default tag takes the name of the variable that
            the query result is assigned to).
        locals: A dictionary that provides the values of a subset of
            local variables or expressions (as JSON values).

    !!! warning "Experimental"
        This feature is experimental and subject to change.
    """

    # TODO: add a `context` field where we store objects whose
    # documentation or source should be added to the prompt.

    strategy: str
    expected_type: str
    tags: Sequence[str]
    locals: dict[str, object]

    __parser__ = last_code_block.yaml

    @override
    def default_tags(self):
        return self.tags

    @property
    def strategy_source(self) -> str:
        """
        Return the source code of the strategy that contains this query.
        """
        strategy_obj = _load_from_qualified_name(self.strategy)
        assert callable(strategy_obj)
        return _source_code(strategy_obj)

strategy_source property

strategy_source: str

Return the source code of the strategy that contains this query.

guess

guess(
    annot: type[T], /, *, using: Sequence[object]
) -> Strategy[Branch | Fail, IPDict, T]
guess(
    annot: TypeAnnot[Any], /, *, using: Sequence[object]
) -> Strategy[Branch | Fail, IPDict, Any]
guess(
    annot: TypeAnnot[Any], /, *, using: Sequence[object]
) -> Strategy[Branch | Fail, IPDict, Any]

Attempt to guess a value of a given type, using the surrounding context of the call site along with the value of some local variables or expressions.

This function inspects the call stack to determine the context in which it is called and issues a UniversalQuery, with a tag corresponding to the name of the assigned variable. A failure node is issued if the oracle result cannot be parsed into the expected type. For example:

res = yield from guess(int, using=[x, y.summary()])

issues a UniversalQuery query tagged res, with attribute locals a dictionary with string keys "x" and "y.summary()".

Attributes:

  • annot: The expected type of the value to be guessed.
  • using: A sequence of local variables or expressions whose value should be communicated to the oracle (a label for each expression is automatically generated using source information).

Note

Our use of an overloaded type should not be necessary anymore when TypeExpr is released with Python 3.14.

Experimental

This feature is experimental and subject to change.

Source code in src/delphyne/stdlib/universal_queries.py
def guess(
    annot: TypeAnnot[Any], /, *, using: Sequence[object]
) -> dp.Strategy[Branch | Fail, IPDict, Any]:
    """
    Attempt to guess a value of a given type, using the surrounding
    context of the call site along with the value of some local
    variables or expressions.

    This function inspects the call stack to determine the context in
    which it is called and issues a `UniversalQuery`, with a tag
    corresponding to the name of the assigned variable. A failure node is
    issued if the oracle result cannot be parsed into the expected type.
    For example:

    ```python
    res = yield from guess(int, using=[x, y.summary()])
    ```

    issues a `UniversalQuery` query tagged `res`, with attribute
    `locals` a dictionary with string keys `"x"` and `"y.summary()"`.

    Attributes:
        annot: The expected type of the value to be guessed.
        using: A sequence of local variables or expressions whose value
            should be communicated to the oracle (a label for each
            expression is automatically generated using source information).

    !!! note
        Our use of an overloaded type should not be necessary anymore
        when `TypeExpr` is released with Python 3.14.

    !!! warning "Experimental"
        This feature is experimental and subject to change.
    """

    # Extracting the name of the surrounding strategy
    strategy = surrounding_qualname(skip=1)
    assert strategy is not None
    strategy = _rename_main_module_in_qualified_name(strategy)

    # Extracting the name of the variable being assigned
    cur_instr_src = current_instruction_source(skip=1)
    ret_val_name = assigned_var_name(cur_instr_src)
    assert isinstance(ret_val_name, str)

    # Computing the 'locals' dictionary
    guess_args = call_argument_sources(cur_instr_src, guess)
    assert guess_args is not None
    _args, kwargs = guess_args
    using_args = _parse_list_of_ids(kwargs["using"])
    assert len(using) == len(using_args)
    locals = {k: pydantic_dump(type(v), v) for k, v in zip(using_args, using)}

    # Building the query
    query = UniversalQuery(strategy, str(annot), [ret_val_name], locals)

    ret = yield from branch(query.using(...))
    try:
        parsed = pydantic_load(annot, ret)
    except Exception as e:
        assert_never((yield from fail("parse_error", message=str(e))))
    return parsed

Abduction

Abduction dataclass

Bases: Node

Node for the singleton tree produced by abduction. See abduction for details.

An action is a successful proof of the main goal.

Source code in src/delphyne/stdlib/search/abduction.py
@dataclass
class Abduction(dp.Node):
    """
    Node for the singleton tree produced by `abduction`.
    See `abduction` for details.

    An action is a successful proof of the main goal.
    """

    prove: Callable[
        [Sequence[tuple[_Fact, _Proof]], _Fact | None],
        OpaqueSpace[Any, _Status],
    ]
    suggest: Callable[
        [_Feedback],
        OpaqueSpace[Any, Sequence[_Fact]],
    ]
    search_equivalent: Callable[
        [Sequence[_Fact], _Fact],
        OpaqueSpace[Any, _Fact | None],
    ]
    redundant: Callable[
        [Sequence[_Fact], _Fact],
        OpaqueSpace[Any, bool],
    ]

    def navigate(self) -> dp.Navigation:
        def aux(fact: dp.Tracked[_Fact] | None) -> dp.Navigation:
            # Take a fact as an argument and return a list of
            # (proved_fact, proof) pairs.
            res = yield self.prove([], fact)
            status, payload = res[0], res[1]
            if status.value == "proved":
                return [(fact, payload)]
            elif status.value == "disproved":
                return []
            else:
                assert status.value == "feedback"
                feedback = payload
                suggestions = yield self.suggest(feedback)
                proved: list[Any] = []
                for s in suggestions:
                    extra: Any = yield from aux(s)
                    proved.extend(extra)
                res = yield self.prove(proved, fact)
                status, payload = res[0], res[1]
                if status.value == "proved":
                    proved.append((fact, payload))
                return _remove_duplicates(proved, by=lambda x: drop_refs(x[0]))

        proved: Any = yield from aux(None)
        main_proof = _find_assoc(proved, None)
        if main_proof is None:
            raise dp.NavigationError(
                "No proof for the main goal was produced."
            )
        return main_proof

abduction

abduction(
    prove: Callable[
        [Sequence[tuple[Fact, Proof]], Fact | None],
        Opaque[P, AbductionStatus[Feedback, Proof]],
    ],
    suggest: Callable[[Feedback], Opaque[P, Sequence[Fact]]],
    search_equivalent: Callable[[Sequence[Fact], Fact], Opaque[P, Fact | None]],
    redundant: Callable[[Sequence[Fact], Fact], Opaque[P, bool]],
    inner_policy_type: type[P] | None = None,
) -> Strategy[Abduction, P, Proof]

Higher-order strategy for proving a fact via recursive abduction.

Parameters:

  • prove: take a sequence of already established facts as an argument, along with a new fact, and attempt to prove this new fact. Three outcomes are possible: the fact is proved, disproved, or a list of suggestions is made that might be helpful to prove first. None denotes the top-level goal to be proved.
  • suggest: take some feedback from the prove function and return a sequence of fact candidates that may be useful to prove before reattempting the original proof.
  • search_equivalent: take a collection of facts along with a new one, and return either the first fact of the list equivalent to the new fact, or None. This is used to avoid spending search effort on proving equivalent facts.
  • redundant: take a collection of established facts and decide whether they imply a new fact candidate. This is useful to avoid trying to prove, and accumulating, redundant facts.

Returns:

  A proof of the top-level goal (as a Strategy[Abduction, P, Proof]).

Source code in src/delphyne/stdlib/search/abduction.py
def abduction[Fact, Feedback, Proof, P](
    prove: Callable[
        [Sequence[tuple[Fact, Proof]], Fact | None],
        Opaque[P, AbductionStatus[Feedback, Proof]],
    ],
    suggest: Callable[
        [Feedback],
        Opaque[P, Sequence[Fact]],
    ],
    search_equivalent: Callable[
        [Sequence[Fact], Fact], Opaque[P, Fact | None]
    ],
    redundant: Callable[[Sequence[Fact], Fact], Opaque[P, bool]],
    inner_policy_type: type[P] | None = None,
) -> dp.Strategy[Abduction, P, Proof]:
    """
    Higher-order strategy for proving a fact via recursive abduction.

    Arguments:
      prove: take a sequence of already established facts as an
        argument along with a new fact, and attempt to prove this new
        fact. Three outcomes are possible: the fact is proved,
        disproved, or a list of suggestions is made that might be
        helpful to prove first. `None` denotes the top-level goal to be
        proved.

      suggest: take some feedback from the `prove` function and return a
        sequence of fact candidates that may be useful to prove before
        reattempting the original proof.

      search_equivalent: take a collection of facts along with a new
        one, and return either the first fact of the list equivalent to
        the new fact or `None`. This is used to avoid spending search in
        proving equivalent facts.

      redundant: take a collection of established facts and decide
        whether they imply a new fact candidate. This is useful to avoid
        trying to prove and accumulating redundant facts.

    Returns:
      a proof of the top-level goal.
    """
    res = yield spawn_node(
        Abduction,
        prove=prove,
        suggest=suggest,
        search_equivalent=search_equivalent,
        redundant=redundant,
    )
    return cast(Proof, res)

abduct_and_saturate

abduct_and_saturate(
    tree: Tree[Abduction, P, Proof],
    env: PolicyEnv,
    policy: P,
    max_rollout_depth: int = 3,
    scoring_function: ScoringFunction = _default_scoring_function,
    verbose: bool = False,
) -> StreamGen[Proof]

A standard, sequential policy to process abduction nodes.

Note: facts must be hashable.

Source code in src/delphyne/stdlib/search/abduction.py
@search_policy
def abduct_and_saturate[P, Proof](
    tree: dp.Tree[Abduction, P, Proof],
    env: PolicyEnv,
    policy: P,
    max_rollout_depth: int = 3,
    scoring_function: ScoringFunction = _default_scoring_function,
    verbose: bool = False,
) -> dp.StreamGen[Proof]:
    """
    A standard, sequential policy to process abduction nodes.

    Note: facts must be hashable.
    """

    # TODO: we are currently allowing redundant facts in `proved` since
    # we never clean up `proved`. For example, if `x > 0` is established
    # before the stronger `x >= 0`, the former won't be deleted from
    # `proved`.

    # Invariant: `candidates`, `proved`, `disproved` and `redundant` are
    # disjoint. Together, they form the set of "canonical facts".
    candidates: dict[_EFact, _CandInfo] = {}
    proved: dict[_EFact, _Proof] = {}
    disproved: set[_EFact] = set()
    # Facts that are implied by the conjunction of all proved facts.
    redundant: set[_EFact] = set()

    # It is easier to manipulate untracked facts and so we keep the
    # correspondence with tracked facts here.
    # Invariant: all canonical facts are included in `tracked`.
    tracked: dict[_EFact, _Tracked_EFact] = {None: None}

    # The `equivalent` dict maps a fact to its canonical equivalent
    # representative that is somewhere in `candidates`, `proved`,
    # `disproved` or `redundant`.
    equivalent: dict[_EFact, _EFact] = {}

    # Note: a newly proved fact can also make an existing candidate
    # redundant, which is why `propagate` re-checks every candidate.

    assert isinstance(tree.node, Abduction)
    node = tree.node

    def dbg(msg: str):
        if verbose:
            log(env, msg)

    def all_canonical() -> Sequence[_EFact]:
        return [*candidates, *proved, *disproved, *redundant]

    def is_redundant(f: _EFact) -> dp.StreamContext[bool]:
        if f is None:
            return False
        respace = node.redundant([tracked[o] for o in proved], tracked[f])
        res = yield from respace.stream(env, policy).first()
        if res is None:
            raise _Abort()
        return res.tracked.value

    def add_candidate(c: _EFact) -> dp.StreamContext[None]:
        # Take a new fact and put it into either `proved`, `disproved`,
        # `candidates` or `redundant`. If a canonical fact is passed,
        # nothing is done.
        if c in all_canonical():
            return
        # We first make a redundancy check
        if (yield from is_redundant(c)):
            dbg(f"Redundant: {c}")
            redundant.add(c)
            return
        # If not redundant, we try to prove it
        facts_list = [(tracked[f], p) for f, p in proved.items()]
        pstream = node.prove(facts_list, tracked[c]).stream(env, policy)
        res = yield from pstream.first()
        if res is None:
            raise _Abort()
        status, payload = res.tracked[0], res.tracked[1]
        if status.value == "disproved":
            disproved.add(c)
            dbg(f"Disproved: {c}")
            if c is None:
                raise _Abort()
        elif status.value == "proved":
            proved[c] = payload
            dbg(f"Proved: {c}")
            if c is None:
                raise _ProofFound()
        else:
            candidates[c] = _CandInfo(payload, 0, 0)

    def propagate() -> dp.StreamContext[Literal["updated", "not_updated"]]:
        # Go through each candidate and see if it is now provable
        # assuming all established facts.
        old_candidates = candidates.copy()
        candidates.clear()
        for c, i in old_candidates.items():
            yield from add_candidate(c)
            if c in candidates:
                # Restore the counters if `c` is still a candidate
                candidates[c].num_proposed = i.num_proposed
                candidates[c].num_visited = i.num_visited
        return (
            "updated"
            if len(candidates) != len(old_candidates)
            else "not_updated"
        )

    def saturate() -> dp.StreamContext[None]:
        # Propagate facts until saturation
        while (yield from propagate()) == "updated":
            pass

    def get_canonical(f: _EFact) -> dp.StreamContext[_EFact]:
        # The result is guaranteed to be in `tracked`
        if f in proved or f in disproved or f in candidates:
            # Case where f is a canonical fact
            return f
        assert f is not None
        if f in equivalent:
            # Case where an equivalent canonical fact is known already
            nf = equivalent[f]
            assert nf in all_canonical()
            return equivalent[f]
        # New fact whose equivalence must be tested
        prev = [tracked[o] for o in all_canonical() if o is not None]
        if not prev:
            # First fact: no need to make equivalence call
            return f
        eqspace = node.search_equivalent(prev, tracked[f])
        res = yield from eqspace.stream(env, policy).first()
        if res is None:
            raise _Abort()
        res = res.tracked
        if res.value is None:
            return f
        elif res.value in all_canonical():
            equivalent[f] = res.value
            return res.value
        else:
            log(env, "invalid_equivalent_call")
            return f

    def get_raw_suggestions(c: _EFact) -> dp.StreamContext[Sequence[_EFact]]:
        assert c in candidates
        sstream = node.suggest(candidates[c].feedback).stream(env, policy)
        res = yield from sstream.all()
        if not res:
            # If no suggestions are returned, we are out of budget and
            # abort so as not to call this again in a loop.
            raise _Abort()
        tracked_suggs = [s for r in res for s in r.tracked]
        # Populate the `tracked` cache (this is the only place where new
        # facts can be created and so the only place where `tracked`
        # must be updated).
        suggs = [s.value for s in tracked_suggs]
        dbg(f"Suggestions: {suggs}")
        for s, ts in zip(suggs, tracked_suggs):
            if s not in tracked:
                tracked[s] = ts
        return suggs

    def get_suggestions(c: _EFact) -> dp.StreamContext[dict[_EFact, int]]:
        # Return a dict representing a multiset of suggestions
        assert c in candidates
        raw_suggs = yield from get_raw_suggestions(c)
        suggs: list[_EFact] = []
        for s in raw_suggs:
            suggs.append((yield from get_canonical(s)))
        len_proved_old = len(proved)
        for s in suggs:
            yield from add_candidate(s)
        if len_proved_old != len(proved):
            assert len(proved) > len_proved_old
            yield from saturate()
        suggs = [s for s in suggs if s in candidates]
        suggs_multiset: dict[_EFact, int] = {}
        for s in suggs:
            if s not in suggs_multiset:
                suggs_multiset[s] = 0
            suggs_multiset[s] += 1
        dbg(f"Filtered: {suggs_multiset}")
        return suggs_multiset

    try:
        yield from add_candidate(None)
        while True:
            cur: _EFact = None
            for _ in range(max_rollout_depth):
                dbg(f"Explore fact: {cur}")
                suggs = yield from get_suggestions(cur)
                if not suggs:
                    break
                n = sum(suggs.values())
                for s, k in suggs.items():
                    candidates[s].num_proposed += k / n
                infos = [candidates[c] for c in suggs]
                best = _argmax(
                    scoring_function(i.num_proposed, i.num_visited)
                    for i in infos
                )
                cur = list(suggs.keys())[best]
                candidates[cur].num_visited += 1
    except _Abort:
        return
    except _ProofFound:
        action = proved[None]
        child = tree.child(action)
        assert isinstance(child.node, dp.Success)
        yield dp.Solution(child.node.success)
        return