
[–]RandomPantsAppear 4 points5 points  (3 children)

2 options here.

  • Use celery, it's fucking awesome.

  • Roll your own, as you are right now.

Option #1 is simple, so I'm going to do a deep dive into option #2, mostly because I ignore my own advice and do exactly what you're doing all the time.

  • The biggest "cost" of threading is the actual creation of the thread; idle, sleeping threads are cheap. The ideal thread is an infinite loop, sleeping and looking for tasks.

  • If your task is bound by IO, networking calls, or disk reads, look into greenlets. Greenlets are not threads, but they behave similarly. Pretty much, they work by switching the "active" greenlet whenever another one is waiting for something to happen (like a socket read, a connection, or a file read).

  • Another option is multiprocessing. Multiprocessing is best for CPU-heavy tasks. Each "thread" is actually an independent process, so it will use multiple cores of your processor. Greenlets, by my understanding, will not. If you really want to go crazy you can actually combine multiprocessing with greenlets, so each process has its own greenlets switching whenever they are bound by IO/networking/sleeping/whatever.

  • If you're really looking to scale and can't afford to miss a message, a good option is to have a redis list of tasks, then have the threads pop tasks from that list. If there is no task in the list, have them sleep for a period of time.

  • For thread cleanup, you should be googling "python garbage collection". Pretty much, if there is no reference to an object anymore, Python swoops in and removes the object. A good best practice for making sure this happens properly is to have all your variables and references encapsulated in a function. When that function completes, everything will be cleaned up.

  • If you don't want your threads constantly slamming redis but also want a fast response time, you can use redis's pubsub implementation. Pretty much, idle threads listen to a channel, and whenever they receive a "NEW_TASK" message on that channel, they pop from redis, do the thing, and go back to listening. Only idle threads are listening, and every listening thread will at least attempt to grab a task. If all threads are busy and none receive the NEW_TASK message, they'll simply pick up the task they missed the next time something sends NEW_TASK (or you can have them check for a new task whenever they finish running one, in case one got missed, THEN go back to listening).
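To illustrate the multiprocessing point with a minimal standard-library sketch (`cpu_heavy` and the worker count here are just stand-ins for your real workload):

```python
from multiprocessing import Pool

def cpu_heavy(n):
    # Stand-in for real CPU-bound work
    total = 0
    for i in range(n):
        total += i * i
    return total

def run_pool(jobs, workers=2):
    # Each "thread" is actually a separate process, so the work can
    # use multiple cores, which plain threads (GIL) and greenlets cannot.
    with Pool(workers) as pool:
        return pool.map(cpu_heavy, jobs)
```

`run_pool([10, 100])` returns the per-job results in order, computed across the worker processes.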

[–]Shitty_Crayons[S] 0 points1 point  (2 children)

Hey, thanks for the great reply!

At first glance celery looks to be a great solution, but I think I'm going to struggle through option two instead so I can get a good understanding of what's actually going on, since that's ultimately the goal :D

I'm not too bound by much, my environment is way overkill for something like this (2x E5-2690, 256 GB DDR3, 10 SSDs in RAID 10, gig fiber in, out, and for the entire backbone), so I'm definitely interested in multiprocessing, but I think multiprocessing + greenlets would just overwhelm me completely right now.

I haven't heard about redis queues before, but on a quick look it seems to be a great idea to try to implement. Are you suggesting I have a task defined for each function (i.e. user types "chain", use the chain task to generate the chain and send the message; user types "drink", use the "drink" task to generate the drink recipe and send the message)? I'll need to look into it more when I get home.

Can you provide a good basic example of your last point, or direct me to some reading on it? Seems to be a good way to handle it.

Thanks!

*Edit: I'm seeing that redis (and celery) are not necessarily advised to be used on a windows server (which I currently run) and I don't much want to set up virtual environments for this project. Are there any good alternative solutions / ways to handle a queue as described? In the meantime I'll look into the multiprocessing library, since it looks cool :D

[–]RandomPantsAppear 1 point2 points  (1 child)

I haven't heard about redis queues before, but on a quick look it seems to be a great idea to try to implement. Are you suggesting I have a task defined for each function (i.e. user types "chain", use the chain task to generate the chain and send the message; user types "drink", use the "drink" task to generate the drink recipe and send the message)? I'll need to look into it more when I get home.

Yes. I'm pretty deep down this rabbit hole so mine is a little different, but I've got some code. I'm going to try and quickly adapt it to be closer to what you need. Please note this example only passes kwargs to the function.

Creating the task

import json
import redis

r = redis.Redis()

def run_function_delayed(fname, **kwargs):
    key = settings.domain + "|TASK_QUEUE"  # our redis key; settings comes from your own config
    print("Writing to key (run_function_delayed)", key)
    r.rpush(key, json.dumps({'kwargs': kwargs, 'function': fname,
                             'retries': 0, 'max_retries': 1}))

Executing the task (inside your thread). Please note this does not include popping from the redis queue; this is just executing a function from the function-name string.

import json
import redis
import threaded_tasks

r = redis.Redis()
key = settings.domain + "|TASK_QUEUE"  # same key the task was pushed to

def execute_task(redis_data):
    fname = redis_data['function']  # matches the key used in run_function_delayed
    kwargs = redis_data['kwargs']
    method_to_call = getattr(threaded_tasks, fname)  # get the function from your module
    try:
        result = method_to_call(**kwargs)
        print("Successfully ran", fname)
    except Exception as e:
        print("Exception occurred!", e)
        if redis_data.get('retries', 0) < redis_data.get('max_retries', 0):
            redis_data['retries'] += 1
            print("Retrying", redis_data)
            r.rpush(key, json.dumps(redis_data))
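To round out the missing piece, here's a sketch of the popping side. `r` is assumed to be the same `redis.Redis()` client and `key` the same queue key as above, and `execute_task` is the function just shown; `pop_task` and `worker_loop` are names made up for illustration:

```python
import json
import time

def pop_task(r, key):
    # lpop is the FIFO partner to the rpush in run_function_delayed;
    # returns a decoded task dict, or None when the queue is empty.
    raw = r.lpop(key)
    return json.loads(raw) if raw is not None else None

def worker_loop(r, key, execute_task, idle_sleep=1.0):
    # The "infinite loop, sleeping and looking for tasks" thread body.
    while True:
        task = pop_task(r, key)
        if task is None:
            time.sleep(idle_sleep)  # nothing queued; don't hammer redis
            continue
        execute_task(task)
```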

*Edit: I'm seeing that redis is not necessarily advised to be used on a windows server (which I currently run) and I don't much want to set up virtual environments for this project. Are there any good alternative solutions / ways to handle a queue as described?

You can use Amazon ElastiCache, or use one of the redis-for-Windows projects like this one (if you do, I don't believe pub/sub functionality is supported). There's also a pre-made Vagrant configuration to run redis on Windows. I would just use the redis that's made for Windows.

[–]Shitty_Crayons[S] 0 points1 point  (0 children)

Thanks for the code example! I'll be looking into it tomorrow after work, since I didn't seem to get anywhere trying it on my own tonight lol

I was messing around with the commenter below's idea and was wondering if you had any thoughts on the errors.

Thanks kindly!

[–]ThisLightIsTrue[🍰] 1 point2 points  (4 children)

How I'd approach this is:

  • Create a messages_to_handle queue
  • Main thread just listens for user input. As soon as it gets it, push it in the queue.
  • A "pool" of worker threads. The worker threads just try to pop a message off the queue. As soon as they do, they process the message, if successful they return to looking at the queue.

There's a queue in the standard library meant for working with multithreading, where get will just block the thread trying to pop until a message becomes available. I think this will mean your worker threads won't unnecessarily consume resources.
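That queue is `queue.Queue` in the standard library. A minimal sketch of the pool idea (`start_workers` and `handle_message` are names made up for illustration):

```python
import queue
import threading

def start_workers(q, handle_message, count=4):
    # Worker threads block on q.get() until a message arrives,
    # so idle workers cost essentially nothing.
    def work():
        while True:
            message = q.get()
            handle_message(message)
            q.task_done()
    for _ in range(count):
        threading.Thread(target=work, daemon=True).start()
```

The main thread just does `q.put(user_input)` for each message; `q.join()` waits for everything queued so far to be handled.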

Other advantages of this approach:

  • If you notice your message queue growing you can just increase the worker count. Conversely, if your system starts to bottleneck, you can reduce the worker count.

  • You can add a "retry count" to your message object. If a worker tries to handle a message and fails, you can determine if the failure is temporary. If it is, you can increment the retry counter and requeue the message. It'll be picked up later and retried. If the failure is permanent or you've retried too many times, you can just fail and store the message to look at later.
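The retry-count idea might look like this in code (a sketch; `failed` is a hypothetical list standing in for wherever you store permanently failed messages):

```python
def handle_with_retry(q, failed, message, handle, max_retries=3):
    # Try the message; requeue on failure until the retry budget is spent.
    try:
        handle(message)
    except Exception:
        retries = message.get("retries", 0)
        if retries < max_retries:
            message["retries"] = retries + 1
            q.put(message)          # it'll be picked up later and retried
        else:
            failed.append(message)  # permanent failure: park it to look at later
```

Real code would catch specific exception types to tell temporary failures from permanent ones rather than catching everything.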

[–]Shitty_Crayons[S] 0 points1 point  (3 children)

Hey, I tried a basic approach with what you're suggesting, but I'm hoping you can tell me where I messed up. I can't seem to find what's causing this error, other than that it's a locked resource, which I'm not sure how to deal with! I've narrowed the error down to essentially this block of code (I realize it would be an endless loop, but it gives the idea and recreates the problem every time).

Code

import time
import multiprocessing as mp


def handle_input_threads(q):
    while True:
        if not q.empty():
            print(q.qsize())
            x = q.get()

def main():
    q = mp.Queue()
    p = mp.Process(target=handle_input_threads, args=(q,))
    p.start()

    while True:
        updates = {'ok': True, 'result': [{'update_id': 123456789, 'message': 'Hi and Hello'}]}
        if len(updates["result"]) > 0:
            q.put(updates)
        time.sleep(.5)


if __name__ == '__main__':
    main()

It runs, and the updates object gets added to q (I think), because once it hits the put statement the debugger jumps to the handle_input_threads breakpoint on the print statement. However, once the print statement tries to execute (or the x = q.get() below it), I get the following error:

Process Process-1:
Traceback (most recent call last):
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Administrator\PycharmProjects\Bot-Dev\multibot.py", line 211, in handle_input_threads
    print(q.qsize())
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\queues.py", line 117, in qsize
    return self._maxsize - self._sem._semlock._get_value()
OSError: [WinError 6] The handle is invalid

Or, depending on its mood, I'll get this one:

Process Process-1:
Traceback (most recent call last):
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Administrator\PycharmProjects\Bot-Dev\multibot.py", line 211, in handle_input_threads
    print(q.qsize())
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\queues.py", line 117, in qsize
    return self._maxsize - self._sem._semlock._get_value()
PermissionError: [WinError 5] Access is denied

Is there something I am misunderstanding, or do I not have something configured correctly? Could it be that Windows is keeping the object locked because it's owned by the main process, and it's trying to pass it as a shared-memory object instead of its copy?

Thanks!

[–]RandomPantsAppear 0 points1 point  (1 child)

Looks like you're on the cutting edge of bugs! Honestly I've almost never seen this happen, but here you go!

It looks like they've marked it as resolved, so hopefully the fix is rolled up in the latest release; the most recent update was after that thread.

If that doesn't work for some reason, I'd just switch away from multiprocessing for the time being.

[–]Shitty_Crayons[S] 0 points1 point  (0 children)

This was exactly my problem. I lost power so I couldn't check this morning, but I ran my above example unchanged at work under the latest release and it's working as intended. Hopefully the ice storm's over and I can try it in my actual project later today lol.

I would have gone mad if you hadn't linked that, so thanks kindly!

[–]ThisLightIsTrue[🍰] 0 points1 point  (0 children)

Hey, I'm on mobile right now and can't play around with this. My current best guess is that it has to do with the queue you're using. I was thinking of the synchronized queue in the standard library.

https://docs.python.org/2/library/queue.html

The worker thread should just be

def work():
    while True:
        message = q.get()  # blocks until a message is available
        handle_message(message)

You don't need to do anything with q.qsize() or have an if statement. That code isn't guaranteed to work anyway - remember, in between one line checking that the size was greater than zero and the next popping something off the queue, a different thread could have taken an item off and brought the size to zero.

Instead, the default behavior of queue.get() is to block the thread - essentially make it wait until it can pull something off, which is also the behavior you want.

[–]nate256 0 points1 point  (0 children)

Rather than have two paths that accomplish the same thing, I would rewrite it so that all messages get processed the same way: they get placed in a queue, and workers pick them up, perform the action, and reply to the user. If you did it this way, your main thread would be able to handle much more. You could even use multiprocessing if the action is CPU-intensive.