71 #ifndef CGU_ASYNC_QUEUE_H
72 #define CGU_ASYNC_QUEUE_H
87 #ifdef CGU_USE_SCHED_YIELD
103 virtual const char*
what()
const throw() {
return "AsyncQueuePopError: popping from empty AsyncQueue object\n";}
147 template <
class T,
class Container = std::list<T> >
class AsyncQueue {
155 std::queue<T, Container> q;
169 #ifdef CGU_USE_SCHED_YIELD
194 void push(
const value_type& obj) {
219 q.push(std::move(obj));
245 template<
class... Args>
248 q.emplace(std::forward<Args>(args)...);
271 void pop(value_type& obj) {
316 obj = std::move(q.front());
441 if (
this != &other) {
442 lock2(mutex, other.mutex);
473 lock2(mutex, rhs.mutex);
476 std::queue<T, Container> temp{rhs.q};
507 q = std::move(rhs.q);
649 std::queue<T, Container> q;
664 #ifdef CGU_USE_SCHED_YIELD
689 void push(
const value_type& obj) {
715 q.push(std::move(obj));
742 template<
class... Args>
745 q.emplace(std::forward<Args>(args)...);
769 void pop(value_type& obj) {
814 obj = std::move(q.front());
900 while (q.empty()) cond.
wait(mutex);
945 while (q.empty()) cond.
wait(mutex);
947 obj = std::move(q.front());
1103 obj = std::move(q.front());
1243 if (
this != &other) {
1244 lock2(mutex, other.mutex);
1249 if (!other.q.empty()) other.cond.
broadcast();
1288 lock2(mutex, rhs.mutex);
1291 std::queue<T, Container> temp{rhs.q};
1330 q = std::move(rhs.q);
1427 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1466 template <
class T,
class Container>
1495 template <
class T,
class Container>
1501 #if defined(CGU_USE_INHERITABLE_QUEUE) && !defined(DOXYGEN_PARSING)
1510 template <
class T,
class Allocator>
1511 class AsyncQueue<T,
std::list<T, Allocator> > {
1513 typedef std::list<T, Allocator> Container;
1514 typedef typename Container::value_type
value_type;
1515 typedef typename Container::size_type
size_type;
1523 class Q:
public std::queue<T, Container> {
1525 void splice_end(Container&& lst) {
1526 this->c.splice(this->c.end(), std::move(lst));
1528 void unsplice_beginning(Container& lst) {
1529 lst.splice(lst.begin(), this->c, this->c.begin());
1532 mutable Thread::Mutex mutex;
1535 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1538 if (!m2.trylock()) {
1543 #ifdef CGU_USE_SCHED_YIELD
1552 void push(
const value_type& obj) {
1553 Container temp{obj};
1554 Thread::Mutex::Lock lock{mutex};
1556 q.splice_end(std::move(temp));
1559 void push(value_type&& obj) {
1566 temp.push_back(std::move(obj));
1567 Thread::Mutex::Lock lock{mutex};
1569 q.splice_end(std::move(temp));
1572 template<
class... Args>
1573 void emplace(Args&&... args) {
1575 temp.emplace_back(std::forward<Args>(args)...);
1576 Thread::Mutex::Lock lock{mutex};
1578 q.splice_end(std::move(temp));
1581 void pop(value_type& obj) {
1582 Thread::Mutex::Lock lock{mutex};
1583 if (q.empty())
throw AsyncQueuePopError();
1589 Thread::Mutex::Lock lock{mutex};
1590 if (q.empty())
throw AsyncQueuePopError();
1591 obj = std::move(q.front());
1601 Thread::Mutex::Lock lock{mutex};
1602 if (q.empty())
throw AsyncQueuePopError();
1604 q.unsplice_beginning(temp);
1606 obj = std::move(temp.front());
1610 Thread::Mutex::Lock lock{mutex};
1611 if (q.empty())
throw AsyncQueuePopError();
1615 bool empty()
const {
1616 Thread::Mutex::Lock lock{mutex};
1620 size_type
size()
const {
1621 Thread::Mutex::Lock lock{mutex};
1626 if (
this != &other) {
1627 lock2(mutex, other.mutex);
1636 lock2(mutex, rhs.mutex);
1646 Thread::Mutex::Lock lock{mutex};
1647 q = std::move(rhs.q);
1658 Thread::Mutex::Lock lock{mutex};
1671 template <
class T,
class Allocator>
1672 class AsyncQueueDispatch<T,
std::list<T, Allocator> > {
1674 typedef std::list<T, Allocator> Container;
1675 typedef typename Container::value_type
value_type;
1676 typedef typename Container::size_type
size_type;
1684 class Q:
public std::queue<T, Container> {
1686 void splice_end(Container&& lst) {
1687 this->c.splice(this->c.end(), std::move(lst));
1689 void unsplice_beginning(Container& lst) {
1690 lst.splice(lst.begin(), this->c, this->c.begin());
1693 mutable Thread::Mutex mutex;
1697 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1700 if (!m2.trylock()) {
1705 #ifdef CGU_USE_SCHED_YIELD
1714 void push(
const value_type& obj) {
1715 Container temp{obj};
1716 Thread::Mutex::Lock lock{mutex};
1718 q.splice_end(std::move(temp));
1722 void push(value_type&& obj) {
1729 temp.push_back(std::move(obj));
1730 Thread::Mutex::Lock lock{mutex};
1732 q.splice_end(std::move(temp));
1736 template<
class... Args>
1737 void emplace(Args&&... args) {
1739 temp.emplace_back(std::forward<Args>(args)...);
1740 Thread::Mutex::Lock lock{mutex};
1742 q.splice_end(std::move(temp));
1746 void pop(value_type& obj) {
1747 Thread::Mutex::Lock lock{mutex};
1748 if (q.empty())
throw AsyncQueuePopError();
1754 Thread::Mutex::Lock lock{mutex};
1755 if (q.empty())
throw AsyncQueuePopError();
1756 obj = std::move(q.front());
1766 Thread::Mutex::Lock lock{mutex};
1767 if (q.empty())
throw AsyncQueuePopError();
1769 q.unsplice_beginning(temp);
1771 obj = std::move(temp.front());
1775 Thread::Mutex::Lock lock{mutex};
1776 while (q.empty()) cond.wait(mutex);
1777 Thread::CancelBlock b;
1783 Thread::Mutex::Lock lock{mutex};
1784 while (q.empty()) cond.wait(mutex);
1785 Thread::CancelBlock b;
1786 obj = std::move(q.front());
1793 bool cancelstate_restored =
false;
1795 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1800 pthread_setcancelstate(old_state, &ignore);
1801 cancelstate_restored =
true;
1802 Thread::Mutex::TrackLock lock{mutex};
1803 while (q.empty()) cond.wait(mutex);
1804 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1805 cancelstate_restored =
false;
1807 q.unsplice_beginning(temp);
1809 obj = std::move(temp.front());
1810 pthread_setcancelstate(old_state, &ignore);
1820 if (!cancelstate_restored) {
1821 pthread_setcancelstate(old_state, &ignore);
1830 Thread::Mutex::Lock lock{mutex};
1832 if (cond.timed_wait(mutex, ts))
return true;
1834 Thread::CancelBlock b;
1843 Thread::Mutex::Lock lock{mutex};
1845 if (cond.timed_wait(mutex, ts))
return true;
1847 Thread::CancelBlock b;
1848 obj = std::move(q.front());
1858 bool cancelstate_restored =
false;
1860 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1865 pthread_setcancelstate(old_state, &ignore);
1866 cancelstate_restored =
true;
1867 Thread::Mutex::TrackLock lock{mutex};
1869 if (cond.timed_wait(mutex, ts))
return true;
1871 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1872 cancelstate_restored =
false;
1874 q.unsplice_beginning(temp);
1876 obj = std::move(temp.front());
1877 pthread_setcancelstate(old_state, &ignore);
1888 if (!cancelstate_restored) {
1889 pthread_setcancelstate(old_state, &ignore);
1896 Thread::Mutex::Lock lock{mutex};
1897 if (q.empty())
throw AsyncQueuePopError();
1901 bool empty()
const {
1902 Thread::Mutex::Lock lock{mutex};
1906 size_type
size()
const {
1907 Thread::Mutex::Lock lock{mutex};
1912 if (
this != &other) {
1913 lock2(mutex, other.mutex);
1917 if (!q.empty()) cond.broadcast();
1918 if (!other.q.empty()) other.cond.broadcast();
1924 lock2(mutex, rhs.mutex);
1929 if (!q.empty()) cond.broadcast();
1935 Thread::Mutex::Lock lock{mutex};
1936 q = std::move(rhs.q);
1937 if (!q.empty()) cond.broadcast();
1946 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1949 Thread::Mutex::Lock lock{mutex};
1955 #endif // CGU_USE_INHERITABLE_QUEUE
int broadcast()
Definition: mutex.h:483
void push(value_type &&obj)
Definition: async_queue.h:217
void pop(value_type &obj)
Definition: async_queue.h:769
void pop()
Definition: async_queue.h:390
size_type size() const
Definition: async_queue.h:1220
void swap(Cgu::AsyncQueue< T, Container > &q1, Cgu::AsyncQueue< T, Container > &q2)
Definition: async_queue.h:1467
~AsyncQueueDispatch()
Definition: async_queue.h:1436
Container::size_type size_type
Definition: async_queue.h:150
int signal()
Definition: mutex.h:472
void emplace(Args &&...args)
Definition: async_queue.h:743
void pop_dispatch(value_type &obj)
Definition: async_queue.h:898
void move_pop(value_type &obj)
Definition: async_queue.h:313
A wrapper class for pthread condition variables.
Definition: mutex.h:449
AsyncQueue(const AsyncQueue &rhs)
Definition: async_queue.h:583
AsyncQueueDispatch()=default
An exception thrown if calling pop() on a AsyncQueue or AsyncQueueDispatch object fails because the q...
Definition: async_queue.h:102
void move_pop_basic(value_type &obj)
Definition: async_queue.h:872
A thread-safe asynchronous queue.
Definition: async_queue.h:147
A thread-safe asynchronous queue with a blocking pop() method.
Definition: async_queue.h:641
void pop()
Definition: async_queue.h:1187
static void get_abs_time(timespec &ts, unsigned int millisec)
void push(value_type &&obj)
Definition: async_queue.h:713
A class enabling the cancellation state of a thread to be controlled.
Definition: thread.h:681
bool move_pop_timed_dispatch(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1095
A scoped locking class for exception safe Mutex locking.
Definition: mutex.h:207
bool move_pop_timed_dispatch_basic(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1171
void move_pop_dispatch_basic(value_type &obj)
Definition: async_queue.h:1009
void move_pop_basic(value_type &obj)
Definition: async_queue.h:374
AsyncQueueDispatch & operator=(AsyncQueueDispatch &&rhs)
Definition: async_queue.h:1328
~AsyncQueue()
Definition: async_queue.h:590
AsyncQueueDispatch(AsyncQueueDispatch &&rhs)
Definition: async_queue.h:1396
virtual const char * what() const
Definition: async_queue.h:103
void push(const value_type &obj)
Definition: async_queue.h:194
int timed_wait(Mutex &mutex, const timespec &abs_time)
Definition: mutex.h:558
AsyncQueue(AsyncQueue &&rhs)
Definition: async_queue.h:558
A wrapper class for pthread mutexes.
Definition: mutex.h:117
void move_pop_dispatch(value_type &obj)
Definition: async_queue.h:943
int unlock()
Definition: mutex.h:170
void pop(value_type &obj)
Definition: async_queue.h:271
AsyncQueue & operator=(AsyncQueue &&rhs)
Definition: async_queue.h:505
Provides wrapper classes for pthread mutexes and condition variables, and scoped locking classes for ...
bool empty() const
Definition: async_queue.h:406
size_type size() const
Definition: async_queue.h:423
Definition: application.h:44
void push(const value_type &obj)
Definition: async_queue.h:689
Container::value_type value_type
Definition: async_queue.h:149
bool pop_timed_dispatch(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1040
AsyncQueueDispatch(const AsyncQueueDispatch &rhs)
Definition: async_queue.h:1426
int trylock()
Definition: mutex.h:157
bool empty() const
Definition: async_queue.h:1203
void move_pop(value_type &obj)
Definition: async_queue.h:811
Container::value_type value_type
Definition: async_queue.h:643
void emplace(Args &&...args)
Definition: async_queue.h:246
AsyncQueueDispatch & operator=(const AsyncQueueDispatch &rhs)
Definition: async_queue.h:1286
void swap(AsyncQueue &other)
Definition: async_queue.h:440
int lock()
Definition: mutex.h:147
Container container_type
Definition: async_queue.h:645
AsyncQueue & operator=(const AsyncQueue &rhs)
Definition: async_queue.h:471
Container container_type
Definition: async_queue.h:151
void swap(AsyncQueueDispatch &other)
Definition: async_queue.h:1242
#define CGU_GLIB_MEMORY_SLICES_FUNCS
Definition: cgu_config.h:84
Container::size_type size_type
Definition: async_queue.h:644
int wait(Mutex &mutex)
Definition: mutex.h:508