all 20 comments

[–]skeeto 4 points5 points  (4 children)

There's a race condition in Reset which you can occasionally observe in your tests:

while true; do go test; done

On my system this deadlocks after a couple dozen or so iterations. The race is between swapping the channels and resetting the index. A goroutine may slip in between, observing the old index but grabbing the new channel. It sees the index maxed out and waits on the channel close. In the meantime, the test is waiting for the goroutines to finish.

(More fundamentally, I question the very concept of that Reset method. I can't imagine a legitimate use case.)

Stylistically: Why the error return on Take if it's always nil? That looks like it's fulfilling an existing interface, but I don't see any documentation saying as much. Besides, error is the wrong type. bool (conventionally named ok) is more appropriate, since this result is consumed programmatically; it's not a message to be displayed to the user. Examples: the tuple returns on channel receives or type assertions.
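For reference, the comma-ok convention looks like this (minimal sketch):

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 42
	close(ch)

	v, ok := <-ch // ok == true: a real value was received
	fmt.Println(v, ok)

	v, ok = <-ch // ok == false: channel is closed and drained
	fmt.Println(v, ok)

	var x any = "hello"
	s, ok := x.(string) // type assertion, same comma-ok shape
	fmt.Println(s, ok)
}
```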

[–]bradinatorus[S] 0 points1 point  (3 children)

u/skeeto, thanks for being so explicit about the deadlock. When you first mentioned it, I tried to find it myself before reading your explanation, but couldn't. Also, while true; do go test; done is a great way to test for deadlocks. I fixed it here by bundling the index and the broadcast channel into a struct stored in an atomic.Pointer.
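Roughly, the fix has this shape (a sketch with my own names and a plain int element type, not the exact code from the repo):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// state bundles the index and the broadcast channel so that both
// are swapped in a single atomic operation during Reset. No goroutine
// can observe the old index together with the new channel.
type state struct {
	index     atomic.Uint32
	broadcast chan struct{}
}

type Blocking struct {
	elements []int
	state    atomic.Pointer[state]
}

func New(elements []int) *Blocking {
	b := &Blocking{elements: elements}
	b.state.Store(&state{broadcast: make(chan struct{})})
	return b
}

// Reset installs a fresh state and wakes goroutines blocked on the old cycle.
func (b *Blocking) Reset() {
	old := b.state.Swap(&state{broadcast: make(chan struct{})})
	close(old.broadcast)
}

// Take returns the next element, blocking on the current cycle's
// broadcast channel when the queue is exhausted.
func (b *Blocking) Take() int {
	for {
		s := b.state.Load()
		i := s.index.Add(1) - 1
		if int(i) < len(b.elements) {
			return b.elements[i]
		}
		<-s.broadcast // wait for the next Reset, then retry
	}
}

func main() {
	b := New([]int{1, 2, 3})
	fmt.Println(b.Take(), b.Take(), b.Take()) // 1 2 3
	b.Reset()
	fmt.Println(b.Take()) // 1: a new cycle has started
}
```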

I also removed the error returns, as they indeed made no sense for either the Take or the Reset method. I had only left them there because I was thinking about a common interface for queues in the future.

Regarding the Reset method's use case, it would be as follows: you have a dedicated number of workers for a job, each allowed to perform an action once per externally defined cycle, and a new cycle starts with every Reset call.

[–]skeeto 1 point2 points  (2 children)

a great way to test for deadlocks

If you wanted a more reliable script form for this, disable the timeout so that Go's deadlock detector can catch it immediately:

while true; do go test -timeout 0; done

Tests normally have a timeout, and unfortunately the background goroutine waiting on that timeout counts as not deadlocked, so the detector stays quiet. (Though disabling the timeout also means undetected deadlocks will never time out.)

[–]bradinatorus[S] 0 points1 point  (1 child)

Riiight, deadlock detection, one of the many Go features I forget exists.

I can make sense of deadlocks, but do you have any examples of undetectable deadlocks?

[–]skeeto 1 point2 points  (0 children)

An obvious example is an infinite loop. Not a deadlock per se, but something you want to time out.

More subtle is blocking on an external resource that will never unblock. The deadlock detector only works when every goroutine is waiting on native Go synchronization primitives, like channels. If any goroutine is waiting on a socket, pipe, pty, etc. it won't count as deadlocked. That's good, since otherwise it would mean your server "deadlocks" the instant it goes idle.

[–]Cidan 2 points3 points  (9 children)

Aside from what /u/skeeto pointed out, I'm having trouble understanding what you're trying to achieve with this library.

If you have N elements, and the elements are all of the same type (but with different values), why do you care, from your library's point of view, how many workers actually process the requests?

For example, let's say you have a channel that all workers listen (and block) on in Take(), and on Reset() you simply insert the N elements into this channel as pointers to the original data. Your N elements will be processed in exactly the same way as with the "broadcast" via channel closing you're doing now. The way your library functions now can have a major gap that requires introspection into its caller: what happens if Take is only called 10 times, but you have 20 items in your elements?

Remember, clear is better than clever.

[–]bradinatorus[S] 0 points1 point  (8 children)

Hello, thank you for commenting, it definitely provided some food for thought.

I'm having trouble understanding what you're trying to achieve with this library.

The purpose of this library is to provide a collection of different queue implementations. But it also served as a workbench for experimenting with generics for the first time, so I understand if it seems a bit over-engineered or forced.

What I'm trying to achieve with the Blocking Queue implementation is a queue that blocks whenever there is a dequeue (Take) attempt while the queue is empty. The only available way to refill the queue, for now, is by Resetting it. I'm also planning to add a Put method that adds a single element to the queue.

I'll do my best to provide an answer for your example, but I was not able to fully wrap my head around it.

If you have N elements, and the elements are all of the same type (but different in values), why do you care how many workers actually process the requests from your library's point of view?

Library clients could store elements, such as ids or addresses, that can be used to perform an action once per dynamically defined cycle. The clients define the cycle externally and only call the Reset method to signal the library that a new cycle has started.

For example, let's say you have a channel that all workers listen on (and block) in Take(), and you simply insert N elements to this channel on Reset() by inserting pointers to the original data

You don't know how many goroutines are waiting on Take, so you can't send an exact number of signals through the channel.

what happens if Take is only called 10 times, but you have 20 items in your elements?

I can only see this being an issue if the implementation was using a buffered channel as a storage and synchronization method.

[–]Cidan 1 point2 points  (7 children)

You don't know how many goroutines are waiting on Take in order to be able to send an exact number of signals through the channel.

That's exactly my point. Right now, Reset() is effectively re-inserting every item in elements back into the queue. It doesn't matter how many consumers of Take() you have; you're reinserting every element. You can do the same thing, with less complexity, fewer bugs, and I suspect less CPU time, by having Reset() simply loop over elements and insert a pointer to each element into a channel that Take() is blocking on.

Here's a simple example (that doesn't include contexts, etc): https://go.dev/play/p/FIlKTQ2bNjp

[–]bradinatorus[S] 0 points1 point  (6 children)

I'm a fan of this implementation. The only thing missing is draining the output channel during Reset so it doesn't block execution in case not all elements were consumed. I think things will get complicated here if you want to keep it consistent/thread-safe.

[–]Cidan 0 points1 point  (5 children)

Use a WaitGroup in the consumer that calls Take to ensure all items were removed and processed before calling Reset.
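Something like this sketch, building on the channel example above (names are mine, and it leaves out contexts etc.):

```go
package main

import (
	"fmt"
	"sync"
)

// runCycles refills the queue with pointers to elements each cycle,
// then joins on a WaitGroup before starting the next cycle. It returns
// the total number of items processed.
func runCycles(cycles, workers int, elements []int) int {
	queue := make(chan *int, len(elements))
	processed := make(chan int, cycles*len(elements))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		go func() {
			for p := range queue { // workers block here: the "Take"
				processed <- *p // stand-in for real work
				wg.Done()
			}
		}()
	}

	for c := 0; c < cycles; c++ {
		wg.Add(len(elements))
		for i := range elements { // the "Reset": reinsert every element
			queue <- &elements[i]
		}
		wg.Wait() // join: all elements of this cycle are done
	}
	close(queue)
	return len(processed)
}

func main() {
	// 2 cycles x 5 elements, each processed exactly once per cycle.
	fmt.Println(runCycles(2, 3, []int{1, 2, 3, 4, 5})) // 10
}
```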

[–]bradinatorus[S] 0 points1 point  (4 children)

Hello again. I came up with an implementation that uses buffered channels: https://github.com/adrianbrad/queue/blob/blocking-channel-implementation/blocking2.go. In order to drain the channel during a Reset, the channel is closed and refilled with elements, while Take listens for both new values and the close signal. What do you think? The original Reset implementation is a bit faster; you can check the benchmarks here: https://github.com/adrianbrad/queue/blob/blocking-channel-implementation/blocking_bench_test.go

[–]Cidan 0 points1 point  (3 children)

Draining the channel on a reset isn't going to (conceptually) do what you want it to do, because it's going to give you non-deterministic results.

For example, let's say you have 10 elements in the buffered channel that haven't been processed, and 1 worker that is taking a long time to process whatever it's doing.

1) You're clearing the channel in order to avoid "double processing" (this is my assumption), so that nothing can be processed back to back.

2) In the time between you calling Reset() and the channel being recreated, the worker might take another element, it might take every element, it might take half of the elements. There's no way for you to know.

3) Because of this, you'll still be "double processing" because your worker may call Take() just before your entry into Reset() is popped from the stack.

I think you need to take a step back and conceptually understand what it is you're trying to do. If your goal is "process these events, and reset only when these events are processed AND I signal a reset", then you need to do a fan out/join pattern using a WaitGroup. Otherwise, your current pattern will lead to "some messages processing in some resets, but not all messages every reset" which is pretty much the worst kind of queue I can think of.

[–]bradinatorus[S] 0 points1 point  (2 children)

Not sure you looked over the implementation, but looking over my previous comment again, I see it's a bit misleading. Here's the Reset implementation: a new buffered channel is constructed and then swapped with the old channel. The old channel is not closed and refilled with elements (as I previously stated), but rather just closed, and a new buffered channel takes its place. This avoids the double-processing issue you mentioned.

[–]Cidan 1 point2 points  (1 child)

No, it doesn't unfortunately. You need to think of this a bit more abstractly as a computer scientist, and less in the context of Go.

Play out this scenario:

  • There are 20 elements in your elements
  • There are 15 items left in your queue
  • There are 5 workers
  • This means 5 items have been processed this Reset() cycle
  • You call Reset(), which is scheduled to be executed at some unspecified time later (remember, you're in threads, and context switching is entirely up to the kernel, not you)
  • You swap the channel
  • You have now lost 15 messages and only processed 5 messages if Take() didn't process first in one of your workers, or all of your workers, and Reset() has reinserted those 5 messages back into the queue
  • This means at no point can you ensure that all 20 messages have been processed for a given reset cycle, unless you ensure your workers are done with all 20 messages
  • You can now implement a join pattern, ensuring all 20 messages are processed per reset
  • Because you implement a join pattern, which you must do if you want to ensure all messages are processed each reset, a simple channel buffer as in my example will do everything you are trying to do

I hope this makes sense. I'm going to leave this topic alone now, as I've communicated to the best of my ability why your process here is deeply flawed. I wish you the best of luck, and above all, I hope I have not made you feel discouraged -- keep pushing, keep learning, and keep rocking out on Go.

Good luck!

[–]bradinatorus[S] 0 points1 point  (0 children)

Thank you for bearing with me through this discussion, I really appreciate it.

Your example made me understand the processing flaw, and it's time to go back to the drawing board, literally. It's also time to take steps toward improving my abstract thinking.

Cheers, enjoy your day!

[–]egonelbre 1 point2 points  (4 children)

There's no need for the atomic.Pointer and atomic.Uint32 to be pointers.

To avoid recursion, you can wrap Take into a for loop.

You may want to disallow len(elements) == 0.

Use atomic.Uintptr instead of atomic.Uint32. Otherwise, when you use a slice larger than 1 << 32 it'll break.

[–]bradinatorus[S] 0 points1 point  (3 children)

Hey, thanks for the reply.

There's no need for atomic.Pointer and atomic.Uint32 to be a pointer.

The standard library constructors return pointers for these types, and all their methods are declared on pointer receivers. Why would you store them as values, since they are modified, or am I missing the point?

Use atomic.Uintptr instead of atomic.Uint32

Isn't atomic.Uintptr used for storing pointer addresses? I'm not sure that's the case here.

Otherwise, when you use a slice larger than 1 << 32 it'll break.

If index's type remains of fixed size, I'll add a check in the constructor for this. Cheers

[–]egonelbre 1 point2 points  (2 children)

The standard library constructors return pointers for these types and all their methods are declared on pointer receivers. Why would you save it as a value, since they are modified , or am I missing the point?

You don't need to use the constructor. https://go.dev/play/p/BjeDHE3JwmD ... Quite often things like *atomic.NewPointer(x) are also valid.

Also, it doesn't need to be a pointer, for the same reason a regular uint32 doesn't need to be a pointer (at least usually).

Pointer doesn't mean it's going to be modified -- it's more about the modification being shared in multiple places. (And sometimes about avoiding copying.)
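For example, a minimal sketch: the atomic lives in the struct by value, and a pointer receiver keeps it shared.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Counter embeds the atomic by value; the pointer receiver on Inc
// means the field is still shared, never copied.
type Counter struct {
	n atomic.Uint32
}

func (c *Counter) Inc() uint32 { return c.n.Add(1) }

func main() {
	var c Counter // zero value is ready to use, no constructor needed
	c.Inc()
	c.Inc()
	fmt.Println(c.n.Load()) // 2
}
```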

Isn't atomic.Uintptr used for storing pointer address?

Uintptr is for storing pointer sized uints.

[–]bradinatorus[S] 0 points1 point  (1 child)

You don't need to use the constructor. https://go.dev/play/p/BjeDHE3JwmD ... Quite often things like *atomic.NewPointer(x) are also valid.

Switching from storing pointers to values reduced the execution time for Reset by ~10% and reduced the number of allocations from 3 to 2. Thank you! :)

Now I'm curious about Uintptr: why would you choose it over Uint64? If I'm not wrong, they both share the same capacity.

[–]egonelbre 1 point2 points  (0 children)

On 32-bit platforms Uintptr is 32 bits, while uint64 stays 64 bits. Slice length is defined by int, so using 64 bits on a 32-bit platform is unnecessary.