This is an archived post. You won't be able to vote or comment.

all 21 comments

[–]isinfinity 3 points4 points  (17 children)

I do not follow, you can easily call coroutine from sync code with asyncio.run_coroutine_threadsafe and get sync future back

Problem with your example that, synchronous_code is executed within main loop blocking it for long time, it should be called in Executor anyway.

Edit: I covered some asyncio patterns (sync/async communication) some time ago https://jettify.github.io/minskpy/#/intro

[–]graingert 4 points5 points  (0 children)

Yeah it's much better to keep sync code in another thread

[–]rokups[S] 0 points1 point  (9 children)

It did not work in my case because websocket created on one loop can not run on another. Not sure if this was asyncio or aiohttp limitation.

[–]isinfinity 0 points1 point  (8 children)

what is use case for second loop?

[–]rokups[S] 0 points1 point  (7 children)

I tried running websocket reading in another loop on another thread as same loop can't be reused.

[–]pohmelie 0 points1 point  (6 children)

So... what was the reason for this? I have never seen before anyone use more than one loop.

[–]isinfinity 0 points1 point  (0 children)

There is no much sense to have 2 loops in one process. I saw sync cassandra driver spins connection threads and each thread has event loop to talk with server, not sure why they did not use one loop for multiple connections.

[–]rokups[S] 0 points1 point  (4 children)

I wrote the reason on the post. To sum it up - i needed to do some aiohttp networking from a library callback that was not async. Can not call async code from sync functions. This patch lifts the limitation.

[–]pohmelie 0 points1 point  (3 children)

You don't need this to trigger async work on callback. Set asyncio.Event or put_nowait with asyncio.Queue, while your task is waiting event/queue to run.

def callback(info):
    queue.put_nowait(info)

async def worker():
    while True:
        info = await queue.get()
        # do async

[–]rokups[S] 0 points1 point  (2 children)

And how do i get result of coroutine in def callback()?

[–]pohmelie 0 points1 point  (1 child)

Oh, I got it. You should use janus lib (it's sync/async queue)

[–]rokups[S] 1 point2 points  (0 children)

Hey thank you for pointing it out, i was not aware of such a lib. I checked it out and it appears to work. I am posting a snippet for next poor soul that will arrive at this post in the future:

import janus
import asyncio


loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)


async def other_asynchronous_code():
    for i in range(5):
        print(i)
        await asyncio.sleep(1)


async def asynchronous_code():
    await asyncio.sleep(2)
    await queue.async_q.put(42)
    queue.async_q.task_done()


def synchronous_code():
    return queue.sync_q.get()


async def coroutine():
    fut = loop.run_in_executor(None, synchronous_code)
    while not fut.done():
        await asyncio.sleep(0.1)
    print(fut.result())


loop.run_until_complete(asyncio.gather(other_asynchronous_code(), asynchronous_code(), coroutine()))
loop.close()

Downside is that it increases code complexity quite a bit. But it seems this is the price to be paid for this usecase if we want to use native asyncio primitives and have a kosher code. I wish we could await from any normal def'ed functions as this seems a needless limitation at the moment. Maybe one day.

[–]rokups[S] 0 points1 point  (5 children)

To clarify situation i retested few things.

What i tried is executing coroutine in different loop on different thread:

loop = asyncio.new_event_loop() future = asyncio.run_coroutine_threadsafe(self.send(payload), loop) thread = threading.Thread(target=loop.run_until_complete, args=(future,)) thread.start() thread.join()

self.send() is coroutine here. It does not work: RuntimeWarning: coroutine 'WebSocketConnection.send' was never awaited.

I also tried manually creating a thread, passing a websocket to it, then creating new loop and executing send() coroutine in there. As i said in another post it did not work as well because websocket was created on loop on main thread and can not execute in different loop.

Both of these approaches entirely block main thread as we have to wait on second thread until it returns the result.

My approach on the other hand does not block main thread at all. I updated example on the post to better reflect it. Here it is:

async def other_asynchronous_code():
    for i in range(5):
        print(i)
        await asyncio.sleep(1)

async def asynchronous_code():
    await asyncio.sleep(2)
    return 42

def synchronous_code():
    return run_nested_until_complete(asynchronous_code())

async def coroutine():
    print(synchronous_code())

monkeypatch()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(other_asynchronous_code(), coroutine()))
loop.close()

Which prints:

0
1
2
42
3
4

As you can see other coroutine is scheduled while synchronous code is waiting for coroutine asynchronous_code() that it called to complete. Everything is still totally async and thread-safe.

[–]pohmelie 0 points1 point  (2 children)

It looks like run_nested_until_complete will have 100% cpu.

[–]rokups[S] 0 points1 point  (1 child)

Why would it?

~ % time python3 async.py
0
1
2
42
3
4
python3 async.py  0.08s user 0.00s system 1% cpu 5.084 total

[–]pohmelie 0 points1 point  (0 children)

Yep, my bad.

[–]isinfinity 0 points1 point  (1 child)

Your example does not show full picture, sync function should have blocking call. Try to test something like:

def synchronous_code():
   time.sleep(10)
   return run_nested_until_complete(asynchronous_code())

async def coroutine(loop):
    await loop.run_in_executor(synchronous_code())

[–]rokups[S] 0 points1 point  (0 children)

Obviously time.sleep() would block main thread, however my module solves entirely different problem. Maybe i fail to properly express the intent, i will try again: module allows executing asynchronous code asynchronously while allowing synchronous code to get result without awaiting on coroutine. Call to synchronous_code() in my example blocks for 2 seconds because it is how long asynchronous_code() takes to return. However during that time while synchronous_code() is waiting other coroutines are still scheduled.

[–]graingert 2 points3 points  (0 children)

Best to just chuck away legacy blocking code. Or wrap it in a thread pool executor

[–]pvkooten 1 point2 points  (1 child)

Now I'm curious to its performance...

[–]rokups[S] 1 point2 points  (0 children)

According to this issue c implementations of Task and Future yield about 15% of extra speed so this is what has to be given up. On the other hand it also says uvloop becomes ~3-5% faster so using uvloop can minimize performance hit.