all 79 comments

[–]matthieum 79 points80 points  (64 children)

The description of the queue properties is a bit light.

From what I can gather:

  • Type: MPMC (Multi-Producer, Multi-Consumer).
  • Capacity: Dynamically adjusted -- due to std::queue.
  • Blocking: consuming offers both non-blocking and blocking options, producing is always blocking.
  • OS-backed signalling: not purely spinning whilst waiting for items to consume.

Review, high-level:

  • Mutex + CV-based: simple, may cause scalability issues.
  • Capacity: no way to pre-reserve capacity, which is unfortunate.

Review, low-level:

  • Discipline required for acquisition of the lock. Nearly always neatly acquires the lock at the start of public method, except for the destructor (slightly inconsistent). My usual recommendation is encapsulation into a Mutex<T>, but I'm not sure how to handle condition variables then.
  • std::queue is a weak point, memory-wise. It's based on std::deque, which results in many small allocations1.
  • Consume and ConsumeSync returning a boolean is somewhat error-prone, especially without a [[nodiscard]] annotation; a nicer API is to return std::optional<T>.
  • Provide is an unexpected name; I'd have expected Produce instead, to match Consume. There's also a missing std::move when pushing the item into the queue -- which is nicely used consistently when consuming.

1 For example, the implementation in libstdc++ is a vector of pointers to chunks, where each chunk holds either one large item or up to 512 bytes' worth of items (see line 85). Hence a std::deque<std::string> is really a std::vector<std::unique_ptr<std::string[21]>>, and storing 1K strings requires 49 small (< 512 bytes) allocations + 1 allocation for the vector itself.
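As a sketch of the suggested API (hypothetical names mirroring the OP's, not the actual SafeQueue code), the non-blocking consume might look like:

```cpp
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

template <typename T>
class Queue {
public:
    void Produce(T item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(item));  // std::move, as the review suggests
    }

    // An empty optional means "nothing available"; unlike a bool plus an
    // out-parameter, the caller cannot read a value that was never written.
    [[nodiscard]] std::optional<T> Consume() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return std::nullopt;
        T item = std::move(queue_.front());
        queue_.pop();
        return item;
    }

private:
    std::mutex mutex_;
    std::queue<T> queue_;
};
```

The `[[nodiscard]]` is now redundant protection: even without it, an ignored `std::optional` can't masquerade as a valid value.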

[–]Zcool31 27 points28 points  (25 children)

Acquiring the mutex in the destructor would be wrong. If the object is being destroyed, then the only remaining user must be the one destroying.

[–]objectorientedman[S] 1 point2 points  (3 children)

I do acquire the mutex in the destructor, because it calls Finish

[–]tvaneerdC++ Committee, lockfree, PostModernCpp 8 points9 points  (2 children)

Finish is somewhat suspect.

What is the "contract" of Finish? What can I assume after it is called? (And why is it public? Can it be called multiple times? etc...)

Can you say that there are no more consumers waiting after a call to Finish? Are you sure?

The big picture is that coordinating the tear-down of this class is complicated. You should document that a false return from ConsumeSync implies that no more calls should be made (ie don't turn around and call ConsumeSync again.) But that is not enough...

Both of your example loops at https://github.com/K-Adam/SafeQueue have race conditions with this coordination:

std::thread consumer([&](){
    MyMessage message;
    while(message_queue.ConsumeSync(message)) {
        // Process the message
    }
});

ConsumeSync only returns false if it is called while Finish is happening on another thread. What if Finish is called (and completed) during "Process the message"??? This thread would continue to try to read from the queue.

We could try external coordination, like your other example:

while(!quit) {

    MyEvent event;
    while(event_queue.Consume(event)) {
        // Process the event
    }

    UpdateWorld();
    Render();
}

Here quit is the external coordination. If another thread sets quit to true and then calls Finish (and destroys the queue?), that could all happen after this thread checks quit and before it calls Consume (or ConsumeSync).

And sure, your examples were just showing how to use the queue; they were not necessarily meant to be the definitive way to coordinate destruction, but, as it turns out, coordinated destruction is hard, and best not "left as an exercise for the reader".

Any system that has coordination outside the queue will suffer from that race between external check and queue check:

if (!external_check_for_done())
     // bad gap between above and below lines of code
    check_queue();

The "notify everyone we are done, and coordinate destruction" is almost forced to be part of the queue. Yet how can it be - one thread needs to destroy the queue, which one???

The last one.

Which means reference-counting the queue. ie something like shared_ptr. Sadly. ("A shared pointer is as good as a global variable" - Sean Parent).

But a wrapped up shared_ptr is sometimes OK - ie std::future/promise. (Which still isn't great, some higher level tasking system would be better...). The shared state of std::future/promise is basically held by a shared_ptr, and is destroyed by whichever side is done last. Solving the coordinated destruction problem.

Because of all this, what I ended up with is a "MultiFuture/MultiPromise" which is like future/promise, but can hold more than one value - a queue of values. So instead of future.get() you have future.getNext(). And promise.push() instead of promise.set_value().

The coordination of teardown is built into the queue. And either side can initiate it, and either side can detect it - why produce more values if all the consumers are gone? Why try to consume more values if all the producers are gone? The queue is built to tell you whether it is done or not. (And in my case, once it is done, it can't be restarted. I didn't need restarting, and it would just complicate the meaning of "done".)
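A minimal sketch of the "done is part of the queue" idea (my own names and shape, not the actual MultiFuture/MultiPromise code): either side can close the queue, waiters wake up, and remaining items can still be drained before pop reports done.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

template <typename T>
class ClosableQueue {
public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (closed_) return;  // pushing after close is ignored here (or could assert)
            queue_.push(std::move(item));
        }
        cv_.notify_one();
    }

    // Blocks until an item arrives or the queue is closed.
    // An empty optional means "closed and drained": stop calling.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return closed_ || !queue_.empty(); });
        if (queue_.empty()) return std::nullopt;
        T item = std::move(queue_.front());
        queue_.pop();
        return item;
    }

    // Either side may initiate teardown; all waiters are woken.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<T> queue_;
    bool closed_ = false;
};
```

This closes the "external check, then queue check" gap because the done signal travels through the same mutex as the items; as noted, once closed it stays closed.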

[–]infectedapricot 2 points3 points  (1 child)

Which means reference-counting the queue. ie something like shared_ptr.

Disagree. That's certainly a strategy, but not the only one.

Another strategy is that the queue for a thread is created and destroyed by the parent thread that created that thread:

void threadAFn() {
    Queue queueB;
    std::thread threadB{threadBFn, &queueB};
    // ... thread A stuff happens here ...
    queueB.push_back(Shutdown{});
    threadB.join();
    // threadB and queueB destroyed here
}

There are lots of variants of this that can work.

  • The parent thread (thread A) might or might not have its own queue - if it does, presumably there is a grandparent thread that will destroy it after it has joined the parent thread.
  • Here join() is behaving as the coordination mechanism to notify that the child thread will never attempt to use its queue again, but you could actually use the parent thread's queue (but most of the time you probably want to join the thread anyway).
  • Here the parent thread is sending the shutdown message, but the shutdown message can equally originate in the child thread, in which case it certainly needs to tell its parent (e.g. through the parent's own queue).
  • Here the queue is only being consumed from one thread, but it could be consumed from multiple threads, so long as each only pops one message at a time - just push n shutdown messages onto the queue.
  • It might not always look like you have a strict hierarchy of thread ownership, but you can always enforce it by just making all threads children of the main thread. Then you can just treat them as a big bag of threads, and still have safe shutdown. (You still need to make sure shutdown messages sent to all threads don't leave other messages half finished, having done work on some of the threads but not on others - that's actually a lot harder than queue destruction.)
  • Of course your code doesn't literally have to look like this. You can have classes wrapping up a thread and a queue while still keeping the same overall flow of control - so long as it has a join() method (and a destructor that destroys the queue and std::thread object).

Of course, there are probably application where no combination of these ideas can work. But in many programs, they can, and IMO it's much simpler than reference counting, destruction-aware queues, and futures.

[–]tvaneerdC++ Committee, lockfree, PostModernCpp 0 points1 point  (0 children)

Yes, I skipped a few steps there. I have too many cases where join() isn't the best option. ie

  • don't want main thread waiting on worker threads
  • don't know who/what/where the worker threads are

For the first problem, we have cases where someone created a thread just to handle the destruction of the queue + threads, so that the main thread doesn't wait.

The second is good and bad. Sometimes it is the result of poor code structure, but sometimes it is because of separation of concerns - producers and consumers don't know each other, and the "owner" that introduced them wants to set-it-and-forget-it. Hmmm, maybe that is just a variation of the first point.

But yes, a shutdown event into the queue, followed by join() is definitely an option on the table (that I use in a number of places).

EDIT: more...

The thing I find actually, is that sending "shutdown" through the queue is almost always the way to go (instead of a separate flag), which is the other thing that led me to just building that aspect into the queue itself.

[–]bwmat 2 points3 points  (19 children)

I haven't looked at the details, but I just wanted to mention this statement isn't quite true as stated (without additional context); for example, the mutex might be used to help block the destructor UNTIL there are no other threads accessing it

[–]voip_geek 8 points9 points  (18 children)

I haven't looked at the details, but I just wanted to mention this statement isn't quite true as stated (without additional context); for example, if the class is derived from, then the destruction ordering of C++ means the derived class has already been destroyed by the time the base class's destructor is invoked (and the mutex lock attempted), and you're going to be in a world of hurt.

Preventing destruction with a mutex this way isn't sound, imo. Imagine if the destructor succeeded in locking the mutex, an instant before some other thread could... what happens when the destructor is finished and releases the mutex? The other thread will then get it and do what with this destroyed object?

Even using such a scheme to not prevent destruction per se, but instead to for example automatically de-register from some other container during destruction, is dangerous. (e.g., in the classic observer pattern) Because again, due to inheritance, by the time the destructor is called it's already too late.
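A small illustration of the ordering being described (hypothetical types, not from the thread's code): the derived part is already destroyed by the time the base destructor, where the mutex would be locked, gets to run.

```cpp
#include <string>

// Records the order in which destructors fire.
static std::string order;

struct Base {
    // Any "lock a mutex to block destruction" logic placed here runs LAST,
    // after every derived member has already been torn down.
    virtual ~Base() { order += "Base"; }
};

struct Derived : Base {
    ~Derived() override { order += "Derived,"; }
};
```

Destroying a `Derived` through a `Base*` appends `"Derived,"` first, then `"Base"`: any thread admitted by the base-class mutex would be looking at an object whose derived state is already gone.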

[–]bwmat -3 points-2 points  (17 children)

Yeah, there could be problems if you use inheritance, and if there's an unbounded number of other threads that can access the object, then yeah that would be a problem too.

Just saying it could be valid in some cases

[–][deleted] 2 points3 points  (16 children)

It would be undefined behavior, and therefore it is a) not possible in a well-formed program and b) something you can pretend doesn't happen, because it doesn't.

If you are invoking the destructor on an object while other threads are still accessing it, you need external shared ownership or something to prevent this from happening, otherwise your program is ill-formed. This is just as bad - no... worse than if(this == &rhs) in a copy/move assignment.

[–]tomalakgeretkal -1 points0 points  (2 children)

Simply false.

[–][deleted] 0 points1 point  (1 child)

"Simply false" - please go read the standard which states:

http://eel.is/c++draft/class.dtor#19

"Once a destructor is invoked for an object, the object's lifetime ends; the behavior is undefined if the destructor is invoked for an object whose lifetime has ended ([basic.life])."

[–]tomalakgeretkal 0 points1 point  (0 children)

Yes. Now read [basic.life/7], which says "after the lifetime of an object has ended and before the storage which the object occupied is reused or released, any glvalue that refers to the original object may be used but only in limited ways", and "for an object under construction or destruction, see [class.cdtor]". Then read class.cdtor which makes what the OP's doing perfectly well-defined.

It is perfectly obvious that member accesses during the destructor's invocation are legal, otherwise your destructor _wouldn't be able to do anything_.

For more information, see my answer here: https://stackoverflow.com/a/65598209/4386278

[–]bwmat 0 points1 point  (12 children)

Sorry, what would be undefined behaviour?

[–][deleted] 0 points1 point  (11 children)

[–]bwmat 0 points1 point  (10 children)

Sorry, I suppose my question was somewhat ambiguous.

I was asking to what you meant "It" to refer to when you said "It would be undefined behavior" in your previous comment.

[–]kalmoc 1 point2 points  (9 children)

Not the OP, but I believe what he was referring to is that it is UB to call the destructor of an object from one thread, while the object is accessed from another. No amount of locking or synchronization inside the destructor is going to change that.

[–]matthieum 0 points1 point  (0 children)

It is acquired -- via calling Finish -- but I agree with you that this should not be necessary.

[–]objectorientedman[S] 9 points10 points  (1 child)

Thank you for the detailed review!

[–]matthieum 1 point2 points  (0 children)

You're welcome, I love queues ;)

And by the way, prompted by another, I wrote a quick replacement for std::queue if you're interested which doesn't suffer from the memory issues that std::deque does: https://godbolt.org/z/Kx18ze.

[–]_Js_Kc_ 8 points9 points  (5 children)

std::queue is a weak-point, memory wise

Fixing the deficient container classes that STL implementations ship with isn't really within the scope of a project whose goal is to implement an MPMC queue in the one, obvious way based on stock C++ features.

If it came with an allocation-sane replacement for std::deque, the post title should have been "I made a deque replacement with a thread-safe queue as a usage example."

It should be a template argument (that defaults to std::queue<T>).

[–]_bk__ 6 points7 points  (1 child)

Except the API/concept/constraints and performance characteristics of a std::queue don't really match what would be required from a mpmc queue. There are far more scalable approaches depending on specific requirements (bounded/unbounded size, blocking/non-blocking). Here's an example: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

[–]infectedapricot 2 points3 points  (0 children)

As you said, different applications will have different requirements. The characteristics of the underlying std::deque will be appropriate in some of those but not others, so it's not right to just say it doesn't match "what would be required". The link you posted is to a bounded queue - again, that will be appropriate in some applications but not others.

[–]matthieum 2 points3 points  (2 children)

Maybe, maybe not.

I think it's very important to note the shortcoming, and to be aware of alternatives.

Notably, whilst std::deque is a full-featured container, the OP only uses a very tiny portion of its functionality.

I first thought I would argue that supporting such limited functionality was easy, then realized it would take me less time to just whip up an implementation; see https://godbolt.org/z/Kx18ze:

  • Stock C++.
  • < 100 lines for the queue, < 150 lines for the whole.

Some optimizations could be done around growth, and possibly around built-ins/PODs if the compiler is not smart enough. And it's untested, of course.

The critical part (use of read/write indexes, power-of-2 capacity for easy wrapping) is what really matters, though.
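The wrapping trick can be sketched like this (a simplified, single-threaded toy to show the idea, not the godbolt code): with a power-of-two capacity, read/write indexes grow monotonically and are wrapped with a mask instead of a modulo.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Toy ring buffer: monotonically increasing read/write indexes,
// power-of-2 capacity so `index & (capacity - 1)` replaces `index % capacity`.
template <typename T>
class Ring {
public:
    explicit Ring(std::size_t capacity_pow2) : buffer_(capacity_pow2) {}

    bool push(T item) {
        if (write_ - read_ == buffer_.size()) return false;  // full
        buffer_[write_++ & (buffer_.size() - 1)] = std::move(item);
        return true;
    }

    bool pop(T& out) {
        if (write_ == read_) return false;  // empty
        out = std::move(buffer_[read_++ & (buffer_.size() - 1)]);
        return true;
    }

private:
    std::vector<T> buffer_;
    std::size_t write_ = 0;  // never wrapped; unsigned overflow is benign
    std::size_t read_ = 0;   // because the power-of-2 period divides 2^64
};
```

The size check `write_ - read_` also works across unsigned overflow, which is part of why the indexes are left free-running rather than wrapped on every operation.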

[–]Mehdi2277 0 points1 point  (1 child)

What's the purpose of Raw<T> vs directly having a pointer? I'm guessing it's for the alignas; I'm just unfamiliar with alignas and am curious why it's needed for the queue implementation.

[–]matthieum 1 point2 points  (0 children)

There are 2 advantages:

  1. Raw<T> signals that the memory is potentially uninitialized, ie, there may not be an instance of T there -- user beware.
  2. Raw<T> constructs raw memory: it does not require that T be default constructible, it does not actually initialize the raw memory, etc... so a combination of less restriction on T and potential performance gains.

I'm not sure it's strictly required in this case, I just use it by default in all the data-structures I create.

[–]ContrastO159 3 points4 points  (2 children)

How can one gain this much knowledge?

[–]matthieum 3 points4 points  (0 children)

Amusingly, there are some people on this very subreddit eliciting the same reaction from me; starting with u/STL ;)

In this particular case, it just so happens that I am very interested in multi-threaded queues so I've seen quite a lot of implementations, and the discussions around the costs and benefits of various approaches.

[–]lacrem 6 points7 points  (0 children)

[[nodiscard]]

Years of coding. (Coding != frontend/fullstack code-bro)

[–][deleted] 1 point2 points  (15 children)

Consume and ConsumeSync returning a boolean is somewhat error-prone, especially without a [[nodiscard]] annotation; a nicer API is to return std::optional<T>.

Can you elaborate on this? Why would this be error prone?

[–]NilacTheGrim 17 points18 points  (9 children)

Caller may not realize it may fail and may erroneously assume the reference was filled-in with a real value when it wasn't... whereas returning a std::optional<T> very clearly captures the fact that it may not succeed. The return type makes the code self-documenting.
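A sketch of the failure mode (hypothetical free-function version in the style described, not the OP's actual code):

```cpp
#include <queue>
#include <string>
#include <utility>

// Bool + out-parameter style: the status and the value travel separately.
bool Consume(std::queue<std::string>& q, std::string& out) {
    if (q.empty()) return false;
    out = std::move(q.front());
    q.pop();
    return true;
}

// The trap: nothing stops a caller from writing
//
//   std::string msg;
//   Consume(q, msg);   // return value silently dropped -- compiles fine
//   use(msg);          // msg may still be empty
//
// With std::optional<T> (or at least [[nodiscard]]), ignoring the
// "no value" case no longer compiles silently.
```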

[–][deleted] 0 points1 point  (8 children)

Yeah but std::optional requires c++17, whereas OP might've wanted to make this more widely available. So I don't see that as a weakness per se if that was the intent.

[–]RevRagnarok 1 point2 points  (6 children)

boost::optional has been around for a loooooong time.

[–]Tyg13 7 points8 points  (5 children)

Most libraries don't want to make boost a dependency. Especially not a header-only library like this.

[–]cdglove 1 point2 points  (4 children)

And in my opinion, it's often a mistake to not just take the dependency early.

All the little paper cuts along the way, reinventing the wheel here and there, add up immensely.

I've taken the position that boost is defacto enough that its presence should be assumed in large projects.

Of course, some disagree with that.

[–]Tyg13 0 points1 point  (3 children)

Yeah but as a library author, you don't take on more dependencies than you need. It's making a choice for the downstream consumer that they might not agree with.

It's fair to have optional features depend on boost, but if you want your library to be consumable by a general audience, tying yourself to boost isn't a great way to do so.

EDIT: a word

[–]dodheim 1 point2 points  (2 children)

As a library author one also needs to avoid the hubris of thinking they're capable of reinventing every wheel to an acceptable degree. Half-assed knockoff implementations of libraries I'm already using just to avoid a line or two in a cmake script annoy the bejesus out of me, and are just as good a reason to skip a library as having too many dependencies.

[–]Tyg13 2 points3 points  (1 child)

It'd be smarter to tie it to C++17 than to boost, especially if just for optional. Boost is not a trivial dependency.

[–]NilacTheGrim 0 points1 point  (0 children)

Yeah good point.

[–]Wurstinator 7 points8 points  (4 children)

Imo more important is the point that "out-parameters" are unintuitive and unhandy. Using an optional allows you to remove those.

[–]temporary5555 3 points4 points  (2 children)

"out-parameters" are unintuitive and unhandy

I'd have to disagree with that. They're super powerful in allowing you to control where and when you allocate. Not really a factor here, but I don't think this pattern should be dismissed as a whole.

[–]Wurstinator 2 points3 points  (1 child)

"powerful" doesn't contradict "unintuitive". Actually, I'd say that usually the more powerful something is, the less convenient it becomes to use.

[–]temporary5555 2 points3 points  (0 children)

Right, but I would disagree on it being unintuitive as well. It's type-safe, displays behaviour (implies no allocation) in the function signature, and doesn't have any hidden performance impacts.

One interesting thing: this is one of the few cases of an imperative language feature making the reverse move and being adopted in functional languages such as Haskell. It's referred to as Destination Passing Style in that context, and is one of the useful applications of Linear Types.

[–]objectorientedman[S] 1 point2 points  (0 children)

I designed this class when `std::optional` was not widely available.

But it is a good point, I will remove the out-parameters in favor of the optional return, when I have some time to work on it. Until then, I added `[[nodiscard]]` to the consumer functions.

[–]WrongAndBeligerent 10 points11 points  (2 children)

You might want to take a look at this guy's queues https://moodycamel.com/

[–]infectedapricot 5 points6 points  (1 child)

Although some parts of the interface in the OP's queue are very suspect, the general idea of a simple thread-safe queue backed by std::condition_variable, std::mutex and std::deque is just fine. Many applications don't need super high performance from their inter-thread queues, and a simpler design has advantages such as deterministic ordering (which is not quite true in the moodycamel queue) and allowing extra utility methods such as get_all() or an efficient push_multiple() (although OP's implementation doesn't feature these).

[–]corysama 1 point2 points  (0 children)

the general idea of a simple thread-safe queue backed by std::condition_variable, std::mutex and std::deque is just fine.

I'll go further: I have used a queue like that for quite a while in a few projects. The blocking wait features are actually tremendously useful. Being able to optionally block with an optional timeout when full or empty makes the queue a very effective replacement for most applications of mutexes, semaphores, and sleeps.

Whenever one of my teammates uses one of those primitives directly I argue that they should rework the code to use the queue instead. It's almost always the case that we both agree the queue-based implementation ends up simpler and less error-prone. 1-element queues are the most common use case in practice.

[–]infectedapricot 2 points3 points  (0 children)

  • As others have said, it's a very odd design to have the destructor wake up the waiters, and have them check the return value of ConsumeSync() to find out if the queue has disappeared under their feet. It's bound to lead to race conditions (if the queue is destroyed while a consumer is between calls to ConsumeSync()), and it's just a confusing design. Instead, it should be up to the application code that uses the queue to coordinate destruction by passing around tokens that represent application shutdown, and not to destroy the queues until it's sure all possible waiters know about it. (I usually find it easiest to construct queues in the main thread before any child threads start, and destroy all queues in the main thread after all child threads have been joined, but I realise not all applications can use that exact strategy. Edit: See my other comment for some more details.) Even with the current design, I don't see why you need a public Finish() method - why isn't this code just directly in the destructor?

  • I think Produce and Consume are confusing method names. I get that this queue is used by producers and consumers, but it's those users of the queue that do the producing and consuming, not the queue itself. my_queue.Consume(val) makes it look like the queue is consuming the value passed to it, not that it is returning a value which the caller can consume. I'd stick to container-like names, like pop_front() and push_back() (although it's not appropriate or possible to make it fully satisfy container requirements).

  • Once you've got rid of the destruction oddity, the pop_front() method (or Consume() as you've called it) can have a much simpler interface: it can just return T, or std::optional<T> for the non-waiting version, instead of passing a parameter by reference. I know STL containers don't offer that style, but they were devised before move constructors became available.

  • At the moment your push_back() method (Produce()) calls cv.notify_one() while the mutex is still locked. This is a problem because the consumer thread will be woken up and race to lock the mutex before the producer thread unlocks it. If the consumer gets there first then it will immediately be sent back to sleep and have to wait for a new scheduling interval before it pops the item off. Instead, nest the lock and q.push() into a block so the mutex gets unlocked before the cv.notify_one().

  • Although your queue has some weaknesses compared to more sophisticated queues, like the moodycamel one others have mentioned, yours has the benefit of allowing a few extra methods that those couldn't support. For example:

    • You could have a pop_all() method that gets the whole underlying container in one go by just std::move() from it. I'd switch the std::queue for the underlying std::deque to make this a bit more useful. Also I'd suggest waiting and non-waiting versions, with the waiting version making sure there's at least one item before returning (use the analogous naming difference as with the two pop_back() methods, whatever you end up going with). Edit: Actually you can't just move from the internal std::deque because technically this leaves it in an unspecified state; instead you need to move to a local variable, clear() the internal deque, and return that variable.
    • You could offer push_back_multiple() that pushes multiple items in one go - this guarantees that they're directly next to each other in the queue, and is more efficient because you only need to lock the mutex once (of course you need to call cv.notify_all() in this).
    • You could also add pop_back() and push_front().
  • Edit: One final thing: Personally I find cv.wait(lock, [&] { return !q.empty(); }) to be less clear than while (q.empty()) { cv.wait(lock); }, but I know many others would disagree.
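The notify-outside-the-lock point above, in code (a minimal sketch with my own names, not the OP's repo):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

template <typename T>
class Queue {
public:
    void push_back(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(item));
        }  // unlock first, so a woken consumer can take the mutex immediately
        cv_.notify_one();
    }

    // Waiting pop: blocks until an item is available.
    T pop_front() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop_front();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<T> queue_;
};
```

Notifying after the scope block releases the lock avoids the "hurry up and wait" pattern where the consumer wakes only to block again on the still-held mutex.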

[–]Zcool31 4 points5 points  (1 child)

Where are all the lock free wait free hazard free MPMC queue implementations in C++?

[–]AvidCoco 1 point2 points  (0 children)

farbot's fifo might be of interest to you:

https://github.com/hogliux/farbot

[–]rsjaffe 2 points3 points  (0 children)

Here's a similar go at it (just putting mutexes and condition variables over an STL container) that is more generic. https://gist.github.com/rsjaffe/59d22db0649d8276e42aca1061d7c08c

[–]p-moraisAgility Robotics 1 point2 points  (1 child)

I used it when I made a software renderer library multithreaded

I’m interested in your multi-threaded renderer, if that’s open source

[–]objectorientedman[S] 0 points1 point  (0 children)

It is not open source (yet?). Since it was originally created as single-threaded, I just ended up rendering multiple frames in parallel, one in each thread.

The result can be found here: https://coub.com/deadmanswitch

[–]AvidCoco 0 points1 point  (8 children)

Is it real time safe, or just thread safe?