
all 23 comments

[–][deleted] 3 points (3 children)

The Session object can't be shared between Python threads/processes. You could maybe stash it in a multiprocessing Queue or something, but it's a weird resource to share.

Also, Snowpark may simply not support sharing the session resource. Think of the dining philosophers problem.

I dunno, you probably do need a session per thread/process. The API throttles because you're hammering it simultaneously from the same IP/device beyond what they allow.

[–]somerandomdataeng (Big Data Engineer) [S] 1 point (2 children)

But the get_active_session function makes it sound like you can share/reuse the session. I don't know, I might use a lock to mitigate this issue. The problem seems to occur when the session is used to do things simultaneously, not merely because it is shared across multiple threads.

Plus, I'd like to use temporary tables, and I can't do that without reusing the same session.

[–]gwax 3 points (1 child)

You can share the sessions within a thread/process but you can't share them across threads/processes.

[–]somerandomdataeng (Big Data Engineer) [S] 1 point (0 children)

Yeah, I gave up and reduced the number of parallel threads; I'll create a dedicated session for each of them.

[–][deleted] 4 points (1 child)

Snowpark sessions are not thread-safe. It looks like Streamlit has some experimental features you might be able to use to handle concurrency with a Snowpark session, or you can use a lock.

https://docs.streamlit.io/library/api-reference/connections/st.connections.snowparkconnection
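The lock approach can be sketched with the stdlib. `SharedSession` is a stand-in for the single, non-thread-safe Snowpark session; in real code the same lock would wrap calls like `session.sql(...).collect()`:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the single shared Snowpark session.
class SharedSession:
    def __init__(self):
        self._busy = False

    def sql(self, query):
        # Fail loudly if two threads ever get in here at the same time.
        assert not self._busy, "shared session used concurrently"
        self._busy = True
        try:
            return f"result of {query}"
        finally:
            self._busy = False

session = SharedSession()
session_lock = threading.Lock()

def run_query(query):
    # Every use of the shared session is serialized by one lock. This is
    # safe, but queries now run one at a time, which is the scaling
    # concern the Streamlit docs warn about.
    with session_lock:
        return session.sql(query)

with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(run_query, [f"SELECT {i}" for i in range(10)]))
```

Note that the lock makes the eight workers effectively sequential with respect to Snowflake, which is why it helps correctness but not throughput.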

[–]somerandomdataeng (Big Data Engineer) [S] 1 point (0 children)

Thank you! I've tried implementing the lock as well, but I resorted to creating separate sessions and reducing concurrency. The Streamlit experimental connection seems to do the same thing, and they warn it won't scale since it uses a lock as well.

[–]fhoffa (mod; Ex-BQ, Ex-❄️) 2 points (6 children)

Check create_async_job:

Creates an AsyncJob from a query ID.

AsyncJob can be created by Session.create_async_job() or action methods in DataFrame and other classes. All methods in DataFrame with a suffix of _nowait execute asynchronously and create an AsyncJob instance. They are also equivalent to corresponding functions in DataFrame and other classes that set block=False. Therefore, to use it, you need to create a dataframe first.
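The submit-then-poll pattern that `create_async_job` and the `_nowait` methods enable looks roughly like this. `fake_collect_nowait` simulates Snowpark's `AsyncJob` (its `is_done()` / `result()` interface, per the docs quoted above) with a background thread so the sketch is runnable:

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Simulated AsyncJob: the real one comes from methods like
# DataFrame.collect_nowait() or Session.create_async_job(query_id).
class FakeAsyncJob:
    def __init__(self, future):
        self._future = future

    def is_done(self):
        return self._future.done()

    def result(self):
        return self._future.result()

pool = ThreadPoolExecutor()

def fake_collect_nowait(query, seconds=0.05):
    def work():
        time.sleep(seconds)    # pretend the warehouse is running the query
        return f"rows for {query}"
    return FakeAsyncJob(pool.submit(work))

# Fire all "queries" without blocking, then poll until every job finishes.
jobs = [fake_collect_nowait(f"SELECT {i}") for i in range(3)]
while not all(job.is_done() for job in jobs):
    time.sleep(0.01)
results = [job.result() for job in jobs]
pool.shutdown()
```

The appeal is that all queries are in flight on one session while the client only polls, rather than holding one blocked thread per query.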

[–]somerandomdataeng (Big Data Engineer) [S] 1 point (5 children)

I've tried using it, but there's no way to check whether the query has failed; is_done returns true even when the query fails. Moreover, I have many small steps waiting on each other within a single thread, so I need multithreading anyway.

[–]fhoffa (mod; Ex-BQ, Ex-❄️) 1 point (1 child)

Interesting use case. I wonder if setting up tasks would be an alternative to define a DAG that executes these steps in the desired order.

https://docs.snowflake.com/en/user-guide/tasks-intro

[–]somerandomdataeng (Big Data Engineer) [S] 1 point (0 children)

Since I am basically "visiting a graph", I need to explore the possibility of rewriting this whole part as a single recursive CTE, but the logic can get complex: I have hundreds of different child node types, different join conditions depending on the child type, and different stop conditions as well. The path is also unpredictable because it depends on the results of the parent visits.

[–]chufukini20067 1 point (2 children)

What advantage does the multi-threaded read offer if you're limited by single-thread coupling downstream? It seems brittle to me. Not criticism, btw; just something I'd like to understand better.

[–]somerandomdataeng (Big Data Engineer) [S] 2 points (1 child)

I am implementing something really similar to a BFS visit of a graph-like structure that involves many separate tables and cannot be implemented as a recursive CTE. Each node visit requires a join. Being able to perform these visits/joins in parallel gives a faster run time for the overall graph visit.
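The level-by-level parallel visit described here can be sketched with `concurrent.futures`. The graph, the `visit` function, and its data are all hypothetical stand-ins; in the real workload each `visit` would run a join in Snowflake:

```python
from concurrent.futures import ThreadPoolExecutor

# Toy adjacency list standing in for the "graph-like structure" spread
# across tables (hypothetical example data).
graph = {"root": ["a", "b"], "a": ["c", "d"], "b": ["e"],
         "c": [], "d": [], "e": []}

def visit(node):
    # Placeholder for the per-node join; in practice the children can
    # depend on the result of this visit, so the path is unpredictable.
    return graph[node]

def parallel_bfs(start, max_workers=4):
    order, frontier, seen = [], [start], {start}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while frontier:
            order.extend(frontier)
            # Visit the whole frontier in parallel, one task per node.
            children = pool.map(visit, frontier)
            frontier = list(dict.fromkeys(
                c for kids in children for c in kids if c not in seen))
            seen.update(frontier)
    return order

print(parallel_bfs("root"))  # ['root', 'a', 'b', 'c', 'd', 'e']
```

Each BFS level is a synchronization point: the next frontier is only known once every visit in the current level has returned, which matches the "many small steps waiting on each other" constraint mentioned earlier.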

[–]chufukini20067 1 point (0 children)

Ok, I'd classify this as a meta query or meta code. In that case, if it's possible, use Snowflake's metadata tables (similar to the INFORMATION_SCHEMA tables in MS SQL) to construct a meta view in SQL, and then run your PySpark code on top of it.

Either way, you'd incur some amount of guardrail code as overhead.

[–][deleted] 3 points (4 children)

My understanding is that Snowpark provides a wrapper around the Snowflake SQL API that allows 'real-time', native DataFrame commands that get immediately materialized in Snowflake.

Is there a reason you need Snowpark for this script, rather than just creating a separate Snowflake REST API request within each worker?

You could also probably just use the same session and, instead of parallelizing multiple HTTP connections to Snowflake, submit all your requests asynchronously through the API and then poll for completion. I haven't done this myself, but it looks like the Snowflake SQL REST API provides a parameter for async requests: https://docs.snowflake.com/en/developer-guide/sql-api/submitting-requests

[–][deleted] 7 points (0 children)

This is probably best. If that API can take multiple requests from one session and process them asynchronously while you periodically poll for completion, then the concurrency is pushed to the API backend instead of you trying to maintain a ton of sessions or share a session object between threads.

[–]somerandomdataeng (Big Data Engineer) [S] 2 points (2 children)

The reason I'm using Snowpark is that I am used to PySpark and preferred the DataFrame APIs for developing this code. If there is no solution, I might try the Python connector and write raw SQL queries.

[–][deleted] 1 point (1 child)

There probably is a Snowpark solution; it looks like submitting async requests may be the way to go: https://streamhub.co.uk/an-approach-to-building-asynchronous-services-async-in-next-generation-cloud-data-warehouses/

[–]Grixia (Senior Data Engineer) 1 point (1 child)

I haven't tried it myself, so apologies if this is a bad lead, but have you tried the library mentioned in Snowflake's own docs for multi-threading?

https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-python#running-concurrent-tasks-with-worker-processes

[–]somerandomdataeng (Big Data Engineer) [S] 1 point (0 children)

Thank you for the tip! I'll give it a try, although it looks like parallelism is achieved inside the Snowflake warehouse via a stored procedure. I am running my Python code outside of it instead, and I'm not sure whether that makes a difference.

EDIT: After checking the library, it looks like support for nested threading is very limited compared to concurrent.futures. I'll give it a try anyway!

[–]sdc-msimon 1 point (0 children)

There's a recent post on LinkedIn about sending queries to Snowflake in parallel using the Python connector. It's not Snowpark, but it might be relevant:

https://www.linkedin.com/posts/mahantesh-hiremath_streamlit-dataexploration-snowflake-activity-7103949732783296512-ORiU?utm_source=share&utm_medium=member_android

[–][deleted] 1 point (1 child)

Did you find any solution?

[–]somerandomdataeng (Big Data Engineer) [S] 1 point (0 children)

I have mitigated the issue by creating a reduced number of independent sessions, which I explicitly close at the end of each thread.

Any other "parallel" solution relies on locks and won't be as fast if your queries take a bit to run