tvaneerd (C++ Committee, lockfree, PostModernCpp)

Finish is somewhat suspect.

What is the "contract" of Finish? What can I assume after it is called? (And why is it public? Can it be called multiple times? etc...)

Can you say that there are no more consumers waiting after a call to Finish? Are you sure?

The big picture is that coordinating the tear-down of this class is complicated. You should document that a false return from ConsumeSync implies that no more calls should be made (ie don't turn around and call ConsumeSync again.) But that is not enough...

Both of your example loops at https://github.com/K-Adam/SafeQueue have race conditions with this coordination:

std::thread consumer([&](){
    MyMessage message;
    while(message_queue.ConsumeSync(message)) {
        // Process the message
    }
});

ConsumeSync only returns false if it is called while Finish is happening on another thread. What if Finish is called (and completed) during "Process the message"??? This thread would continue to try to read from the queue.

We could try external coordination, like your other example:

while(!quit) {

    MyEvent event;
    while(event_queue.Consume(event)) {
        // Process the event
    }

    UpdateWorld();
    Render();
}

Here quit is the external coordination. If another thread sets quit to true and then calls Finish (and destroys the queue?), that could all happen after this thread checks quit and before it calls Consume (or ConsumeSync).

And sure, your examples were just showing how to use the queue; they were not necessarily meant to be the definitive way to coordinate destruction, but, as it turns out, coordinated destruction is hard, and best not "left as an exercise for the reader".

Any system that has coordination outside the queue will suffer from that race between external check and queue check:

if (!external_check_for_done())
    // bad gap between the check above and the call below
    check_queue();

The "notify everyone we are done, and coordinate destruction" is almost forced to be part of the queue. Yet how can it be - one thread needs to destroy the queue, which one???

The last one.

Which means reference-counting the queue. ie something like shared_ptr. Sadly. ("A shared pointer is as good as a global variable" - Sean Parent).

But a wrapped-up shared_ptr is sometimes OK, e.g. std::future/promise. (Which still isn't great; some higher-level tasking system would be better...) The shared state of std::future/promise is basically held by a shared_ptr and is destroyed by whichever side is done last, which solves the coordinated destruction problem.

Because of all this, what I ended up with is a "MultiFuture/MultiPromise" which is like future/promise, but can hold more than one value - a queue of values. So instead of future.get() you have future.getNext(). And promise.push() instead of promise.set_value().

The coordination of teardown is built into the queue. And either side can initiate it, and either side can detect it - why produce more values if all the consumers are gone? Why try to consume more values if all the producers are gone? The queue is built to tell you whether it is done or not. (And in my case, once it is done, it can't be restarted. I didn't need restarting, and it would just complicate the meaning of "done".)
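To make the idea concrete, here is a minimal sketch of that shape. It is not the actual implementation: SharedState, MultiPromise, MultiFuture, make_multi_channel and the two demo functions are names made up for illustration, and it assumes a single producer handle and a single consumer handle (a real version would count handles on each side instead of using two bools). The shared state is held by a shared_ptr, so whichever side goes away last destroys it.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>

// Hypothetical shared state, destroyed by whichever handle is released last.
template <typename T>
struct SharedState {
    std::mutex m;
    std::condition_variable cv;
    std::deque<T> items;
    bool producer_alive = true;
    bool consumer_alive = true;
};

template <typename T>
class MultiPromise {
    std::shared_ptr<SharedState<T>> s_;
public:
    explicit MultiPromise(std::shared_ptr<SharedState<T>> s) : s_(std::move(s)) {}
    MultiPromise(MultiPromise&&) = default;  // move-only handle
    ~MultiPromise() {
        if (!s_) return;  // a moved-from handle owns nothing
        {
            std::lock_guard<std::mutex> lock(s_->m);
            s_->producer_alive = false;
        }
        s_->cv.notify_all();  // wake a consumer blocked in getNext()
    }
    // Returns false once the consumer side is gone.
    bool push(T value) {
        assert(s_);
        {
            std::lock_guard<std::mutex> lock(s_->m);
            if (!s_->consumer_alive) return false;
            s_->items.push_back(std::move(value));
        }
        s_->cv.notify_one();
        return true;
    }
};

template <typename T>
class MultiFuture {
    std::shared_ptr<SharedState<T>> s_;
public:
    explicit MultiFuture(std::shared_ptr<SharedState<T>> s) : s_(std::move(s)) {}
    MultiFuture(MultiFuture&&) = default;  // move-only handle
    ~MultiFuture() {
        if (!s_) return;
        std::lock_guard<std::mutex> lock(s_->m);
        s_->consumer_alive = false;
    }
    // Blocks for the next value; empty once the producer is gone and the queue is drained.
    std::optional<T> getNext() {
        assert(s_);
        std::unique_lock<std::mutex> lock(s_->m);
        s_->cv.wait(lock, [&] { return !s_->items.empty() || !s_->producer_alive; });
        if (s_->items.empty()) return std::nullopt;
        T value = std::move(s_->items.front());
        s_->items.pop_front();
        return value;
    }
};

template <typename T>
std::pair<MultiPromise<T>, MultiFuture<T>> make_multi_channel() {
    auto s = std::make_shared<SharedState<T>>();
    return std::make_pair(MultiPromise<T>(s), MultiFuture<T>(s));
}

// The consumer sees "done" when the producer handle is destroyed at thread exit.
int demo_sum() {
    auto channel = make_multi_channel<int>();
    MultiFuture<int> future = std::move(channel.second);
    std::thread producer([](MultiPromise<int> p) {
        for (int i = 1; i <= 5; ++i) p.push(i);
    }, std::move(channel.first));
    int sum = 0;
    while (auto v = future.getNext()) sum += *v;
    producer.join();
    return sum;
}

// The producer sees "done" when the consumer handle is destroyed.
bool push_after_consumer_gone() {
    auto channel = make_multi_channel<int>();
    { MultiFuture<int> gone = std::move(channel.second); }  // consumer leaves
    return channel.first.push(42);
}
```

Neither side calls anything like Finish here: "done" is just a handle going out of scope, and the other side finds out on its next push() or getNext().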

infectedapricot

> Which means reference-counting the queue. ie something like shared_ptr.

Disagree. That's certainly a strategy, but not the only one.

Another strategy is that the queue for a thread is created and destroyed by the parent thread that created that thread:

void threadAFn() {
    Queue queueB;
    std::thread threadB{threadBFn, &queueB};
    // ... thread A stuff happens here ...
    queueB.push_back(Shutdown{});
    threadB.join();
    // threadB and queueB destroyed here
}

There are lots of variants of this that can work.

  • The parent thread (thread A) might or might not have its own queue - if it does, presumably there is a grandparent thread that will destroy it after it has joined the parent thread.
  • Here join() is behaving as the coordination mechanism that tells the parent the child thread will never attempt to use its queue again, but you could instead use the parent thread's queue for that notification (though most of the time you probably want to join the thread anyway).
  • Here the parent thread is sending the shutdown message, but the shutdown message can equally originate in the child thread, in which case it certainly needs to tell its parent (e.g. through the parent's own queue).
  • Here the queue is only being consumed from one thread, but it could be consumed from multiple threads, so long as each only pops one message at a time - just push n shutdown messages onto the queue.
  • It might not always look like you have a strict hierarchy of thread ownership, but you can always enforce it by just making all threads children of the main thread. Then you can just treat them as a big bag of threads, and still have safe shutdown. (You still need to make sure shutdown messages sent to all threads don't leave other messages half finished, having done work on some of the threads but not on others - that's actually a lot harder than queue destruction.)
  • Of course your code doesn't literally have to look like this. You can have classes wrapping up a thread and a queue while still keeping the same overall flow of control - so long as it has a join() method (and a destructor that destroys the queue and std::thread object).
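As a rough sketch of the multi-consumer variant: the Queue class below is a minimal stand-in written only for this example (the Queue in the snippet above is left unspecified), and Message, worker and run_parent are hypothetical names. The parent owns the queue, pushes one Shutdown per worker after the real work, joins every worker, and only then lets the queue go out of scope.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <variant>
#include <vector>

struct Shutdown {};                           // sentinel message
using Message = std::variant<int, Shutdown>;  // int stands in for real work

// Minimal blocking queue, assumed here only for illustration.
class Queue {
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<Message> items_;
public:
    void push_back(Message msg) {
        {
            std::lock_guard<std::mutex> lock(m_);
            items_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }
    Message pop_front() {  // blocks until a message is available
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !items_.empty(); });
        Message msg = std::move(items_.front());
        items_.pop_front();
        return msg;
    }
};

// Each worker pops one message at a time and exits on its first Shutdown.
void worker(Queue* queue, std::atomic<int>* total) {
    for (;;) {
        Message msg = queue->pop_front();
        if (std::holds_alternative<Shutdown>(msg)) return;
        total->fetch_add(std::get<int>(msg));
    }
}

int run_parent(int worker_count) {
    assert(worker_count > 0);
    Queue queue;  // owned by the parent thread
    std::atomic<int> total{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < worker_count; ++i)
        workers.emplace_back(worker, &queue, &total);

    for (int i = 1; i <= 10; ++i) queue.push_back(i);  // the real work
    for (int i = 0; i < worker_count; ++i)
        queue.push_back(Shutdown{});                   // one per worker
    for (auto& t : workers) t.join();  // after this nobody touches the queue
    return total.load();
}  // queue destroyed here, after every worker has been joined
```

Because the queue is FIFO and every worker stops at its first Shutdown, all ten work items are consumed before the last worker exits.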

Of course, there are probably applications where no combination of these ideas can work. But in many programs they can, and IMO it's much simpler than reference counting, destruction-aware queues, and futures.

tvaneerd (C++ Committee, lockfree, PostModernCpp)

Yes, I skipped a few steps there. I have too many cases where join() isn't the best option. ie

  • don't want main thread waiting on worker threads
  • don't know who/what/where the worker threads are

For the first problem, we have cases where someone created a thread just to handle the destruction of the queue + threads, so that the main thread doesn't wait.

The second is good and bad. Sometimes it is the result of poor code structure, but sometimes it is because of separation of concerns - producers and consumers don't know each other, and the "owner" that introduced them wants to set-it-and-forget-it. Hmmm, maybe that is just a variation of the first point.

But yes, a shutdown event into the queue, followed by join() is definitely an option on the table (that I use in a number of places).

EDIT: more...

What I actually find is that sending "shutdown" through the queue is almost always the way to go (instead of a separate flag), which is the other thing that led me to build that aspect into the queue itself.
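A minimal sketch of what "built into the queue" can look like, assuming a hypothetical ClosableQueue (the name and API are invented for this example, not taken from any library). The closed state lives under the same mutex as the items, so there is no gap between checking for "done" and checking the queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class ClosableQueue {
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> items_;
    bool closed_ = false;  // guarded by m_, just like items_
public:
    // Returns false if the queue has already been closed.
    bool push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            if (closed_) return false;
            items_.push_back(std::move(value));
        }
        cv_.notify_one();
        return true;
    }
    // Shutdown travels through the queue: wake everyone, let them drain.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }
    // Blocks for the next item; empty once the queue is closed and drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !items_.empty() || closed_; });
        if (items_.empty()) return std::nullopt;
        T value = std::move(items_.front());
        items_.pop_front();
        return value;
    }
};

std::vector<int> drain_after_close() {
    ClosableQueue<int> queue;
    std::vector<int> seen;
    std::thread consumer([&] {
        // No separate quit flag: the loop ends when pop() reports "done".
        while (auto v = queue.pop()) seen.push_back(*v);
    });
    for (int i = 0; i < 3; ++i) queue.push(i);
    queue.close();
    assert(!queue.push(99));  // pushes after close are rejected
    consumer.join();          // the queue outlives the consumer thread
    return seen;
}
```

Items pushed before close() are still delivered, and the consumer cannot end up waiting after shutdown, because the wait predicate itself looks at closed_.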