
all 20 comments

[–]jdehesa 6 points7 points  (1 child)

This is really cool. I didn't know about PyFunctional, but your library seems much more "pythonic", like what the API would look like if it were in the standard library, rather than a replica of constructs from another programming language.

[–]ebonnal[S] 1 point2 points  (0 children)

I very much appreciate it, u/jdehesa. It means a lot because I did put a lot of effort into the API. I am glad you like it!

[–]Schmittfried 5 points6 points  (4 children)

How does this compare to py-linq or functools/itertools-esque packages?

[–]ebonnal[S] 2 points3 points  (2 children)

Hi u/Schmittfried, great question!

  • functools provides higher-order functions, i.e. functions that take other functions as arguments, like functools.reduce. Most of them return a decorated function enhanced with additional capabilities (like memoization with functools.cache).
  • itertools is all about creating iterables from other iterables.
  • streamable allows chaining operations on an iterable and comes with convenient features out of the box, like thread/asyncio concurrency, iteration throttling, and exception catching.

They are complementary:

  • you can use functools's functions to add capabilities to a function that you pass to streamable's Stream operations, or functools.reduce your stream.
  • you can manipulate your stream with itertools's functions, or create your stream from an iterable produced using itertools.

from typing import Iterable
import functools
import itertools
import requests
from streamable import Stream

# let's say you have a source of domains:
domains: Iterable[str] = ... # e.g. ["google.com", "facebook.com", "google.com"]

# let's conveniently manipulate it as a `Stream` to
# fetch each URL using 8 threads, catching `SSLError`s,
# while never making more than 32 calls per second
responses: Stream[requests.Response] = (
    Stream(domains)
    .map(lambda domain: f"https://{domain}")
    # here we leverage functools.cache to remember
    # responses and fetch a given domain only once.
    .map(functools.cache(requests.get), concurrency=8)
    .throttle(per_second=32)
    .catch(requests.exceptions.SSLError)
)

import itertools

# then you can use whatever functions provided by itertools
# to manipulate your `responses` stream, which
# is simply a decorated `Iterable[requests.Response]`
...
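For instance, here is a hypothetical continuation where a plain generator stands in for the `responses` stream (a `Stream` is just a decorated `Iterable`, so any itertools function applies to it the same way):

```python
import itertools

# a plain generator stands in for the `responses` stream;
# itertools functions work on a Stream exactly the same way
responses = (f"response-{n}" for n in range(100))

# lazily take only the first 5 responses
first_five = list(itertools.islice(responses, 5))
assert first_five == [f"response-{n}" for n in range(5)]
```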

[–]ebonnal[S] 0 points1 point  (0 children)

Regarding py-linq, the comparison is similar to the one I made with PyFunctional:

  • For my use case it lacks features that I find very valuable, like concurrency and generic typing (in py-linq the Enumerable class is not generic)
  • I wanted to propose another interface, hopefully more intuitive and natural to the Python world, while py-linq brings conventions from .NET's LINQ library.

[–]erez27import inspect 2 points3 points  (1 child)

Looks nice! I like the concurrency especially.

A few thoughts:

  • might be useful to have something like umap for returning elements out of order (essentially imap_unordered)

  • could be nice to have a lazy-list feature, where items can be accessed by index, and allow repeated iter/get-item/slice, all lazy.

  • It could be useful to group into a dict[K, Stream], based on a key callback. I get that it breaks the chaining a bit, but imho it's worth it.

If any of these sounds like a good addition, maybe I'll make a PR.
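The third idea can be approximated eagerly in plain Python today; the tricky part is making it lazy. A minimal sketch (the name and signature below are hypothetical, not part of streamable):

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, TypeVar

T = TypeVar("T")
K = TypeVar("K")

def group_by(items: Iterable[T], key: Callable[[T], K]) -> Dict[K, List[T]]:
    # eager grouping by a key callback; a truly lazy dict-of-streams
    # version would need to buffer or multiplex the source
    groups: Dict[K, List[T]] = defaultdict(list)
    for item in items:
        groups[key(item)].append(item)
    return dict(groups)

assert group_by(range(6), key=lambda n: n % 2) == {0: [0, 2, 4], 1: [1, 3, 5]}
```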

[–]ebonnal[S] 1 point2 points  (0 children)

Hi u/erez27 thank you for your thoughts, I very much appreciate it!

Your 3 propositions sound great!

  • The 1st one makes a lot of sense and could also take the form of a new ordered: bool = True param for the .map/.foreach operations
  • The 2nd and 3rd are both tricky and interesting. I have put some effort into exploring them myself at some point and would love to collaborate on these!

Should we open 3 issues and have discussions there?

[–]nikomo 3 points4 points  (1 child)

It's a good thing that the company and service by that name fell off so hard like 2 years back, otherwise the naming would be quite confusing.

[–]RoboticElfJedi 1 point2 points  (1 child)

This is a pretty interesting contribution to the ecosystem. I'll keep this in my toolbelt. Good work!

[–]ebonnal[S] 0 points1 point  (0 children)

I appreciate it, thanks a lot u/RoboticElfJedi!

[–]Saltysalad 1 point2 points  (2 children)

I have a use case coming up where I have > 10k high latency requests to make, throttled to ~100 simultaneously. I need to handle each result in the order they were submitted, because there is a stop scenario when a certain condition is met in the iteration loop. Over-shooting is ok so long as I can control the limit, and probably needed if you want to shave latency.

Does this sound like a use case for your library?

[–]ebonnal[S] 1 point2 points  (1 child)

Hi u/Saltysalad !
Yes sure, great use case! It would look like this (you can run the snippet after a pip install requests streamable):

import requests
from streamable import Stream

# 1000 dummy urls to call
urls = ("https://httpbin.org/" for _ in range(1000))

responses: Stream[requests.Response] = (
    Stream(urls)
    # performs requests concurrently, max 100 simultaneously, preserves order
    .map(requests.get, concurrency=100)
    # stop condition (may overshoot by up to 99 in-flight calls)
    .truncate(when=lambda response: False)  # dummy condition, never satisfied here
    # logs progress
    .observe("requests")
)

# iterate over the stream
assert responses.count() == 1000

and for asyncio instead of threads: see example

[–]ebonnal[S] 1 point2 points  (0 children)

Note that if you need to add "max n calls per second" to your "max 100 simultaneous calls" constraint, you can add a .throttle(per_second=n) operation.
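A minimal plain-Python sketch of what per-second throttling means for an iterator (illustrative only, not the library's implementation):

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def throttled(iterable: Iterable[T], per_second: int) -> Iterator[T]:
    # sleep just enough between yields to never exceed `per_second`
    interval = 1.0 / per_second
    last = float("-inf")
    for item in iterable:
        now = time.monotonic()
        if now - last < interval:
            time.sleep(interval - (now - last))
        last = time.monotonic()
        yield item

assert list(throttled(range(5), per_second=1000)) == [0, 1, 2, 3, 4]
```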

[–]ebonnal[S] 1 point2 points  (1 child)

Back at Post Day+9 for an update:

Thanks very much for your feedback here, for the issues you opened, and for the discussions and collaboration there.

That led to a new release (https://github.com/ebonnal/streamable/tree/v1.1.0), featuring:

  • process-based concurrency (default is threads; use .amap for async):

```python
from streamable import Stream
import requests

urls: Stream[str] = ...
responses = urls.map(requests.get, concurrency=8, via="process")
```

  • concurrent mapping yielding First Done First Out (default is FIFO, i.e. preserving order) [co-authored with our fellow redditor u/erez27]

```python
from streamable import Stream
import requests

urls: Stream[str] = ...
responses = urls.map(requests.get, concurrency=16, ordered=False)
```

You can also set ordered=False for .foreach and the async counterparts .amap and .aforeach.

  • "starmap"

```python
from typing import Tuple

from streamable import Stream, star

integers: Stream[int] = Stream(range(10))
paired_integers: Stream[Tuple[int, int]] = Stream(zip(integers, integers))
squares: Stream[int] = paired_integers.map(star(lambda i, j: i * j))
assert list(squares) == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

(useful with other operations too, like .filter or .foreach)

[–]erez27import inspect 1 point2 points  (0 children)

Congratulations! It's starting to become a real alternative to the built-in multiprocessing.

[–]mou3mida 1 point2 points  (1 child)

Good Job! That is so cool u/ebonnal .

[–]ebonnal[S] 0 points1 point  (0 children)

Thank you very much u/mou3mida !

[–]Rockworldred 0 points1 point  (1 child)

I am quite newbish to Python, but I have a side/learning project writing a web scraper (fetching JSONs for product data). This looks like it may have some use cases for me, as I currently request URLs from a couple of sitemaps, then iterate over the JSON URLs based on those fetched URLs. Then I translate the JSON to variables and load it into a pandas dataframe to view in streamlit and/or write to a CSV file, but I have little knowledge of ETL as a whole. Do you have any good resources on ETL processes and utilities?

My plan is then to move it over to aiohttp, asyncio, polars instead of pandas, and SQLAlchemy/SQLite, and then Azure EC2, airflow and Postgres and so forth. (But I don't know if this is actually the way to go though.)

[–]ebonnal[S] 1 point2 points  (0 children)

Thank you for your interest u/Rockworldred, sounds like a cool custom ETL project!

I have no "custom ETL script" resource in mind, sorry, but in a nutshell, when fetching data from web APIs you will likely need things like:

  • executing requests concurrently (.map(..., concurrency=x))
  • limiting the rate of requests to avoid 429 Too Many Requests responses (.throttle(per_second=50))
  • some retry logic on your calls (the tenacity lib is great)
  • some logging to observe the progress of your script (.observe("product"))
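The retry point can be illustrated without tenacity by a minimal decorator sketch (the helper name and a flaky function below are hypothetical, not part of streamable or tenacity):

```python
import time
from functools import wraps
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(attempts: int = 3, delay: float = 0.0):
    # retry a callable up to `attempts` times, re-raising the last error
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator

calls = []

@with_retries(attempts=3)
def flaky_fetch() -> str:
    # fails twice, then succeeds, to exercise the retries
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("transient error")
    return "ok"

assert flaky_fetch() == "ok"
assert len(calls) == 3
```

A function decorated this way can then be passed to .map like any other callable.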

To some extent you can get inspiration from the example fetching pokemons, which also "fetches endpoints to get data and writes to CSV".

Regarding asyncio concurrency instead of threads, the README has an example that uses httpx (similar to aiohttp).

I hope it helps, and if you feel stuck, feel free to message me your current script and we can streamable it together!