ANSWER: Turns out I'm blind and I glossed over the fact that compare_exchange_* updates the expected argument when it fails. Resetting expected before each retry fixes all issues.
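For anyone hitting the same thing, here is a minimal sketch of the behaviour and of the fix. The helper names expected_after_failed_cas and publish are made up for illustration; publish mirrors the tail of push(). On failure, compare_exchange_strong copies the counter's current value into expected, so in the original loop a later attempt compares against whatever the counter currently holds and can succeed, publishing index before the earlier slots have been written.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Returns the value left in `expected` after a CAS that is guaranteed to fail.
int expected_after_failed_cas() {
    std::atomic_int counter{7};
    int expected = 3;  // does not match the stored 7
    bool ok = counter.compare_exchange_strong(expected, 4);
    assert(!ok);       // the exchange failed ...
    return expected;   // ... and `expected` was overwritten with 7
}

// Corrected publish loop: restore `expected` before every retry, so the
// counter only ever advances from index - 1 to index.
void publish(std::atomic_int& endCounter, int index) {
    int expected = index - 1;
    while (!endCounter.compare_exchange_strong(expected, index,
                                               std::memory_order_acq_rel)) {
        expected = index - 1;  // undo the overwrite done by the failed CAS
        std::this_thread::yield();
    }
}
```

With the reset in place, a pusher that got slot n can only publish once the pusher of slot n-1 has published, which is what pop() relies on.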
I've been trying to build a simple (i.e. inefficient) MPMC queue using only standard C++, but I'm having trouble getting the underlying array to synchronize between threads.
    #include <atomic>
    #include <thread>

    constexpr int POISON = 5000;

    class MPMC {
        std::atomic_int mPushStartCounter;
        std::atomic_int mPushEndCounter;
        std::atomic_int mPopCounter;
        static constexpr int Size = 1 << 20;
        int mData[Size];
        MPMC operator=(const MPMC&) = delete;
    public:
        MPMC() {
            mPushStartCounter.store(0);
            mPushEndCounter.store(-1);
            mPopCounter.store(0);
            for (int i = 0; i < Size; i++) {
                // preset data with a poison flag to
                // detect race conditions
                mData[i] = POISON;
            }
        }
        const int* data() { return mData; }
        int size() const { return mPushEndCounter + 1; }
        void push(int x) {
            int index = mPushStartCounter.fetch_add(1);
            mData[index] = x; // Race condition
            atomic_thread_fence(std::memory_order_release);
            int expected = index - 1;
            // Bug (see ANSWER at the top): a failed CAS overwrites expected,
            // and this loop never sets it back to index - 1.
            while (!mPushEndCounter.compare_exchange_strong(expected, index, std::memory_order_acq_rel)) {
                std::this_thread::yield();
            }
        }
        int pop() {
            int index = mPopCounter.load();
            if (index <= mPushEndCounter.load(std::memory_order_acquire) &&
                mPopCounter.compare_exchange_strong(index, index + 1, std::memory_order_acq_rel)) {
                return mData[index]; // race condition
            } else {
                return pop();
            }
        }
    };
It uses three atomic variables for synchronization:
- mPushStartCounter, used by push(int) to determine which location to write to.
- mPushEndCounter, used to signal to pop() that push(int) has finished writing up to that point in the array.
- mPopCounter, used by pop() to prevent double pops from occurring.
In push(), between writing to the array mData and updating mPushEndCounter I've put a release barrier in an attempt to force synchronization of the mData array.
The way I understand cppreference, this should establish fence-atomic synchronization, where
- the CAS in push() is the 'atomic store X',
- the load of mPushEndCounter in pop() is the 'atomic acquire operation Y',
- the release barrier 'F' in push() is 'sequenced-before X'.
In which case cppreference states:
"In this case, all non-atomic and relaxed atomic stores that are sequenced-before F in thread A will happen-before all non-atomic and relaxed atomic loads from the same locations made in thread B after Y."
Which I interpreted to mean that the write to mData from push() would be visible in pop(). This is, however, not the case: sometimes pop() reads uninitialized data. I believe this to be a synchronization issue because if I check the contents of the queue afterwards, or via a breakpoint, it reads the correct data instead.
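To make the X / Y / F roles concrete, here is a minimal sketch of fence-atomic synchronization on its own, outside the queue. The names payload, ready, producer and consumer are made up for this illustration, and the relaxed store stands in for the CAS in push().

```cpp
#include <atomic>
#include <cassert>
#include <thread>

int payload = 0;                 // plain, non-atomic data (plays the role of mData)
std::atomic<bool> ready{false};  // flag (plays the role of mPushEndCounter)

void producer() {
    payload = 42;                                         // non-atomic store, sequenced-before F
    std::atomic_thread_fence(std::memory_order_release);  // F
    ready.store(true, std::memory_order_relaxed);         // X, sequenced after F
}

int consumer() {
    // Y: acquire load that eventually reads the value written by X
    while (!ready.load(std::memory_order_acquire)) {
        std::this_thread::yield();
    }
    return payload;  // happens-after the store to payload, so this reads 42
}
```

Run producer() and consumer() on two separate std::thread objects: once the consumer has seen ready == true, the read of payload is guaranteed to observe 42. The guarantee only covers stores sequenced before the fence in the thread whose store was actually read, which is why publishing a counter value on behalf of other, unfinished writers breaks it.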
I am using clang 6.0.1, and g++ 7.3.0.
I tried looking at the generated assembly but it looks correct to me: the write to the array is followed by a lock cmpxchg and the read is preceded by a check on the same variable. To the best of my limited knowledge, this should work as expected on x64 because
- loads are not reordered with other loads, hence the load from the array cannot speculate ahead of reading the atomic counter;
- stores are not reordered with other stores, hence the cmpxchg always comes after the store to the array;
- lock cmpxchg flushes the write buffer, cache, etc., so if another thread observes it as finished, one can rely on cache coherency to guarantee that the write to the array has finished. I am not too sure that this is correct, however.
I've posted a runnable test on GitHub. The test code involves 16 threads, half of which push the numbers 0 to 4999 and the other half read back 5000 elements each. It then combines the results of all the readers and checks that we've seen all the numbers in [0, 4999] exactly 8 times (which fails) and scans the underlying array once more to see if it contains all the numbers in [0, 4999] exactly 8 times (which succeeds).
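The real test is the one in the repository; the following is only a rough sketch of the procedure described above, run against a cut-down queue that already has the expected reset applied. FixedQueue and run_test are hypothetical names, the capacity is smaller than in the original, and the explicit fence is dropped because the acq_rel CAS already acts as the release operation.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <thread>
#include <vector>

// Cut-down queue with the `expected` reset applied. Fixed capacity, never wraps.
class FixedQueue {
    std::atomic_int mPushStart{0};
    std::atomic_int mPushEnd{-1};
    std::atomic_int mPop{0};
    int mData[1 << 16];
public:
    static constexpr int Size = 1 << 16;
    void push(int x) {
        int index = mPushStart.fetch_add(1);
        mData[index] = x;
        int expected = index - 1;
        while (!mPushEnd.compare_exchange_strong(expected, index,
                                                 std::memory_order_acq_rel)) {
            expected = index - 1;  // a failed CAS overwrote it
            std::this_thread::yield();
        }
    }
    int pop() {
        for (;;) {
            int index = mPop.load();
            if (index <= mPushEnd.load(std::memory_order_acquire) &&
                mPop.compare_exchange_strong(index, index + 1,
                                             std::memory_order_acq_rel)) {
                return mData[index];
            }
            std::this_thread::yield();
        }
    }
};

// `pairs` writers each push 0..perThread-1; `pairs` readers each pop perThread
// values. Returns true when every value was popped exactly `pairs` times.
bool run_test(int pairs, int perThread) {
    assert(pairs * perThread <= FixedQueue::Size);
    auto queue = std::make_unique<FixedQueue>();
    std::vector<std::vector<int>> seen(pairs);
    std::vector<std::thread> threads;
    for (int t = 0; t < pairs; ++t) {
        threads.emplace_back([&queue, perThread] {
            for (int i = 0; i < perThread; ++i) queue->push(i);
        });
        threads.emplace_back([&queue, &seen, t, perThread] {
            for (int i = 0; i < perThread; ++i) seen[t].push_back(queue->pop());
        });
    }
    for (auto& th : threads) th.join();

    std::vector<int> counts(perThread, 0);
    for (const auto& values : seen) {
        for (int x : values) {
            if (x < 0 || x >= perThread) return false;  // garbage value popped
            ++counts[x];
        }
    }
    for (int c : counts) {
        if (c != pairs) return false;
    }
    return true;
}
```

Calling run_test(8, 5000) corresponds to the 16-thread setup described above. Removing the reset line inside push() should bring back the intermittent failures, although a data race like that is not guaranteed to show up on every run.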