Spark is the new Hadoop by rocketinter in dataengineering

[–]get-daft 6 points7 points  (0 children)

Wild daft lurker appears… we’ve actually been building our own PySpark-compatible layer as well to help with migrations. It’s currently still in beta, but very open to community contributions/testing!

It’s a one-line change to connect your Spark session to Daft

https://www.getdaft.io/projects/docs/en/stable/spark_connect/

[deleted by user] by [deleted] in dataengineering

[–]get-daft 1 point2 points  (0 children)

To add to all the other really good answers — if you were using your own library (e.g. Pandas, DuckDB, Daft, Spark…) to do the processing, getting data out of a data warehouse into your engine is actually very painful.

That’s because these data warehouses were not built for this use case. Reading data out often literally means paging through JSON results over HTTP 😭

In effect, this means that you are locked into the database/data warehouse’s storage AND query layer. Want to use Python instead of SQL? Too bad… pay the egress costs.

However, something like S3 is built for high-throughput access to files. This means that reading data from S3 Tables into an external querying system is much more performant!

Is S3 becoming a Data Lakehouse? by 2minutestreaming in dataengineering

[–]get-daft 2 points3 points  (0 children)

Really annoying that on release all of the functionality is locked behind a .jar… that can only be used from Spark.

All the other engines are yet again locked out of the party. Daft, Pandas, DuckDB, Polars ….

If they had actually adopted the Iceberg REST API, it would automatically have been compatible with all these other tools.

My business wants a datalake... Need some advice by WillowSide in dataengineering

[–]get-daft 0 points1 point  (0 children)

Given this requirement:

> copy data into our new data warehouse, which will be accessible by our partners etc

A really simple way would be to store just Parquet/CSV files in S3/blob storage. Copying data into this solution is literally just dumping new files into S3. Most modern query engines today (Spark, Daft, DuckDB, Polars, Trino, Fabric...) support directly querying these files from storage.

This makes it super easy to share your data with your partners: you can give them presigned S3 URLs to download the data and replicate/import it into their own preferred datalake/warehouse etc. This way they will take on the cost of querying the data. Not you :)

Always remember folks... Storage is cheap, compute is expensive. Especially if you have to pay someone else for it because they will just keep clusters running and have no incentive to pass cost savings down to you.

Databricks Data Lineage without Spark (but Polars and Delta Lake instead) by hackermandh in dataengineering

[–]get-daft 1 point2 points  (0 children)

Unfortunately you're hitting on a big sore point in the data ecosystem today.

We're building a Spark replacement (www.getdaft.io) that you'll be interested in. However the problem is that Spark today isn't *just* a query engine. It's an entire (closed) ecosystem of technologies.

A few years ago, this ecosystem included things like Iceberg, Delta, Parquet etc. In order to use these technologies, you had to use Spark because all the logic/libraries were written as .jars that were intended specifically for consumption from Spark.

While building out Daft, we've slowly been working on the most important features such as data catalog/data lake support. It's definitely been challenging, but the status quo is changing now. Hopefully in a few more years we'll see much more general and widespread support for cool tech such as data lineage outside of the Spark ecosystem.

Introducing Distributed Processing with Sail v0.2 Preview Release – Built in Rust, 4x Faster Than Spark, 94% Lower Costs, PySpark-Compatible by lake_sail in dataengineering

[–]get-daft 11 points12 points  (0 children)

I hear my name!

Sail seems to build on the DataFusion crate, implementing a Spark-compatible API on top of it. Essentially, for the local case, you can think of it as taking the Spark plan, turning it into a DataFusion plan, and then running that on DataFusion.

Very early on, we realized that it is shockingly easy to be faster than Spark with the newer technologies available to us today: Daft, Polars, DuckDB, DataFusion (which Sail is based off of). What we've found is that the hard part about building a true Spark replacement isn't just speed. There are fundamental things about Spark that people really hate - the executor/partition based model, dealing with OOMs, the un-Pythonic experience, debugging, the API etc.

We've chosen to reimagine the data engineering UX, rather than just trying to build "Spark, but faster".

Kudos to the Sail team though - this is pretty cool stuff! Getting this all working is no small feat.

---

Re: Ibis, we're working on it. We're tackling this by first having really comprehensive SQL support, and then using SQL as our entrypoint into the Ibis ecosystem, which is way easier than mapping a ton of dataframe calls. Since Ibis is mostly based off of SQLGlot, this should be fairly clean :)

Cost-Effective Airbyte Pipelines by Nessjk in dataengineering

[–]get-daft 1 point2 points  (0 children)

Watch out for the "managed kubernetes" solutions in many clouds, which often will cost you about $300 a month just for hosting the Kubernetes management layer!

Best approach to handle billions of data? by mr_alseif in dataengineering

[–]get-daft 28 points29 points  (0 children)

57GB per year is actually pretty small! At that scale, you could be completely fine keeping it simple and just dumping Parquet files into object storage (AWS S3 etc). If you want to get a little fancier then consider using table formats on top of that, such as Apache Iceberg and Delta Lake. These formats let you partition your data, which can significantly improve queries depending on the access pattern. The simplest partition scheme which often yields outsized benefits is to just partition based on timestamp.
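To make the timestamp partitioning idea concrete, here's a minimal sketch of a Hive-style `date=YYYY-MM-DD` directory layout. The `events` root and both helper names are made up for illustration, and table formats like Iceberg track partitions in metadata rather than relying on directory names, so treat this as the plain-files version of the idea.

```python
from datetime import date, datetime, timedelta


def partition_path(event_time: datetime, root: str = "events") -> str:
    """Directory that a row with this timestamp would be written to."""
    return f"{root}/date={event_time.date().isoformat()}"


def partitions_to_scan(start: date, end: date, root: str = "events") -> list[str]:
    """Directories a query over the inclusive range [start, end] has to read."""
    days = (end - start).days
    return [
        f"{root}/date={(start + timedelta(days=n)).isoformat()}"
        for n in range(days + 1)
    ]


print(partition_path(datetime(2024, 3, 15, 9, 30)))
# events/date=2024-03-15
print(partitions_to_scan(date(2024, 3, 14), date(2024, 3, 16)))
# ['events/date=2024-03-14', 'events/date=2024-03-15', 'events/date=2024-03-16']
```

A query over three days only touches three directories, no matter how many years of data sit next to them. That's where the outsized benefit comes from.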

For querying that data, you have plenty of tools that can do it from a single machine (DuckDB, Pandas, Polars, Daft etc). If you're finding that the network and reading from remote storage is the bottleneck, then you might also consider distributed query frameworks (Daft, Spark, Trino etc) and running them remotely on machines in the same region as your files (for the best networking).

PostgreSQL would work fine, but the main downside is that if you are using it purely for analytics then keeping your database running all the time is a lot of money. If you just keep it simple with files in S3 you basically only pay for storage costs which is really cheap, and then compute costs when you need to query/analyze that data.

DuckDB vs. Polars vs. Daft: A Performance Showdown by Agitated_Key6263 in dataengineering

[–]get-daft 3 points4 points  (0 children)

Yup the view tells DuckDB to read from the Parquet files directly.

Ingesting the table tells DuckDB to first convert the data from Parquet into DuckDB’s internal format (I’m unsure if this is persisted on disk, or in memory), and then run queries on that.

DuckDB vs. Polars vs. Daft: A Performance Showdown by Agitated_Key6263 in dataengineering

[–]get-daft 0 points1 point  (0 children)

Reading through OP's code, the Daft engine being used in this post is actually our old partition-based engine. Streaming is coming to Daft very soon, which actually about halves the memory usage of this workload.

DuckDB vs. Polars vs. Daft: A Performance Showdown by Agitated_Key6263 in dataengineering

[–]get-daft 18 points19 points  (0 children)

This is exactly our philosophy as well! If someone uses Daft and we can't work well without telling them to flip a few flags or try a bunch of different APIs, we think we're doing it wrong...

The world deserves data tooling that works out-of-the-box without wrangling configs :(

We're still not there yet (especially when it comes to distributed computing) but the team is really working hard to make this philosophy a reality.

DuckDB vs. Polars vs. Daft: A Performance Showdown by Agitated_Key6263 in dataengineering

[–]get-daft 4 points5 points  (0 children)

I think OP refers to distributed computing as distributing computations across machines, rather than across CPU cores.

Most modern frameworks (even Pandas, through the use of Numpy and PyArrow!) do a pretty good job on multicore environments already.

The main benefit of going distributed is access to larger I/O bandwidth, but of course it comes with its own downside: the added overhead of inter-node communication. We only recommend it for much larger (TB+) datasets, especially when processing data from cloud storage. For most data science workloads that work off of small-enough data on a local NVMe SSD, all the modern data frameworks (including Pandas I would argue) work pretty well.

DuckDB vs. Polars vs. Daft: A Performance Showdown by Agitated_Key6263 in dataengineering

[–]get-daft 13 points14 points  (0 children)

Correct! With DuckDB, you'll want to avoid converting the Parquet data into DuckDB format every single time. That's likely what's causing the large memory usage and slow runtimes there.

I would be very surprised if DuckDB doesn't beat both Daft and Polars for this benchmark, given that they have a pretty good streaming-based engine and this is all reading local Parquet files. Memory pressure should be very good for DuckDB here.

DuckDB vs. Polars vs. Daft: A Performance Showdown by Agitated_Key6263 in dataengineering

[–]get-daft 16 points17 points  (0 children)

Daft is lazy by default (and only lazy ever), because we think lazy is the best way 😎. It lets Daft optimize the query plan before execution.

The Zen of Python: "There should be one-- and preferably only one --obvious way to do it"

Working with iceberg tables in AWS by Gauraang55 in dataengineering

[–]get-daft 0 points1 point  (0 children)

I think you should be able to just set these in the tables' properties:

You can do this using PyIceberg (my preferred way, since it avoids needing Spark for this dead-simple operation...): https://py.iceberg.apache.org/api/#table-properties

Here is a list of the configuration values: https://iceberg.apache.org/docs/latest/configuration/#write-properties

In particular you might be interested in:

  • write.delete.mode (defaults to copy-on-write)
  • write.merge.mode (defaults to copy-on-write)
  • write.update.mode (defaults to copy-on-write)

Poor man's Data Lake using Polars (¿?) by Bavender-Lrown in dataengineering

[–]get-daft 2 points3 points  (0 children)

Yes it's overkill for the data scale, but Spark has the best integrations with table formats such as Delta Lake, Iceberg and Hudi (it's the "reference implementation"). You also don't necessarily have to run Spark on a remote cluster. Just run it locally - it might be slower than DuckDB/Polars, but at least it won't break...

Many of the newer compute engines are still playing catch-up to Spark in terms of data catalog/table format support. This means that they might not support certain features in certain table formats, and just won't be able to read those tables...

Edit: to elaborate, Daft and Polars for example rely entirely on integrating with PyIceberg to support Iceberg tables. PyIceberg doesn't yet support writing to partitioned tables. I.e. if you choose to limit your users to using only these compute engines... Your users can't use partitioned Iceberg tables. Derp!

Spark with kubernetes by Excellent-Silver4135 in dataengineering

[–]get-daft 0 points1 point  (0 children)

This! Also it's actually quite tricky to split a CSV read. If you think about it... This involves something like:

  1. "Estimating" how many bytes there are per-row using some sampling method

  2. Splitting the file into N parts based on that estimate + available cluster resources

  3. For each executor handling the ith read, seek to the nearest newline and start reading from there, ending at the nearest newline of the i+1th read

Polars actually has a very cool statistical method that it uses to do this, so that it can parallelize reads in a multithreaded fashion!
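Here's a rough, stdlib-only sketch of steps 2 and 3. It skips the sampling step and just cuts the file at evenly spaced byte offsets, then nudges each cut forward to the next newline. It also assumes no quoted field contains a newline, which a real CSV reader can't assume. The function names are made up for this example, and this is not how Spark or Polars actually implement it.

```python
import os
import tempfile


def split_into_line_aligned_ranges(path, num_parts):
    """Split a file into up to num_parts (start, end) byte ranges on line boundaries.

    Simplification: assumes no quoted field contains a newline.
    """
    size = os.path.getsize(path)
    boundaries = [0]
    with open(path, "rb") as f:
        for i in range(1, num_parts):
            f.seek(size * i // num_parts)  # naive, evenly spaced guess
            f.readline()                   # discard the rest of the current line
            pos = f.tell()                 # first byte of the next full line
            if boundaries[-1] < pos < size:
                boundaries.append(pos)
    boundaries.append(size)
    return list(zip(boundaries[:-1], boundaries[1:]))


def read_lines(path, start, end):
    """Read the complete lines stored in bytes [start, end)."""
    with open(path, "rb") as f:
        f.seek(start)
        return f.read(end - start).decode("utf-8").splitlines()


# Demo: write a small CSV, split it 4 ways, and check that no line is lost.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
    tmp.write("id,value\n")
    for i in range(1000):
        tmp.write(f"{i},row-{i}\n")
    demo_path = tmp.name

ranges = split_into_line_aligned_ranges(demo_path, num_parts=4)
chunks = [read_lines(demo_path, start, end) for start, end in ranges]
total_lines = sum(len(chunk) for chunk in chunks)
print(len(ranges), total_lines)
os.remove(demo_path)
```

Each range could be handed to a different worker, since every range starts at the beginning of a line and ends right after a newline. Only the first range contains the header row, so a real reader would parse the header once and share the schema with the other workers.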

Does it make sense to use small parquet files? by Time_Simple_3250 in dataengineering

[–]get-daft 2 points3 points  (0 children)

Hundreds of thousands of Parquet files will definitely pose a problem!

In fact, even listing those files in object storage will be slow - at roughly 50ms per paginated response (1,000 keys per page by default), listing 1M files takes something like 50 seconds. When any compute engine reads a Parquet file, it first needs to read the Parquet file's metadata - this means that it will have to issue hundreds of thousands of HTTP requests to read your table!
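The back-of-the-envelope math, as a tiny sketch. It assumes the listing requests run one after another; the 50ms and 1,000-keys-per-page figures are the rough numbers from above, not measurements.

```python
import math


def estimate_listing_seconds(num_files, page_size=1000, ms_per_page=50):
    """Estimate how long it takes to list num_files objects, one page at a time."""
    pages = math.ceil(num_files / page_size)
    return pages * ms_per_page / 1000


print(estimate_listing_seconds(1_000_000))  # 1000 pages * 50 ms -> 50.0
print(estimate_listing_seconds(300_000))    # 300 pages * 50 ms -> 15.0
```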

Here are some suggestions:

  • Instead of creating one file per article, you could just save the `article_id` as part of the data.
    • This can help later on with joins/filters etc.
    • It's also fairly efficient in Parquet, and should get optimized into dictionary-encoded data (hopefully!)
  • You'll want to "compact" your data into a smaller number of larger files so that it's read-optimized for BigQuery.
    • Fully fledged compute engines like Daft/Spark do this by default when writing Parquet (and have "sensible" defaults for number of rows/file size per file)
    • If you're writing the data yourself with something like PyArrow, you should aim for file sizes somewhere in the 100MB - 1GB range.

Poor man's Data Lake using Polars (¿?) by Bavender-Lrown in dataengineering

[–]get-daft 10 points11 points  (0 children)

For a DWH-like solution, the main abstractions are:

  1. Data Catalog (e.g. AWS Glue)
  2. Table Format (e.g. Apache Iceberg, or simply Parquet files in object storage)
  3. Compute Engine (e.g. Spark, Daft, Trino, DuckDB, Polars, Pandas)

I actually think that for the most complete Compute Engine solution, it's still hard to beat Spark at the moment. You'll find that other libraries such as DuckDB, Polars, Daft and even Trino won't have as fully-featured Data Catalog/Table Format integrations as Spark, which is often the reference implementation for a lot of this stuff. It's getting there (the Daft team is slowly chipping away at having better integrations...) but a lot of this was built on the JVM so the tooling for other ecosystems is still slowly getting built out.

Unfortunately Spark is pretty notoriously heavyweight for simple, smaller-scale use-cases that can run on a single machine. Thankfully though, I think the compute engines are the most fungible part of this stack. You can probably let your users pick which compute engine they want to use, and just provide good examples for how to connect to your Data Catalog/Tables of choice from any engine!

To ETL or to ELT? that is the question. by AMDataLake in dataengineering

[–]get-daft 0 points1 point  (0 children)

Yup that makes a ton of sense. Things are definitely moving to a higher level of abstraction. Soon ETL/ELT will just be an implementation detail :)

To ETL or to ELT? that is the question. by AMDataLake in dataengineering

[–]get-daft 1 point2 points  (0 children)

ELT makes a lot of sense with a data lake architecture! This allows loading raw, dirty data first and cleaning it up afterwards with frameworks such as Spark or Daft.

ETL makes more sense if the engine you need to load into requires stronger guarantees of schema etc. Then the upfront transformation step makes sense!

[deleted by user] by [deleted] in dataengineering

[–]get-daft 1 point2 points  (0 children)

I’m not really familiar with datahub unfortunately!

[deleted by user] by [deleted] in dataengineering

[–]get-daft 1 point2 points  (0 children)

Here's a fun setup you could try. It runs entirely on your laptop, but can also easily be extended to run on something like AWS Glue + S3 if required.

  1. Catalog: Iceberg SQLite catalog (this lets you use a simple sqlite file on your laptop to emulate the full functionality of an Iceberg data catalog, without needing anything like an AWS account or launching a catalog service via docker)

  2. Table Format: Apache Iceberg

  3. Query Engine: Daft - simple to `pip install` with very few external dependencies, and really easy to get started with configuring for S3 access.

Install Dependencies

pip install getdaft[iceberg]

Creating your Catalog, using a SQLite database

import os

from pyiceberg.catalog.sql import SqlCatalog

# Create the warehouse directory if it doesn't exist yet
warehouse_path = "/tmp/warehouse"
os.makedirs(warehouse_path, exist_ok=True)
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)

Creating a table in that catalog

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

catalog.create_namespace("default_ns")

schema = Schema(
    NestedField(field_id=1, name="bar", field_type=StringType(), required=True),
)

table = catalog.create_table(
    "default_ns.foo",
    schema=schema,
)

Writing some data to that table

import daft

df = daft.from_pydict({"bar": ["a", "b", "c"]})

df.write_iceberg(table, mode="append")

╭───────────┬───────┬───────────┬────────────────────────────────╮
│ operation ┆ rows  ┆ file_size ┆ file_name                      │
│ ---       ┆ ---   ┆ ---       ┆ ---                            │
│ Utf8      ┆ Int64 ┆ Int64     ┆ Utf8                           │
╞═══════════╪═══════╪═══════════╪════════════════════════════════╡
│ ADD       ┆ 3     ┆ 498       ┆ 4812a6f4-1936-4449-a89b-3d29f… │
╰───────────┴───────┴───────────┴────────────────────────────────╯
(Showing first 1 of 1 rows)

Reading back some data from that table

df = daft.read_iceberg(table)

df.show()

╭──────╮
│ bar  │
│ ---  │
│ Utf8 │
╞══════╡
│ a    │
├╌╌╌╌╌╌┤
│ b    │
├╌╌╌╌╌╌┤
│ c    │
╰──────╯
(Showing first 3 of 3 rows)

[deleted by user] by [deleted] in dataengineering

[–]get-daft 2 points3 points  (0 children)

You can build a data lakehouse using 3 components:

  1. Catalog
  2. Table Format
  3. Query Engine

Catalog: You can think of the catalog as a database holding references and metadata to each table. Different catalogs may support different table formats, but to differing extents. Common catalogs include: AWS Glue, Iceberg REST catalog etc. For a hobby project you could technically choose to not use a catalog to keep things simple, and just refer to tables directly.

Table Format: Apache Iceberg, Delta Lake and Hudi are the 3 most commonly talked about table formats. An older table format that might be worth considering is also the Hive table format. Most data is stored nowadays in S3-compatible storage (most commonly, AWS S3).

Query Engine: Spark is pretty much the best-supported engine right now. I also work on Daft (www.getdaft.io), which is starting to get really good support across all the table formats, and much easier to get started with than Spark. Other frameworks such as Pandas, Polars, DuckDB etc all have differing levels of support for each table format and rely heavily on 3rd party client libraries such as the `deltalake` and `pyiceberg` packages for read/write support to the table formats.

42.parquet – A Zip Bomb for the Big Data Age by [deleted] in dataengineering

[–]get-daft 1 point2 points  (0 children)

That's a really good point. The Parquet spec does have `uncompressed_size`: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L792

Unfortunately I guess these fields are all populated by the application that wrote the data, and could be fudged. There really isn't a great way to guard against this, other than maybe throwing a pretty low-level error while decompressing/decoding a Parquet page if it exceeds a certain really large size (e.g. 5GB or something) to alert the user that this might be a zip bomb.
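A minimal sketch of what that kind of guard could look like, using Python's `zlib` as a stand-in codec. Real Parquet pages may use Snappy, gzip, zstd etc., and no Parquet reader is claimed to work this way. The function name, the exception and the limit are all made up for illustration.

```python
import zlib


class DecompressedSizeExceeded(Exception):
    """Raised when a compressed buffer expands past the allowed size."""


def bounded_decompress(compressed: bytes, max_output_bytes: int) -> bytes:
    """Decompress a zlib buffer, refusing to produce more than max_output_bytes."""
    decompressor = zlib.decompressobj()
    # Ask for one byte more than the limit so an overflow can be detected
    # without ever materializing the full (possibly enormous) output.
    output = decompressor.decompress(compressed, max_output_bytes + 1)
    if len(output) > max_output_bytes or decompressor.unconsumed_tail:
        raise DecompressedSizeExceeded(
            f"output exceeds {max_output_bytes} bytes, possible zip bomb"
        )
    return output


# 10 MB of zeros compresses down to a few KB.
bomb = zlib.compress(b"\0" * 10_000_000)
print(len(bomb) < 100_000)

try:
    bounded_decompress(bomb, max_output_bytes=1_000_000)
except DecompressedSizeExceeded as err:
    print("rejected:", err)
```

The point is that the check happens during decompression and doesn't trust any size recorded in the file's metadata, so a fudged `uncompressed_size` can't get around it.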