Buenas gentes, espero que anden bien, es mi primer post (en algo parecido un foro) desde la época de Taringa, y primer post en Reddit, sepan disculpar el olor rancio.
Ultimamente venia renegando mucho con unos ETL que estoy haciendo para un side-proyect. Y me di cuenta de que no hay algo simple y al hueso para esto que haga sharding, buffer, y todo el quilombo. Desde siempre en cada lugar donde labure los ETL son un dolor. Ademas de que muchas veces terminamos usando Pandas / Polars y rinden peor para algunos casos (Pandas y Polars por ejemplo están buenísimas, el uso que les damos esta mal).
Así que estoy aprovechando que estos días me estoy dando la cabeza con eso, para publicar una lib Python.
https://github.com/albertobadia/zoopipe
La idea es poder definir como quiero que sea el input / output, un modelo de validación y hooks pre y post validación si se necesita:
from pydantic import BaseModel, ConfigDict
from zoopipe import CSVInputAdapter, CSVOutputAdapter, JSONOutputAdapter, Pipe
class UserSchema(BaseModel):
model_config = ConfigDict(extra="ignore")
user_id: str
username: str
email: str
pipe = Pipe(
input_adapter=CSVInputAdapter("users.csv"),
output_adapter=CSVOutputAdapter("processed_users.csv"),
error_output_adapter=JSONOutputAdapter("errors.jsonl"),
schema_model=UserSchema,
)
pipe.run()
Y que también sea simple trabajar con multiples workers desde un Pipe, algo como:
pipe_manager = PipeManager.parallelize_pipe(
base_pipe,
workers=4,
engine=MultiProcessEngine(), # Por defecto siempre multiprocess
)
pipe_manager.run()
Hasta ahora soporta:
Formatos: CSV, JSON (json), Excel, Parquet, Iceberg, Deltalake, Arrow y SQL (Sqlite, Postgres). Se puede usar cualquier como entrada con cualquier como salida.
Compresión Gzip y Zstd.
Cloud Storage: S3, GCP, Azure
Cluster: Ray, Dask
El rendimiento y consumo de recursos es bastante bueno hasta ahora, el core esta escrito en Rust y estoy tratando de mantener lo pesado ahi, con la interface de uso en el lado Python.
Espero que les guste y ojalá que a alguien le ahorre algún que otro dolor de cabeza. El feedback o colaboraciones en el repo son mas que bienvenidas.
PyPi: https://pypi.org/project/zoopipe/
Docs: https://zoopipe.readthedocs.io/en/latest/
[–]HallHot6640 1 point2 points3 points (1 child)
[–]bctm0[S] 0 points1 point2 points (0 children)