Skip to content

Stream Combinators

Search Streams

Stream dataclass

Bases: AbstractStream[T]

A search stream produced by a search policy or prompting policy.

This class inherits AbstractStream and supports various methods and combinators for assembling streams, while guaranteeing adherence to the search stream protocol.

Attributes:

Name Type Description
_generate Callable[[], StreamGen[T]]

A zeroary function that produces a stream generator.

Source code in src/delphyne/stdlib/streams.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@dataclass(frozen=True)
class Stream[T](dp.AbstractStream[T]):
    """
    A search stream produced by a search policy or prompting policy.

    This class inherits `AbstractStream` and supports various methods
    and combinators for assembling streams, while guaranteeing adherence
    to the [search stream protocol][delphyne.core.streams].

    Attributes:
        _generate: A zeroary function that produces a stream generator.
    """

    _generate: Callable[[], dp.StreamGen[T]]

    @override
    def gen(self) -> dp.StreamGen[T]:
        return self._generate()

    ## Collecting all elements

    def collect(
        self,
        budget: dp.BudgetLimit | None = None,
        num_generated: int | None = None,
    ) -> tuple[Sequence[dp.Solution[T]], dp.Budget]:
        """
        Exhaust a stream and collect all generated solutions.

        Attributes:
            budget (optional): Budget limit (see `with_budget` method).
            num_generated (optional): Number of solutions to generate
                (see `take` method).

        Returns:
            A sequence of solutions along with the total spent budget.

        !!! warning
            This function must only be used at top-level and *not*
            within the definition of a search stream generator (in which
            case the consumed resources won't be accounted for in the
            parent stream). See `Stream.all`.
        """
        if budget is not None:
            self = self.with_budget(budget)
        if num_generated is not None:
            self = self.take(num_generated)
        return stream_collect(self.gen())

    ## Transforming the stream

    def with_budget(self, budget: dp.BudgetLimit):
        """
        Return an identical stream that denies all spending requests
        once a given amount of budget is spent.

        **Guarantees**: if all budget spending over-estimates passed to
        `spend_on` are accurate, the given budget limit is rigorously
        respected. If not, the spent amount may exceed the budget by an
        amount of `Delta * N`, where `Delta` is the maximum estimation
        error and `N` is the concurrency level of the stream (1 if
        `Stream.parallel` is never used).
        """
        return Stream(lambda: stream_with_budget(self.gen(), budget))

    def take(self, num_generated: int, strict: bool = True):
        """
        Return an identical stream that terminates once a given number
        of solution is generated. If `strict` is set to `False`, more
        solutions can be returned, provided that no additional budget
        must be spent for generating them.
        """
        return Stream(lambda: stream_take(self.gen(), num_generated, strict))

    def loop(
        self, n: int | None = None, *, stop_on_reject: bool = True
    ) -> "Stream[T]":
        """
        Repeatedly execute a stream.

        Arguments:
            n (optional): Number of times to repeat the stream. By
                default, the stream is repeated indefinitely.
            stop_on_reject: If set to `True` (default), the resulting
                stream stops after the first iteration during which no
                spending request was granted. This guarantees
                termination, even if `n` is `None`.
        """
        it = itertools.count() if n is None else range(n)
        return Stream(
            lambda: stream_sequence(
                (self.gen for _ in it), stop_on_reject=stop_on_reject
            )
        )

    def bind[U](
        self, f: Callable[[dp.Solution[T]], dp.StreamGen[U]]
    ) -> "Stream[U]":
        """
        Apply a function to all generated solutions of a stream and
        concatenate the resulting streams.

        This is analogous to the `concat_map` funtion on lists:

            def concat_map(f, xs):
                return [y for x in xs for y in f(x)]
        """
        return Stream(lambda: stream_bind(self.gen(), f))

    ## Monadic Methods

    def first(self) -> dp.StreamContext[dp.Solution[T] | None]:
        """
        Obtain the first solution from a stream, or return `None` if the
        stream terminates without yielding any solution.
        """
        return stream_first(self.gen())

    def all(self) -> dp.StreamContext[Sequence[dp.Solution[T]]]:
        """
        Obtain all solutions from a stream.
        """
        return stream_all(self.gen())

    def next(
        self,
    ) -> dp.StreamContext[
        "tuple[Sequence[dp.Solution[T]], dp.Budget, Stream[T] | None]"
    ]:
        """
        Make an atomic attempt to obtain a solution from the stream,
        stopping right before a second spending request is made.

        Return a sequence of generated solutions, the total spent
        budget, and the remaining stream, if any.
        """
        gen, budg, rest = yield from stream_next(self.gen())
        new_rest = None if rest is None else Stream(lambda: rest)
        return gen, budg, new_rest

    ## Static Methods

    @staticmethod
    def sequence[U](
        streams: Iterable["Stream[U]"], *, stop_on_reject: bool = True
    ) -> "Stream[U]":
        """
        Concatenate all streams from a possibly infinite collection.

        If `stop_on_reject` is set to `True` (default), then the
        resulting stream is stopped as soon as one stream in the
        collection terminates without a single spending request being
        granted. This allows guaranteeing termination, even if an
        infinite collection of streams is passed.
        """
        return Stream(
            lambda: stream_sequence(
                (s.gen for s in streams), stop_on_reject=stop_on_reject
            )
        )

    @staticmethod
    def parallel[U](streams: Sequence["Stream[U]"]) -> "Stream[U]":
        """
        Run all streams of a sequence in separate threads, possibly
        interleaving the resulting solutions.
        """
        return Stream(lambda: stream_parallel([s.gen() for s in streams]))

    @staticmethod
    def or_else[U](main: "Stream[U]", fallback: "Stream[U]") -> "Stream[U]":
        """
        Run the `main` stream and, if it does not yield any solution,
        run the `fallback` stream.
        """
        return Stream(lambda: stream_or_else(main.gen, fallback.gen))

collect

collect(
    budget: BudgetLimit | None = None, num_generated: int | None = None
) -> tuple[Sequence[Solution[T]], Budget]

Exhaust a stream and collect all generated solutions.

Attributes:

Name Type Description
budget optional

Budget limit (see with_budget method).

num_generated optional

Number of solutions to generate (see take method).

Returns:

Type Description
tuple[Sequence[Solution[T]], Budget]

A sequence of solutions along with the total spent budget.

Warning

This function must only be used at top-level and not within the definition of a search stream generator (in which case the consumed resources won't be accounted for in the parent stream). See Stream.all.

Source code in src/delphyne/stdlib/streams.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def collect(
    self,
    budget: dp.BudgetLimit | None = None,
    num_generated: int | None = None,
) -> tuple[Sequence[dp.Solution[T]], dp.Budget]:
    """
    Exhaust a stream and collect all generated solutions.

    Attributes:
        budget (optional): Budget limit (see `with_budget` method).
        num_generated (optional): Number of solutions to generate
            (see `take` method).

    Returns:
        A sequence of solutions along with the total spent budget.

    !!! warning
        This function must only be used at top-level and *not*
        within the definition of a search stream generator (in which
        case the consumed resources won't be accounted for in the
        parent stream). See `Stream.all`.
    """
    if budget is not None:
        self = self.with_budget(budget)
    if num_generated is not None:
        self = self.take(num_generated)
    return stream_collect(self.gen())

with_budget

with_budget(budget: BudgetLimit)

Return an identical stream that denies all spending requests once a given amount of budget is spent.

Guarantees: if all budget spending over-estimates passed to spend_on are accurate, the given budget limit is rigorously respected. If not, the spent amount may exceed the budget by an amount of Delta * N, where Delta is the maximum estimation error and N is the concurrency level of the stream (1 if Stream.parallel is never used).

Source code in src/delphyne/stdlib/streams.py
70
71
72
73
74
75
76
77
78
79
80
81
82
def with_budget(self, budget: dp.BudgetLimit):
    """
    Return an identical stream that denies all spending requests
    once a given amount of budget is spent.

    **Guarantees**: if all budget spending over-estimates passed to
    `spend_on` are accurate, the given budget limit is rigorously
    respected. If not, the spent amount may exceed the budget by an
    amount of `Delta * N`, where `Delta` is the maximum estimation
    error and `N` is the concurrency level of the stream (1 if
    `Stream.parallel` is never used).
    """
    return Stream(lambda: stream_with_budget(self.gen(), budget))

take

take(num_generated: int, strict: bool = True)

Return an identical stream that terminates once a given number of solution is generated. If strict is set to False, more solutions can be returned, provided that no additional budget must be spent for generating them.

Source code in src/delphyne/stdlib/streams.py
84
85
86
87
88
89
90
91
def take(self, num_generated: int, strict: bool = True):
    """
    Return an identical stream that terminates once a given number
    of solution is generated. If `strict` is set to `False`, more
    solutions can be returned, provided that no additional budget
    must be spent for generating them.
    """
    return Stream(lambda: stream_take(self.gen(), num_generated, strict))

loop

loop(n: int | None = None, *, stop_on_reject: bool = True) -> Stream[T]

Repeatedly execute a stream.

Parameters:

Name Type Description Default
n optional

Number of times to repeat the stream. By default, the stream is repeated indefinitely.

None
stop_on_reject bool

If set to True (default), the resulting stream stops after the first iteration during which no spending request was granted. This guarantees termination, even if n is None.

True
Source code in src/delphyne/stdlib/streams.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def loop(
    self, n: int | None = None, *, stop_on_reject: bool = True
) -> "Stream[T]":
    """
    Repeatedly execute a stream.

    Arguments:
        n (optional): Number of times to repeat the stream. By
            default, the stream is repeated indefinitely.
        stop_on_reject: If set to `True` (default), the resulting
            stream stops after the first iteration during which no
            spending request was granted. This guarantees
            termination, even if `n` is `None`.
    """
    it = itertools.count() if n is None else range(n)
    return Stream(
        lambda: stream_sequence(
            (self.gen for _ in it), stop_on_reject=stop_on_reject
        )
    )

bind

bind(f: Callable[[Solution[T]], StreamGen[U]]) -> Stream[U]

Apply a function to all generated solutions of a stream and concatenate the resulting streams.

This is analogous to the concat_map funtion on lists:

def concat_map(f, xs):
    return [y for x in xs for y in f(x)]
Source code in src/delphyne/stdlib/streams.py
114
115
116
117
118
119
120
121
122
123
124
125
126
def bind[U](
    self, f: Callable[[dp.Solution[T]], dp.StreamGen[U]]
) -> "Stream[U]":
    """
    Apply a function to all generated solutions of a stream and
    concatenate the resulting streams.

    This is analogous to the `concat_map` funtion on lists:

        def concat_map(f, xs):
            return [y for x in xs for y in f(x)]
    """
    return Stream(lambda: stream_bind(self.gen(), f))

first

first() -> StreamContext[Solution[T] | None]

Obtain the first solution from a stream, or return None if the stream terminates without yielding any solution.

Source code in src/delphyne/stdlib/streams.py
130
131
132
133
134
135
def first(self) -> dp.StreamContext[dp.Solution[T] | None]:
    """
    Obtain the first solution from a stream, or return `None` if the
    stream terminates without yielding any solution.
    """
    return stream_first(self.gen())

all

all() -> StreamContext[Sequence[Solution[T]]]

Obtain all solutions from a stream.

Source code in src/delphyne/stdlib/streams.py
137
138
139
140
141
def all(self) -> dp.StreamContext[Sequence[dp.Solution[T]]]:
    """
    Obtain all solutions from a stream.
    """
    return stream_all(self.gen())

next

next() -> StreamContext[tuple[Sequence[Solution[T]], Budget, Stream[T] | None]]

Make an atomic attempt to obtain a solution from the stream, stopping right before a second spending request is made.

Return a sequence of generated solutions, the total spent budget, and the remaining stream, if any.

Source code in src/delphyne/stdlib/streams.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def next(
    self,
) -> dp.StreamContext[
    "tuple[Sequence[dp.Solution[T]], dp.Budget, Stream[T] | None]"
]:
    """
    Make an atomic attempt to obtain a solution from the stream,
    stopping right before a second spending request is made.

    Return a sequence of generated solutions, the total spent
    budget, and the remaining stream, if any.
    """
    gen, budg, rest = yield from stream_next(self.gen())
    new_rest = None if rest is None else Stream(lambda: rest)
    return gen, budg, new_rest

sequence staticmethod

sequence(streams: Iterable[Stream[U]], *, stop_on_reject: bool = True) -> Stream[U]

Concatenate all streams from a possibly infinite collection.

If stop_on_reject is set to True (default), then the resulting stream is stopped as soon as one stream in the collection terminates without a single spending request being granted. This allows guaranteeing termination, even if an infinite collection of streams is passed.

Source code in src/delphyne/stdlib/streams.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@staticmethod
def sequence[U](
    streams: Iterable["Stream[U]"], *, stop_on_reject: bool = True
) -> "Stream[U]":
    """
    Concatenate all streams from a possibly infinite collection.

    If `stop_on_reject` is set to `True` (default), then the
    resulting stream is stopped as soon as one stream in the
    collection terminates without a single spending request being
    granted. This allows guaranteeing termination, even if an
    infinite collection of streams is passed.
    """
    return Stream(
        lambda: stream_sequence(
            (s.gen for s in streams), stop_on_reject=stop_on_reject
        )
    )

parallel staticmethod

parallel(streams: Sequence[Stream[U]]) -> Stream[U]

Run all streams of a sequence in separate threads, possibly interleaving the resulting solutions.

Source code in src/delphyne/stdlib/streams.py
180
181
182
183
184
185
186
@staticmethod
def parallel[U](streams: Sequence["Stream[U]"]) -> "Stream[U]":
    """
    Run all streams of a sequence in separate threads, possibly
    interleaving the resulting solutions.
    """
    return Stream(lambda: stream_parallel([s.gen() for s in streams]))

or_else staticmethod

or_else(main: Stream[U], fallback: Stream[U]) -> Stream[U]

Run the main stream and, if it does not yield any solution, run the fallback stream.

Source code in src/delphyne/stdlib/streams.py
188
189
190
191
192
193
194
@staticmethod
def or_else[U](main: "Stream[U]", fallback: "Stream[U]") -> "Stream[U]":
    """
    Run the `main` stream and, if it does not yield any solution,
    run the `fallback` stream.
    """
    return Stream(lambda: stream_or_else(main.gen, fallback.gen))

spend_on

spend_on(
    f: Callable[[], tuple[T, Budget]], /, estimate: Budget
) -> StreamContext[T | SpendingDeclined]

Perform a computation that requires spending some resources.

Attributes:

Name Type Description
f

A zeroary function that returns the computation result, along with the budget spent on the computation.

estimate

An over-estimate of the budget that is consumed by the computation. This estimate is allowed to be inaccurate. See Stream.with_budget for the provided guarantees.

Returns:

Type Description
StreamContext[T | SpendingDeclined]

In a stream context, the value returned by the computation or an

StreamContext[T | SpendingDeclined]

instance of SpendingDeclined if the spending request was

StreamContext[T | SpendingDeclined]

declined.

Source code in src/delphyne/stdlib/streams.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
def spend_on[T](
    f: Callable[[], tuple[T, dp.Budget]], /, estimate: dp.Budget
) -> dp.StreamContext[T | SpendingDeclined]:
    """
    Perform a computation that requires spending some resources.

    Attributes:
        f: A zeroary function that returns the computation result, along
            with the budget spent on the computation.
        estimate: An over-estimate of the budget that is consumed by the
            computation. This estimate is allowed to be inaccurate. See
            `Stream.with_budget` for the provided guarantees.

    Returns:
        In a stream context, the value returned by the computation or an
        instance of `SpendingDeclined` if the spending request was
        declined.
    """
    barrier = Barrier(estimate)
    yield barrier
    if barrier.allow:
        value, spent = f()
        yield Spent(budget=spent, barrier_id=barrier.id)
        return value
    else:
        yield Spent(budget=dp.Budget.zero(), barrier_id=barrier.id)
        return SpendingDeclined()

SpendingDeclined dataclass

Sentinel value indicating that a spending request was declined.

Source code in src/delphyne/stdlib/streams.py
378
379
380
381
382
383
384
@dataclass(frozen=True)
class SpendingDeclined:
    """
    Sentinel value indicating that a spending request was declined.
    """

    pass

Stream Transformers

StreamTransformer dataclass

Wrapper for a function that maps a stream to another one, possibly depending on the global policy environment. Can be composed with policies, search policies and other stream transformers using the @ operator.

Attributes:

Name Type Description
trans _StreamTransformerFn

The wrapped stream transformer function.

Source code in src/delphyne/stdlib/streams.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
@dataclass
class StreamTransformer:
    """
    Wrapper for a function that maps a stream to another one, possibly
    depending on the global policy environment. Can be composed with
    policies, search policies and other stream transformers using the
    `@` operator.

    Attributes:
        trans: The wrapped stream transformer function.
    """

    trans: _StreamTransformerFn

    def __call__[T](
        self,
        stream: Stream[T],
        env: PolicyEnv,
    ) -> Stream[T]:
        return Stream(lambda: self.trans(stream, env))

    def __matmul__(self, other: "StreamTransformer") -> "StreamTransformer":
        """
        Compose this transformer with another one.
        """
        if not isinstance(other, StreamTransformer):  # pyright: ignore[reportUnnecessaryIsInstance]
            return NotImplemented

        def transformer[T](
            stream: Stream[T],
            env: PolicyEnv,
        ) -> dp.StreamGen[T]:
            return self(other(stream, env), env).gen()

        return StreamTransformer(transformer)

__matmul__

__matmul__(other: StreamTransformer) -> StreamTransformer

Compose this transformer with another one.

Source code in src/delphyne/stdlib/streams.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def __matmul__(self, other: "StreamTransformer") -> "StreamTransformer":
    """
    Compose this transformer with another one.
    """
    if not isinstance(other, StreamTransformer):  # pyright: ignore[reportUnnecessaryIsInstance]
        return NotImplemented

    def transformer[T](
        stream: Stream[T],
        env: PolicyEnv,
    ) -> dp.StreamGen[T]:
        return self(other(stream, env), env).gen()

    return StreamTransformer(transformer)

take

take(stream: Stream[T], env: PolicyEnv, num_generated: int, strict: bool = True)

Stream transformer version of Stream.take.

Source code in src/delphyne/stdlib/streams.py
344
345
346
347
348
349
350
351
352
353
354
@stream_transformer
def take[T](
    stream: Stream[T],
    env: PolicyEnv,
    num_generated: int,
    strict: bool = True,
):
    """
    Stream transformer version of `Stream.take`.
    """
    return stream_take(stream.gen(), num_generated, strict)

with_budget

with_budget(stream: Stream[T], env: PolicyEnv, budget: BudgetLimit)

Stream transformer version of Stream.with_budget.

Source code in src/delphyne/stdlib/streams.py
332
333
334
335
336
337
338
339
340
341
@stream_transformer
def with_budget[T](
    stream: Stream[T],
    env: PolicyEnv,
    budget: dp.BudgetLimit,
):
    """
    Stream transformer version of `Stream.with_budget`.
    """
    return stream_with_budget(stream.gen(), budget)

loop

loop(
    stream: Stream[T],
    env: PolicyEnv,
    n: int | None = None,
    *,
    stop_on_reject: bool = True,
) -> StreamGen[T]

Stream transformer that repeatedly respawns the underlying stream, up to an (optional) limit.

Source code in src/delphyne/stdlib/streams.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
@stream_transformer
def loop[T](
    stream: Stream[T],
    env: PolicyEnv,
    n: int | None = None,
    *,
    stop_on_reject: bool = True,
) -> dp.StreamGen[T]:
    """
    Stream transformer that repeatedly respawns the underlying stream,
    up to an (optional) limit.
    """

    return stream.loop(n, stop_on_reject=stop_on_reject).gen()

stream_transformer

stream_transformer(
    f: _ParametricStreamTransformerFn[A],
) -> Callable[A, StreamTransformer]

Convenience decorator for creating parametric stream transformers (i.e., functions that return stream transformers).

See take for an example.

Attributes:

Name Type Description
f

A function that takes a stream, a policy environment, and additional parameters, and returns a stream generator.

Returns:

Type Description
Callable[A, StreamTransformer]

A function that takes the additional parameters of f and

Callable[A, StreamTransformer]

returns a StreamTransformer object.

Source code in src/delphyne/stdlib/streams.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def stream_transformer[**A](
    f: _ParametricStreamTransformerFn[A],
) -> Callable[A, StreamTransformer]:
    """
    Convenience decorator for creating parametric stream transformers
    (i.e., functions that return stream transformers).

    See `take` for an example.

    Attributes:
        f: A function that takes a stream, a policy environment, and
            additional parameters, and returns a stream generator.

    Returns:
        A function that takes the additional parameters of `f` and
        returns a `StreamTransformer` object.
    """

    def parametric(*args: A.args, **kwargs: A.kwargs) -> StreamTransformer:
        def transformer[T](
            stream: Stream[T],
            env: PolicyEnv,
        ) -> dp.StreamGen[T]:
            return f(stream, env, *args, **kwargs)

        return StreamTransformer(transformer)

    return parametric

_StreamTransformerFn

Bases: Protocol

Source code in src/delphyne/stdlib/streams.py
202
203
204
205
206
207
class _StreamTransformerFn(Protocol):
    def __call__[T](
        self,
        stream: Stream[T],
        env: PolicyEnv,
    ) -> dp.StreamGen[T]: ...

_ParametricStreamTransformerFn

Bases: Protocol

Source code in src/delphyne/stdlib/streams.py
210
211
212
213
214
215
216
217
class _ParametricStreamTransformerFn[**A](Protocol):
    def __call__[T](
        self,
        stream: Stream[T],
        env: PolicyEnv,
        *args: A.args,
        **kwargs: A.kwargs,
    ) -> dp.StreamGen[T]: ...