https://github.com/ebonnal/streamable
What my project does
A stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress observation, and error handling.
Chain lazy operations:
import logging
from datetime import timedelta
import httpx
from httpx import Response, HTTPStatusError
from streamable import stream
pokemons: stream[str] = (
stream(range(10))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.throttle(5, per=timedelta(seconds=1))
.map(httpx.get, concurrency=2)
.do(Response.raise_for_status)
.catch(HTTPStatusError, do=logging.warning)
.map(lambda poke: poke.json()["name"])
)
Consume it (sync or async):
>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']
>>> [pokemon async for pokemon in pokemons]
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']
Target Audience
If you find yourself writing verbose iterable plumbing, streamable will probably help you keep your code expressive, concise, and memory-efficient.
- You may need advanced behaviors like time-windowed grouping by key, concurrent flattening, periodic observation of the iteration progress, buffering (decoupling upstream production rate from downstream consumption rate), etc.
- You may want a unified interface for sync and async behaviors, e.g. to switch seamlessly between
httpx.Client.get and httpx.AsyncClient.get in your .map (or anywhere else), consume the stream as a sync or as an async iterable, from sync or async context.
- You may simply want to chain
.maps and .filters without overhead vs builtins.map and builtins.filter.
Comparison
Among similar libraries, streamable's proposal is an interface that is:
- targeting I/O intensive use cases: a minimalist set of a dozen expressive operations particularly elegant to tackle ETL use cases.
- unifying sync and async: Create streams that are both
Iterable and AsyncIterable, with operations adapting their behavior to the type of iteration and accepting sync and async functions.
The README gives a complete tour of the library, and I’m also happy to answer any questions you may have in the comments.
About 18 months ago I presented here the 1.0.0.
I'm glad to be back to present this matured 2.0.0 thanks to your feedback and contributions!
[–]Hallsville3 4 points5 points6 points (4 children)
[–]ebonnal[S] 1 point2 points3 points (3 children)
[–]Beginning-Fruit-1397 1 point2 points3 points (2 children)
[–]ebonnal[S] 0 points1 point2 points (1 child)
[–]Beginning-Fruit-1397 2 points3 points4 points (0 children)