Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,13 @@ push() pop()
### Queue API
The queue class templates provide the following member functions:
* `try_push` - Appends an element to the end of the queue. Returns `false` when the queue is full.
* `try_push` (batch) - Appends an iterator range to the end of the queue. Returns the iterator one past the last element that got pushed, which could be different from the last iterator when the queue becomes full.
* `try_pop` - Removes an element from the front of the queue. Returns `false` when the queue is empty.
* `try_pop` (batch) - Removes elements from the front of the queue and writes them through an output iterator. Returns the number of elements popped, which may be fewer than the number requested when the queue becomes empty.
* `push` (optimist) - Appends an element to the end of the queue. Busy waits when the queue is full. Faster than `try_push` when the queue is not full. Optional FIFO producer queuing and total order.
* `push` (optimist, batch) - Appends an iterator range to the end of the queue. Busy waits when the queue is full. Returns the iterator past the last pushed element. Faster than `try_push` when the queue is not full. Optional FIFO producer queuing and total order.
* `pop` (optimist) - Removes an element from the front of the queue. Busy waits when the queue is empty. Faster than `try_pop` when the queue is not empty. Optional FIFO consumer queuing and total order.
* `pop` (optimist, batch) - Removes elements from the front of the queue and writes them through an output iterator. Returns the iterator past the last popped element. Busy waits when the queue is empty. Faster than `try_pop` when the queue is not empty. Optional FIFO consumer queuing and total order.
* `was_size` - Returns the number of unconsumed elements during the call. The state may have changed by the time the return value is examined.
* `was_empty` - Returns `true` if the container was empty during the call. The state may have changed by the time the return value is examined.
* `was_full` - Returns `true` if the container was full during the call. The state may have changed by the time the return value is examined.
Expand Down
90 changes: 90 additions & 0 deletions include/atomic_queue/atomic_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <utility>

Expand Down Expand Up @@ -389,6 +390,33 @@ class AtomicQueueCommon {
return true;
}

template<class InputIt>
ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept {
int n = static_cast<int>(std::distance(first, last));
auto head = head_.load(X);
if(Derived::spsc_) {
int const slots = static_cast<int>(tail_.load(X) + downcast().size_ - head);
n = std::min(n, slots);
if(n <= 0)
return first;
head_.store(head + static_cast<unsigned>(n), X);
}
else {
int const length = n;
do {
int const slots = static_cast<int>(tail_.load(X) + downcast().size_ - head);
n = std::min(length, slots);
if(n <= 0)
return first;
} while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + static_cast<unsigned>(n), X, X))); // This loop is not FIFO.
}

do {
downcast().do_push(*first++, head++);
} while(--n);
return first;
}

template<class T>
ATOMIC_QUEUE_INLINE bool try_pop(T& element) noexcept {
auto tail = tail_.load(X);
Expand All @@ -408,6 +436,33 @@ class AtomicQueueCommon {
return true;
}

template<class OutputIt>
ATOMIC_QUEUE_INLINE int try_pop(OutputIt& first, int n) noexcept {
auto tail = tail_.load(X);
if(Derived::spsc_) {
int const num_elements = static_cast<int>(head_.load(X) - tail);
n = std::min(n, num_elements);
if(n <= 0)
return 0;
tail_.store(tail + static_cast<unsigned>(n), X);
}
else {
int const desired_pops = n;
do {
int const num_elements = static_cast<int>(head_.load(X) - tail);
n = std::min(desired_pops, num_elements);
if(n <= 0)
return 0;
} while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + static_cast<unsigned>(n), X, X))); // This loop is not FIFO.
}

int i = n;
do {
*first++ = downcast().do_pop(tail++);
} while(--i);
return n;
}

template<class T>
ATOMIC_QUEUE_INLINE void push(T&& element) noexcept {
unsigned head;
Expand All @@ -422,6 +477,24 @@ class AtomicQueueCommon {
downcast().do_push(std::forward<T>(element), head);
}

template<class InputIt>
ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept {
unsigned n = static_cast<unsigned>(std::distance(first, last));
unsigned head;
if(Derived::spsc_) {
head = head_.load(X);
head_.store(head + n, X);
}
else {
constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed;
head = head_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019.
}
while(n--) {
downcast().do_push(*first++, head++);
}
return first;
}

ATOMIC_QUEUE_INLINE auto pop() noexcept {
unsigned tail;
if(Derived::spsc_) {
Expand All @@ -435,6 +508,23 @@ class AtomicQueueCommon {
return downcast().do_pop(tail);
}

template<class OutputIt>
ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, unsigned n) noexcept {
unsigned tail;
if(Derived::spsc_) {
tail = tail_.load(X);
tail_.store(tail + n, X);
}
else {
constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed;
tail = tail_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019.
}
while(n--) {
*first++ = downcast().do_pop(tail++);
}
return first;
}

ATOMIC_QUEUE_INLINE bool was_empty() const noexcept {
return !was_size();
}
Expand Down
17 changes: 17 additions & 0 deletions src/benchmarks.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,29 @@ struct RetryDecorator : Queue {
spin_loop_pause();
}

template<class InputIt>
ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept {
while(last != (first = this->try_push(first, last))) {
spin_loop_pause();
}
return first;
}

ATOMIC_QUEUE_INLINE T pop() noexcept {
T element;
while(!this->try_pop(element))
spin_loop_pause();
return element;
}

template<class OutputIt>
ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, int n) noexcept {
if (n <= 0) return first;
while(n -= this->try_pop(first, n)) {
spin_loop_pause();
}
return first;
}
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
110 changes: 110 additions & 0 deletions src/tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@
#include "atomic_queue/barrier.h"
#include "benchmarks.h"

#include <algorithm>
#include <boost/mpl/list.hpp>
#include <bitset>
#include <chrono>
#include <cstdint>
#include <numeric>
#include <random>
#include <thread>
#include <string>
#include <vector>

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -96,6 +101,111 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) {
BOOST_CHECK_EQUAL(result_diff, 0);
}

// Check that all (batch) push'es are ever (batch) pop'ed once with multiple producer and multiple consumers.
BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) {
enum {
PRODUCERS = Queue::is_spsc() ? 1 : 3,
CONSUMERS = Queue::is_spsc() ? 1 : 3
};
using T = typename Queue::value_type;

Queue q;
Barrier barrier;

std::vector<std::atomic<int>> number_of_pops(CONSUMERS);
for (auto & val : number_of_pops) val.store(0, X);

std::vector<std::atomic<bool>> consumer_finished(CONSUMERS);
for (auto & val : consumer_finished) val.store(false, X);

std::thread producers[PRODUCERS];
for(unsigned i = 0; i < PRODUCERS; ++i)
producers[i] = std::thread([&q, &barrier]() {
std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count();
std::mt19937 gen(seed);
std::uniform_int_distribution<> distr(1, 2 * q.capacity());;

barrier.wait();
for(T n = N_STRESS_MSG; n;) {
// Inserting elements into local buffer before
int const BATCH_SIZE = distr(gen);
std::vector<T> buffer(BATCH_SIZE);
typename std::vector<T>::iterator it = buffer.begin();
while (it != buffer.end() && n) {
*it++ = n--;
}
// Pushing them to the queue
q.push(buffer.begin(), it);
}
});

uint64_t results[CONSUMERS];
std::thread consumers[CONSUMERS];
for(unsigned i = 0; i < CONSUMERS; ++i)
consumers[i] = std::thread([&q, &barrier, &r = results[i], &n_pop = number_of_pops[i], &finished = consumer_finished[i]]() {
std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count();
std::mt19937 gen(seed);
std::uniform_int_distribution<> distr(1, 2 * q.capacity());;

barrier.wait();
uint64_t result = 0;

// Allocating local buffer
{
bool continue_pops = true;
while (continue_pops) {
int const BATCH_SIZE = distr(gen);
// Popping into local buffer before
n_pop.fetch_add(BATCH_SIZE, A);
std::vector<T> buffer(BATCH_SIZE);
typename std::vector<T>::iterator out_it = q.pop(buffer.begin(), BATCH_SIZE);
// Accumulating the output
for (typename std::vector<T>::iterator it = buffer.begin(); it != out_it; ++it) {
if (*it != static_cast<T>(STOP_MSG)) {
result += *it;
}
else {
continue_pops = false;
}
}
}
finished.store(true, R);
}
r = result;
});

barrier.release(PRODUCERS + CONSUMERS);
for(auto& t : producers)
t.join();

int number_of_pushes = N_STRESS_MSG * PRODUCERS;
for (auto it = consumer_finished.cbegin(); it != consumer_finished.cend();) {
if (it->load(A)) {
++it;
continue;
}
for(int n; (n = std::accumulate(number_of_pops.cbegin(), number_of_pops.cend(), 0, [](int acc, const auto &val) { return acc + val.load(X);}) - number_of_pushes) > 0;) {
number_of_pushes += n;
do {
q.push(STOP_MSG);
} while(--n);
}
}

for(auto& t : consumers)
t.join();

constexpr uint64_t expected_result = (N_STRESS_MSG + 1) / 2. * N_STRESS_MSG * PRODUCERS;
constexpr uint64_t consumer_result_min = expected_result / CONSUMERS / 10;
uint64_t result = 0;
for(auto& r : results) {
BOOST_WARN_GT(r, consumer_result_min); // Make sure a consumer didn't starve. False positives are possible here.
result += r;
}
int64_t result_diff = result - expected_result;
BOOST_CHECK_EQUAL(result_diff, 0);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

namespace {
Expand Down