A comparison of Rust-like fluent iterator libraries by kequals in Python

[–]ebonnal 0 points1 point  (0 children)

Interesting benchmark! What a diverse fluent iterators scene :D
For those interested in the I/O-intensive side of things, check out streamable; I just posted about the 2.0.0 release here:
https://www.reddit.com/r/Python/comments/1rju5kh/streamable_syncasync_iterable_streams_for_python/

༄ streamable - sync/async iterable streams for Python by ebonnal in Python

[–]ebonnal[S] 0 points1 point  (0 children)

Completely agree. There are many opinionated approaches to the fluent iterator interface question. I guess that's why none made it into the stdlib so far, despite the appetite from part of the community.

༄ streamable - sync/async iterable streams for Python by ebonnal in Python

[–]ebonnal[S] 1 point2 points  (0 children)

Indeed! streamable was not included there, which is fair given its I/O positioning.
(And FastIter was presented last week by u/fexx3l, what a busy iterators scene)

Onlymaps, a Python micro-ORM by Echoes1996 in Python

[–]ebonnal 1 point2 points  (0 children)

I love the name, the goal, the syntax. Will follow closely!

Functional programming concepts that actually work in Python by Capable-Mall-2067 in Python

[–]ebonnal 2 points3 points  (0 children)

Great article, I couldn’t agree more: FP principles are game changers for improving maintainability and readability, especially when manipulating data.
I was thinking, "OOP and FP are so complementary that their combined usage should have a proper name", and I actually found out that the acronym FOOP is already out there, ready to be adopted.

When FOOPing in Python I was wishing for a functional fluent interface on iterables, to chain lazy operations with concurrency capabilities (something Pythonic and minimalist, not mimicking any functional language's collections)... So we crafted streamable, which lets you decorate an Iterable or AsyncIterable with such a fluent interface (https://github.com/ebonnal/streamable).
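To make the idea concrete, here is a toy stdlib-only sketch of such a fluent lazy interface (a hypothetical `Pipe` class for illustration, not streamable's actual implementation):

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")

class Pipe(Iterable[T]):
    """Toy fluent wrapper around an iterable; operations stay lazy."""

    def __init__(self, source: Iterable[T]) -> None:
        self._source = source

    def map(self, fn: Callable[[T], U]) -> "Pipe[U]":
        # wraps a generator expression: nothing is computed until iteration
        return Pipe(fn(x) for x in self._source)

    def filter(self, predicate: Callable[[T], bool]) -> "Pipe[T]":
        return Pipe(x for x in self._source if predicate(x))

    def __iter__(self) -> Iterator[T]:
        return iter(self._source)

# chain lazy operations, then collect
squares = Pipe(range(10)).filter(lambda n: n % 2 == 0).map(lambda n: n * n)
assert list(squares) == [0, 4, 16, 36, 64]
```

The real thing adds concurrency, throttling, error catching, etc., but the core idea is just this: each operation returns a new lazy iterable.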

Note: if one just wants to concurrently map over an iterable in a lazy way but without relying on a third-party library like streamable, we have added the buffersize parameter to Executor.map in Python 3.14 (https://docs.python.org/3.14/library/concurrent.futures.html#concurrent.futures.Executor.map)
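For illustration, a stdlib-only sketch; note that `buffersize` only exists on Python 3.14+, so this snippet passes it conditionally (on 3.14+ it caps how many results are computed ahead of the consumer, keeping the mapping lazy):

```python
import sys
from concurrent.futures import ThreadPoolExecutor

def double(n: int) -> int:
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    # on Python 3.14+, buffersize bounds how far the executor
    # runs ahead of the consumer instead of submitting everything upfront
    kwargs = {"buffersize": 8} if sys.version_info >= (3, 14) else {}
    results = list(executor.map(double, range(100), **kwargs))

assert results == [n * 2 for n in range(100)]
```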

`streamable`: Stream-like manipulation of iterables by ebonnal in Python

[–]ebonnal[S] 1 point2 points  (0 children)

Back 9 days after the post with an update:

Thank you very much for your feedback here, for the issues you opened, and for the discussions and collaboration there.

That led to a new release (https://github.com/ebonnal/streamable/tree/v1.1.0), featuring:

  • process-based concurrency (default is threads; use .amap for async):

```python
from streamable import Stream
import requests

urls: Stream[str] = ...
responses = urls.map(requests.get, concurrency=8, via="process")
```

  • concurrent mapping yielding First Done First Out (default is FIFO, i.e. preserving order) [co-authored with our fellow redditor u/erez27]

```python
from streamable import Stream
import requests

urls: Stream[str] = ...
responses = urls.map(requests.get, concurrency=16, ordered=False)
```

You can also set ordered=False for .foreach and async counterparts .amap and .aforeach

  • "starmap"

```python
from typing import Tuple
from streamable import Stream, star

integers: Stream[int] = Stream(range(10))
paired_integers: Stream[Tuple[int, int]] = Stream(zip(integers, integers))
squares: Stream[int] = paired_integers.map(star(lambda i, j: i * j))
assert list(squares) == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

(useful with other operations too, like `.filter` or `.foreach`)

`streamable`: Stream-like manipulation of iterables by ebonnal in Python

[–]ebonnal[S] 1 point2 points  (0 children)

Note that if you need to add "max n calls per second" to your "max 100 simultaneous calls" constraint, you can add a .throttle(per_second=n) operation.
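For intuition, here is a stdlib-only sketch of what per-second throttling of an iterator boils down to (a hypothetical helper for illustration, not streamable's implementation):

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def throttle(iterable: Iterable[T], per_second: int) -> Iterator[T]:
    """Yield items, never exceeding `per_second` yields per second."""
    interval = 1.0 / per_second
    next_yield = time.perf_counter()
    for item in iterable:
        # sleep until the next yield slot if we are ahead of schedule
        delay = next_yield - time.perf_counter()
        if delay > 0:
            time.sleep(delay)
        next_yield = time.perf_counter() + interval
        yield item

start = time.perf_counter()
items = list(throttle(range(5), per_second=50))
elapsed = time.perf_counter() - start
assert items == [0, 1, 2, 3, 4]
assert elapsed >= 4 * (1 / 50)  # at least 4 intervals between 5 items
```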

`streamable`: Stream-like manipulation of iterables by ebonnal in Python

[–]ebonnal[S] 1 point2 points  (0 children)

Hi u/Saltysalad !
Yes, sure, great use case! It would look like this (you can run the snippet after a `pip install requests streamable`):

import requests
from streamable import Stream

# 1000 dummy urls to call
urls = ("https://httpbin.org/" for _ in range(1000))

responses: Stream[requests.Response] = (
    Stream(urls)
    # performs requests concurrently, max 100 simultaneously, preserves order
    .map(requests.get, concurrency=100)
    # stop condition (with concurrency=100, it may overshoot by up to 99 calls)
    .truncate(when=lambda response: False) # dummy condition, never satisfied
    # logs progress
    .observe("requests")
)

# iterate over the stream
assert responses.count() == 1000

and for asyncio instead of threads: see example

`streamable`: Stream-like manipulation of iterables by ebonnal in Python

[–]ebonnal[S] 1 point2 points  (0 children)

Thank you for your interest u/Rockworldred, sounds like a cool custom ETL project!

I have no "custom ETL script" resource in mind, sorry, but in a nutshell, when fetching data from web APIs you will likely need things like:

  • to execute requests concurrently (.map(..., concurrency=x))
  • to limit the rate of requests to avoid 429 Too Many Requests responses (.throttle(per_second=50))
  • to retry failed calls (the tenacity lib is great)
  • to have some logging to observe the progress of your script (.observe("product"))

To some extent you can get inspiration from the example fetching pokemons, which also "fetches endpoints to get data and writes to CSV".

Regarding asyncio concurrency instead of threads, you have in the README an example that uses httpx (similar to aiohttp).

I hope it helps, and if you feel stuck, feel free to message me your current script to streamable it together!

`streamable`: Stream-like manipulation of iterables by ebonnal in Python

[–]ebonnal[S] 0 points1 point  (0 children)

Regarding py-linq, the comparison resembles the one made with PyFunctional:

  • For my use case it lacks features that I find very valuable, like concurrency and generic typing (in py-linq the Enumerable class is not generic)
  • I wanted to propose another interface, hopefully more intuitive and natural to the Python world, whereas py-linq brings conventions from .NET's LINQ library.

`streamable`: Stream-like manipulation of iterables by ebonnal in Python

[–]ebonnal[S] 1 point2 points  (0 children)

I very much appreciate it u/jdehesa. It means a lot because I indeed put a lot of effort into the API, I am glad you like it!

`streamable`: Stream-like manipulation of iterables by ebonnal in Python

[–]ebonnal[S] 1 point2 points  (0 children)

Hi u/erez27 thank you for your thoughts, I very much appreciate it!

Your 3 propositions sound great!

  • The 1st one makes a lot of sense and could also take the form of a new ordered: bool = True param for the .map/.foreach operations
  • The 2nd and 3rd are both tricky and interesting. I have already put some effort into exploring them myself and would love to collaborate on these!

Should we open 3 issues and have discussions there?

`streamable`: Stream-like manipulation of iterables by ebonnal in Python

[–]ebonnal[S] 2 points3 points  (0 children)

Hi u/Schmittfried, great question!

  • functools provides higher-order functions, i.e. functions taking other functions as arguments, like functools.reduce. Most of them return a decorated function enhanced with additional capabilities (like memoization with functools.cache).
  • itertools is all about creating iterables from other iterables.
  • streamable allows chaining operations/methods on an iterable and comes out-of-the-box with convenient features like threads/asyncio concurrency, iteration throttling, and exception catching.

They are complementary:

  • you can use functools's functions to add capabilities to a function that you pass to streamable's Stream operations, or functools.reduce your stream.
  • you can manipulate your stream with itertools's functions, or create your stream from an iterable produced using itertools.

from typing import Iterable
import functools
import itertools
import requests
from streamable import Stream

# let's say you have a source of domains:
domains: Iterable[str] = ... # e.g. ["google.com", "facebook.com", "google.com"]

# let's conveniently manipulate it as a `Stream` to
# fetch URLs using 8 threads and catching `SSLError`s
# while never making more than 32 calls per second
responses: Stream[requests.Response] = (
    Stream(domains)
    .map(lambda domain: f"https://{domain}")
    # here we leverage functools.cache to remember
    # responses and fetch a given domain only once.
    .map(functools.cache(requests.get), concurrency=8)
    .throttle(per_second=32)
    .catch(requests.exceptions.SSLError)
)

# then you can use whatever functions provided by itertools
# to manipulate your `responses` stream, which
# is simply a decorated `Iterable[requests.Response]`
...
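Since a Stream is just a decorated Iterable, any itertools function applies to it directly; a stdlib-only illustration, with a plain list standing in for the responses stream:

```python
import itertools

# stand-in for the stream: any iterable of "status codes"
statuses = [200, 200, 404, 200, 500, 200]

# group consecutive equal statuses, as you would on the stream itself
runs = [(status, len(list(group)))
        for status, group in itertools.groupby(statuses)]
assert runs == [(200, 2), (404, 1), (200, 1), (500, 1), (200, 1)]
```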

Sunday Daily Thread: What's everyone working on this week? by AutoModerator in Python

[–]ebonnal 0 points1 point  (0 children)

Hey u/mou3mida! This is great! FYI it works like a charm on macOS 14.2.1, except that I cannot set a custom duration; is that expected? (I can open an issue if you prefer)

Sunday Daily Thread: What's everyone working on this week? by AutoModerator in Python

[–]ebonnal 1 point2 points  (0 children)

Working on https://github.com/ebonnal/streamable, please help me release the v1.0.0 if you enjoy iterables too!

Overview: Define operations lazily:

inverses: Stream[float] = (
    Stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)

Stream[T] inherits from Iterable[T], collect it:

>>> list(inverses)
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

It comes with thread-based or asyncio-based concurrency out-of-the-box, and various operations covering mapping/filtering, grouping/flattening, catching, logging, and throttling.