By using this site, you agree to our updated Privacy Policy and our Terms of Use. Manage your Cookies Settings.
449,315 Members | 1,672 Online
Bytes IT Community
+ Ask a Question
Need help? Post your question and get tips & solutions from a community of 449,315 IT Pros & Developers. It's quick & easy.

Request for comments about synchronized queue using boost

P: n/a
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-
structures?

/Nordl÷w

The file synched_queue.hpp follows:

#ifndef PNW__SYNCHED_QUEUE_HPP
#define PNW__SYNCHED_QUEUE_HPP

/*!
* @file synched_queue.hpp
* @brief Synchronized (Thread Safe) Container Wrapper on std:queue
* using Boost::Thread.
*/

#include <queue>
#include <iostream>

#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

//
================================================== ==========================

template <typename T>
class synched_queue
{
std::queue<Tq; ///< Queue.
boost::mutex m; ///< Mutex.
public:
/*!
* Push @p value.
*/
void push(const T & value) {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
q.push(value);
}

/*!
* Try and pop into @p value, returning directly in any case.
* @return true if pop was success, false otherwise.
*/
bool try_pop(T & value) {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
if (q.size()) {
value = q.front();
q.pop();
return true;
}
return false;
}

/// Pop and return value, possibly waiting forever.
T wait_pop() {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
// wait until queue has at least on element()
c.wait(sl, boost::bind(&std::queue<T>::size, q));
T value = q.front();
q.pop();
return value;
}

size_type size() const {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
return q.size();
}

bool empty() const {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
return q.empty();
}

};

//
================================================== ==========================

#endif
Oct 15 '08 #1
Share this Question
Share on Google+
19 Replies


P: n/a
Nordl÷w wrote:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-
structures?
comp.programming.threads?
/Nordl÷w

[...]

/// Pop and return value, possibly waiting forever.
T wait_pop() {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
// wait until queue has at least on element()
c.wait(sl, boost::bind(&std::queue<T>::size, q));
T value = q.front();
q.pop();
return value;
}
I haven't done any threading in a decade or so, but I wonder how
in the above code anything could be put into the locked queue.
What am I missing?
Oh, and I wonder what 'c' is.
[...]
Schobi
Oct 15 '08 #2

P: n/a
On Oct 15, 2:36*pm, Nordl÷w <per.nord...@gmail.comwrote:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?
/Nordl÷w

The file synched_queue.hpp follows:

#ifndef PNW__SYNCHED_QUEUE_HPP
#define PNW__SYNCHED_QUEUE_HPP

/*!
** @file synched_queue.hpp
** @brief Synchronized (Thread Safe) Container Wrapper on std:queue
** * * * *using Boost::Thread.
**/

#include <queue>
#include <iostream>

#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

//
================================================== ==========================

template <typename T>
class synched_queue
{
* * std::queue<Tq; * * * * * * *///< Queue.
* * boost::mutex m; * * * * * * ///< Mutex.
A member variable is missing here:

boost::condition c;
public:
* * /*!
* * ** Push @p value.
* * **/
* * void push(const T & value) {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * q.push(value);
You need to notify other threads waiting on the queue:

c.notify_one();
* * }

* * /*!
* * ** Try and pop into @p value, returning directly in any case.
* * ** @return true if pop was success, false otherwise.
* * **/
* * bool try_pop(T & value) {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * if (q.size()) {
* * * * * * value = q.front();
* * * * * * q.pop();
* * * * * * return true;
* * * * }
* * * * return false;
* * }

* * /// Pop and return value, possibly waiting forever.
* * T wait_pop() {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * // wait until queue has at least on element()
The following line:
* * * * c.wait(sl, boost::bind(&std::queue<T>::size, q));
boost::bind(&std::queue<T>::size, q) stores a copy of the queue in the
object created by boost::bind, so that the wait never finishes if the
queue is empty (and if the condition variable is not notified (see
above)).

It should be as simple as:

while(q.empty())
c.wait(sl);
* * * * T value = q.front();
* * * * q.pop();
* * * * return value;
* * }

* * size_type size() const {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * return q.size();
* * }

* * bool empty() const {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * return q.empty();
* * }

};

//
================================================== ==========================

#endif

The other thing is that the queue does not support destruction: the
destructor does not unblock any threads blocked in wait.

Apart from that, the mutex is held for too long. You don't really need
to hold the lock when allocating memory for elements and when invoking
the copy constructor of the elements.

Here is an improved version (although a bit simplified):

#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
#include <list>

template<class T>
class atomic_queue : private boost::noncopyable
{
private:
boost::mutex mtx_;
boost::condition cnd_;
bool cancel_;
unsigned waiting_;

// use list as a queue because it allows for splicing:
// moving elements between lists without any memory allocation and
copying
typedef std::list<Tqueue_type;
queue_type q_;

public:
struct cancelled : std::logic_error
{
cancelled() : std::logic_error("cancelled") {}
};

atomic_queue()
: cancel_()
, waiting_()
{}

~atomic_queue()
{
// cancel all waiting threads
this->cancel();
}

void cancel()
{
// cancel all waiting threads
boost::mutex::scoped_lock l(mtx_);
cancel_ = true;
cnd_.notify_all();
// and wait till they are done
while(waiting_)
cnd_.wait(l);
}

void push(T const& t)
{
// this avoids an allocation inside the critical section
bellow
queue_type tmp(&t, &t + 1);
{
boost::mutex::scoped_lock l(mtx_);
q_.splice(q_.end(), tmp);
}
cnd_.notify_one();
}

// this function provides only basic exception safety if T's copy
ctor can
// throw or strong exception safety if T's copy ctor is nothrow
T pop()
{
// this avoids copying T inside the critical section bellow
queue_type tmp;
{
boost::mutex::scoped_lock l(mtx_);
++waiting_;
while(!cancel_ && q_.empty())
cnd_.wait(l);
--waiting_;
if(cancel_)
{
cnd_.notify_all();
throw cancelled();
}
tmp.splice(tmp.end(), q_, q_.begin());
}
return tmp.front();
}
};

typedef boost::function<void()unit_of_work;
typedef atomic_queue<unit_of_workwork_queue;

void typical_thread_pool_working_thread(work_queue* q)
try
{
for(;;)
q->pop()();
}
catch(work_queue::cancelled&)
{
// time to terminate the thread
}
Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?
I would recommend "Programming with POSIX Threads" book by by David R.
Butenhof.

--
Max
Oct 15 '08 #3

P: n/a
Hendrik Schober wrote:
Nordl÷w wrote:
>I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-
structures?

comp.programming.threads?
>/Nordl÷w

[...]

/// Pop and return value, possibly waiting forever.
T wait_pop() {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
// wait until queue has at least on element()
c.wait(sl, boost::bind(&std::queue<T>::size, q));
T value = q.front();
q.pop();
return value;
}

I haven't done any threading in a decade or so, but I wonder how
in the above code anything could be put into the locked queue.
What am I missing?
Oh, and I wonder what 'c' is.
c is a condition variable:
http://www.boost.org/doc/libs/1_36_0...on.condvar_ref

You lock the mutex, then wait for a condition, which (automatically)
unlocks the mutex, and locks it again if the condition occurs.

--
Thomas
Oct 15 '08 #4

P: n/a
Thomas J. Gritzan wrote:
[...]
> I haven't done any threading in a decade or so, but I wonder how
in the above code anything could be put into the locked queue.
What am I missing?
Oh, and I wonder what 'c' is.

c is a condition variable:
http://www.boost.org/doc/libs/1_36_0...on.condvar_ref

You lock the mutex, then wait for a condition, which (automatically)
unlocks the mutex, and locks it again if the condition occurs.
Ah, thanks. I haven't looked at boost's threads yet.

Schobi
Oct 15 '08 #5

P: n/a
On 15 Okt, 18:02, Maxim Yegorushkin <maxim.yegorush...@gmail.com>
wrote:
On Oct 15, 2:36*pm, Nordl÷w <per.nord...@gmail.comwrote:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?
/Nordl÷w
The file synched_queue.hpp follows:
#ifndef PNW__SYNCHED_QUEUE_HPP
#define PNW__SYNCHED_QUEUE_HPP
/*!
** @file synched_queue.hpp
** @brief Synchronized (Thread Safe) Container Wrapper on std:queue
** * * * *using Boost::Thread.
**/
#include <queue>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
//
================================================== ==========================
template <typename T>
class synched_queue
{
* * std::queue<Tq; * * * * * * *///< Queue.
* * boost::mutex m; * * * * * * ///< Mutex.

A member variable is missing here:

* * boost::condition c;
public:
* * /*!
* * ** Push @p value.
* * **/
* * void push(const T & value) {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * q.push(value);

You need to notify other threads waiting on the queue:

* * c.notify_one();
* * }
* * /*!
* * ** Try and pop into @p value, returning directly in any case.
* * ** @return true if pop was success, false otherwise.
* * **/
* * bool try_pop(T & value) {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * if (q.size()) {
* * * * * * value = q.front();
* * * * * * q.pop();
* * * * * * return true;
* * * * }
* * * * return false;
* * }
* * /// Pop and return value, possibly waiting forever.
* * T wait_pop() {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * // wait until queue has at least on element()

The following line:
* * * * c.wait(sl, boost::bind(&std::queue<T>::size, q));

boost::bind(&std::queue<T>::size, q) stores a copy of the queue in the
object created by boost::bind, so that the wait never finishes if the
queue is empty (and if the condition variable is not notified (see
above)).

It should be as simple as:

* * while(q.empty())
* * * * c.wait(sl);
* * * * T value = q.front();
* * * * q.pop();
* * * * return value;
* * }
* * size_type size() const {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * return q.size();
* * }
* * bool empty() const {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * return q.empty();
* * }
};
//
================================================== ==========================
#endif

The other thing is that the queue does not support destruction: the
destructor does not unblock any threads blocked in wait.

Apart from that, the mutex is held for too long. You don't really need
to hold the lock when allocating memory for elements and when invoking
the copy constructor of the elements.

Here is an improved version (although a bit simplified):

#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
#include <list>

template<class T>
class atomic_queue : private boost::noncopyable
{
private:
* * boost::mutex mtx_;
* * boost::condition cnd_;
* * bool cancel_;
* * unsigned waiting_;

* * // use list as a queue because it allows for splicing:
* * // moving elements between lists without any memory allocation and
copying
* * typedef std::list<Tqueue_type;
* * queue_type q_;

public:
* * struct cancelled : std::logic_error
* * {
* * * * cancelled() : std::logic_error("cancelled") {}
* * };

* * atomic_queue()
* * * * : cancel_()
* * * * , waiting_()
* * {}

* * ~atomic_queue()
* * {
* * * * // cancel all waiting threads
* * * * this->cancel();
* * }

* * void cancel()
* * {
* * * * // cancel all waiting threads
* * * * boost::mutex::scoped_lock l(mtx_);
* * * * cancel_ = true;
* * * * cnd_.notify_all();
* * * * // and wait till they are done
* * * * while(waiting_)
* * * * * * cnd_.wait(l);
* * }

* * void push(T const& t)
* * {
* * * * // this avoids an allocation inside the critical section
bellow
* * * * queue_type tmp(&t, &t + 1);
* * * * {
* * * * * * boost::mutex::scoped_lock l(mtx_);
* * * * * * q_.splice(q_.end(), tmp);
* * * * }
* * * * cnd_.notify_one();
* * }

* * // this function provides only basic exception safety if T's copy
ctor can
* * // throw or strong exception safety if T's copy ctor is nothrow
* * T pop()
* * {
* * * * // this avoids copying T inside the critical section bellow
* * * * queue_type tmp;
* * * * {
* * * * * * boost::mutex::scoped_lock l(mtx_);
* * * * * * ++waiting_;
* * * * * * while(!cancel_ && q_.empty())
* * * * * * * * cnd_.wait(l);
* * * * * * --waiting_;
* * * * * * if(cancel_)
* * * * * * {
* * * * * * * * cnd_.notify_all();
* * * * * * * * throw cancelled();
* * * * * * }
* * * * * * tmp.splice(tmp.end(), q_, q_.begin());
* * * * }
* * * * return tmp.front();
* * }

};

typedef boost::function<void()unit_of_work;
typedef atomic_queue<unit_of_workwork_queue;

void typical_thread_pool_working_thread(work_queue* q)
try
{
* * for(;;)
* * * * q->pop()();}

catch(work_queue::cancelled&)
{
* * // time to terminate the thread

}
Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?

I would recommend "Programming with POSIX Threads" book by by David R.
Butenhof.

--
Max

Doesn't the push-argument "T const & t" instead of my version "const T
& t" mean that we don't copy at all here? I believe &t evaluates to
the memory pointer of t:

void push(T const& t)
{
// this avoids an allocation inside the critical section
bellow
queue_type tmp(&t, &t + 1);
{
boost::mutex::scoped_lock l(mtx_);
q_.splice(q_.end(), tmp);
}
cnd_.notify_one();
}

/Nordl÷w
Oct 16 '08 #6

P: n/a
On 15 Okt, 20:16, Hendrik Schober <spamt...@gmx.dewrote:
Thomas J. Gritzan wrote:
[...]
*I haven't done any threading in a decade or so, but I wonder how
*in the above code anything could be put into the locked queue.
*What am I missing?
*Oh, and I wonder what 'c' is.
c is a condition variable:
http://www.boost.org/doc/libs/1_36_0...ynchronization....
You lock the mutex, then wait for a condition, which (automatically)
unlocks the mutex, and locks it again if the condition occurs.

* Ah, thanks. I haven't looked at boost's threads yet.

* Schobi
How can I your queue structure in the following code example:
#include "../synched_queue.hpp"
#include "../threadpool/include/threadpool.hpp"
#include <iostream>

using namespace boost::threadpool;

template <typename T>
void produce(synched_queue<T& q, size_t n)
{
for (size_t i = 0; i < n; i++) {
T x = i;
q.push(x);
std::cout << "i:" << i << " produced: " << x << std::endl;
}
}

template <typename T>
void consume(synched_queue<T& q, size_t n)
{
for (size_t i = 0; i < n; i++) {
T x = q.wait_pop();
std::cout << "i:" << i << " consumed: " << x << std::endl;
}
}

int main()
{
typedef float Elm;
synched_queue<floatq;
// boost::thread pt(boost::bind(produce<Elm>, q, 10));
// boost::thread ct(boost::bind(consume<Elm>, q, 10));
// pt.join();
// ct.join();
return 0;
}
Thanks in advance,
/Nordl÷w
Oct 16 '08 #7

P: n/a
On Oct 16, 3:44*pm, Nordl÷w <per.nord...@gmail.comwrote:
On 15 Okt, 18:02, Maxim Yegorushkin <maxim.yegorush...@gmail.com>
wrote:
On Oct 15, 2:36*pm, Nordl÷w <per.nord...@gmail.comwrote:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?
/Nordl÷w
The file synched_queue.hpp follows:
#ifndef PNW__SYNCHED_QUEUE_HPP
#define PNW__SYNCHED_QUEUE_HPP
/*!
** @file synched_queue.hpp
** @brief Synchronized (Thread Safe) Container Wrapper on std:queue
** * * * *using Boost::Thread.
**/
#include <queue>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
//
================================================== ==========================
template <typename T>
class synched_queue
{
* * std::queue<Tq; * * * * * * *///< Queue.
* * boost::mutex m; * * * * * * ///< Mutex.
A member variable is missing here:
* * boost::condition c;
public:
* * /*!
* * ** Push @p value.
* * **/
* * void push(const T & value) {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * q.push(value);
You need to notify other threads waiting on the queue:
* * c.notify_one();
* * }
* * /*!
* * ** Try and pop into @p value, returning directly in any case.
* * ** @return true if pop was success, false otherwise.
* * **/
* * bool try_pop(T & value) {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * if (q.size()) {
* * * * * * value = q.front();
* * * * * * q.pop();
* * * * * * return true;
* * * * }
* * * * return false;
* * }
* * /// Pop and return value, possibly waiting forever.
* * T wait_pop() {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * // wait until queue has at least on element()
The following line:
* * * * c.wait(sl, boost::bind(&std::queue<T>::size, q));
boost::bind(&std::queue<T>::size, q) stores a copy of the queue in the
object created by boost::bind, so that the wait never finishes if the
queue is empty (and if the condition variable is not notified (see
above)).
It should be as simple as:
* * while(q.empty())
* * * * c.wait(sl);
* * * * T value = q.front();
* * * * q.pop();
* * * * return value;
* * }
* * size_type size() const {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * return q.size();
* * }
* * bool empty() const {
* * * * boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
* * * * return q.empty();
* * }
};
//
================================================== ==========================
#endif
The other thing is that the queue does not support destruction: the
destructor does not unblock any threads blocked in wait.
Apart from that, the mutex is held for too long. You don't really need
to hold the lock when allocating memory for elements and when invoking
the copy constructor of the elements.
Here is an improved version (although a bit simplified):
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
#include <list>
template<class T>
class atomic_queue : private boost::noncopyable
{
private:
* * boost::mutex mtx_;
* * boost::condition cnd_;
* * bool cancel_;
* * unsigned waiting_;
* * // use list as a queue because it allows for splicing:
* * // moving elements between lists without any memory allocation and
copying
* * typedef std::list<Tqueue_type;
* * queue_type q_;
public:
* * struct cancelled : std::logic_error
* * {
* * * * cancelled() : std::logic_error("cancelled") {}
* * };
* * atomic_queue()
* * * * : cancel_()
* * * * , waiting_()
* * {}
* * ~atomic_queue()
* * {
* * * * // cancel all waiting threads
* * * * this->cancel();
* * }
* * void cancel()
* * {
* * * * // cancel all waiting threads
* * * * boost::mutex::scoped_lock l(mtx_);
* * * * cancel_ = true;
* * * * cnd_.notify_all();
* * * * // and wait till they are done
* * * * while(waiting_)
* * * * * * cnd_.wait(l);
* * }
* * void push(T const& t)
* * {
* * * * // this avoids an allocation inside the critical section
bellow
* * * * queue_type tmp(&t, &t + 1);
* * * * {
* * * * * * boost::mutex::scoped_lock l(mtx_);
* * * * * * q_.splice(q_.end(), tmp);
* * * * }
* * * * cnd_.notify_one();
* * }
* * // this function provides only basic exception safety if T's copy
ctor can
* * // throw or strong exception safety if T's copy ctor is nothrow
* * T pop()
* * {
* * * * // this avoids copying T inside the critical section bellow
* * * * queue_type tmp;
* * * * {
* * * * * * boost::mutex::scoped_lock l(mtx_);
* * * * * * ++waiting_;
* * * * * * while(!cancel_ && q_.empty())
* * * * * * * * cnd_.wait(l);
* * * * * * --waiting_;
* * * * * * if(cancel_)
* * * * * * {
* * * * * * * * cnd_.notify_all();
* * * * * * * * throw cancelled();
* * * * * * }
* * * * * * tmp.splice(tmp.end(), q_, q_.begin());
* * * * }
* * * * return tmp.front();
* * }
};
typedef boost::function<void()unit_of_work;
typedef atomic_queue<unit_of_workwork_queue;
void typical_thread_pool_working_thread(work_queue* q)
try
{
* * for(;;)
* * * * q->pop()();}
catch(work_queue::cancelled&)
{
* * // time to terminate the thread
}
Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?
I would recommend "Programming with POSIX Threads" book by by David R.
Butenhof.

Doesn't the push-argument "T const & t" instead of my version "const T
& t" mean that we don't copy at all here?
No, T const& and const T& is the same thing: a reference to a constant
T.
I believe &t evaluates to
the memory pointer of t:

* * void push(T const& t)
* * {
* * * * // this avoids an allocation inside the critical section
bellow
* * * * queue_type tmp(&t, &t + 1);
* * * * {
* * * * * * boost::mutex::scoped_lock l(mtx_);
* * * * * * q_.splice(q_.end(), tmp);
* * * * }
* * * * cnd_.notify_one();
* * }
The trick here is that element t is first inserted in a temporary list
tmp on the stack.

queue_type tmp(&t, &t + 1); // create a list with a copy of t

This involves allocating memory and copying t. And here it is done
without holding the lock because allocating memory may be expensive
(might cause the system to do swapping) and as you hold the lock all
the worker threads won't be able to pop elements from the queue during
such time. Next, the lock is acquired and the element is moved from
list tmp into q_:

q_.splice(q_.end(), tmp);

This operation does not involve any memory allocation or copying
elements (because you can do so easily with the nodes of doubly-linked
lists), which make your critical section of code execute really fast
without stalling the worked threads for too long.

--
Max

Oct 16 '08 #8

P: n/a
On Oct 15, 3:36*pm, Nordl÷w <per.nord...@gmail.comwrote:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution?
Not really.
[...]
Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-
structures?
Sure.
http://www.google.nl/search?q=boost+thread+safe+queue=

Best Regards,
Szabolcs
Oct 16 '08 #9

P: n/a
On Oct 15, 2:36*pm, Nordl÷w <per.nord...@gmail.comwrote:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-
structures?
You can also try concurrent_queue from
http://www.threadingbuildingblocks.o...ncurrent_queue

Scout around that link for more documentation.

--
Max
Oct 16 '08 #10

P: n/a
On Oct 16, 5:42 pm, Szabolcs Ferenczi <szabolcs.feren...@gmail.com>
wrote:
On Oct 15, 3:36 pm, Nordl÷w <per.nord...@gmail.comwrote:
[...]
Are there any resources out there on the Internet on how to
design *thread-safe* *efficient* data- structures?
Sure.http://www.google.nl/search?q=boost+thread+safe+queue=
You have to be very careful with googling in cases like this.
There's an awful lot of junk on the net. Just looking at the
first hit, for example, it's quite clear that the author doesn't
know what he's talking about, and I suspect that that's true in
a large number of cases.

--
James Kanze (GABI Software) email:ja*********@gmail.com
Conseils en informatique orientÚe objet/
Beratung in objektorientierter Datenverarbeitung
9 place SÚmard, 78210 St.-Cyr-l'╔cole, France, +33 (0)1 30 23 00 34
Oct 17 '08 #11

P: n/a
On Oct 15, 6:02 pm, Maxim Yegorushkin
<maxim.yegorush...@gmail.comwrote:
On Oct 15, 2:36 pm, Nordl÷w <per.nord...@gmail.comwrote:
[...]
Apart from that, the mutex is held for too long. You don't
really need to hold the lock when allocating memory for
elements and when invoking the copy constructor of the
elements.
Which sounds a lot like pre-mature optimization to me. First
get the queue working, then see if there is a performance
problem, and only then, do something about it. (Given that
clients will normally only call functions on the queue when they
have nothing to do, waiting a couple of microseconds longer on
the lock won't impact anything.)
Here is an improved version (although a bit simplified):
void push(T const& t)
Actually, I'd name this "send", and not "push". Just because
the standard library uses very poor names doesn't mean we have
to.
{
// this avoids an allocation inside the critical section
bellow
queue_type tmp(&t, &t + 1);
{
boost::mutex::scoped_lock l(mtx_);
q_.splice(q_.end(), tmp);
}
cnd_.notify_one();
}
This is unnecessary complexity. And probably looses runtime
efficiency (not that it's important): his initial version uses
std::deque, which doesn't have to allocate at each
insertion---in fact, in all of the uses I've measured, the queue
tends to hit its maximum size pretty quickly, and there are no
more allocations after that.

Yet another case where premature optimization turns out to be
pessimization.

[...]
// this function provides only basic exception safety if T's
// copy ctor can throw or strong exception safety if T's copy
// ctor is nothrow
:-).

In practice, I find that almost all of my inter-thread queues
need to contain polymorphic objects. Which means that the queue
contains pointers, and that all of the objects will in fact be
dynamically allocated. The result is that I use std::auto_ptr
in the interface (so the producer can't access the object once it
has been passed off, and the consumer knows to delete it).

Of course, std::auto_ptr has a no throw copy constructor, so the
queue itself has a strong exception safe guarantee.
Are there any resources out there on the Internet on how to
design *thread-safe* *efficient* data-structures?
I would recommend "Programming with POSIX Threads" book by by
David R. Butenhof.
Very much so, for the basics. (Formally, it's only Unix, but
practically, Boost threads are modeled after pthreads.) For the
data structures, it's less obvious, and of course, Butenhof
doesn't go into the issues which are particular to C++ (local
statics with dynamic initialization, the fact that pointers to
functions have to be ``extern "C"'', etc.).

--
James Kanze (GABI Software) email:ja*********@gmail.com
Conseils en informatique orientÚe objet/
Beratung in objektorientierter Datenverarbeitung
9 place SÚmard, 78210 St.-Cyr-l'╔cole, France, +33 (0)1 30 23 00 34
Oct 17 '08 #12

P: n/a
On Oct 17, 9:43*am, James Kanze <james.ka...@gmail.comwrote:
On Oct 16, 5:42 pm, Szabolcs Ferenczi <szabolcs.feren...@gmail.com>
wrote:
On Oct 15, 3:36 pm, Nordl÷w <per.nord...@gmail.comwrote:
[...]
Are there any resources out there on the Internet on how to
design *thread-safe* *efficient* data- structures?
Sure.http://www.google.nl/search?q=boost+thread+safe+queue=

You have to be very careful with googling in cases like this.
There's an awful lot of junk on the net. *Just looking at the
first hit, for example, it's quite clear that the author doesn't
know what he's talking about, and I suspect that that's true in
a large number of cases.
Hmmmm... For me the first hit is a didactic piece by Anthony Williams:

Implementing a Thread-Safe Queue using Condition Variables ...
In those cases, it might be worth using something like boost::optional
to avoid this requirement ... Tags: threading, thread safe, queue,
condition variable ...
http://www.justsoftwaresolutions.co....variables.html

Saying that "it's quite clear that the author doesn't know what he's
talking about" is, hmmm..., at least indicates something about you.

I do not want to defend him but if you just read it to the end, you
must have learnt something, I guess. You should not stop by the first
fragment which is just a starting point illustrating the problem.

I agree in that he should not have suggest such a bad habit of
handling a shared resource in the front part of his article or, at
least, he should have warned the smattering reader that it is not the
correct way.

Happy reading.

Best Regards,
Szabolcs
Oct 17 '08 #13

P: n/a
James Kanze <ja*********@gmail.comkirjutas:
On Oct 15, 6:02 pm, Maxim Yegorushkin
<maxim.yegorush...@gmail.comwrote:
>On Oct 15, 2:36 pm, Nordl÷w <per.nord...@gmail.comwrote:

[...]
>Apart from that, the mutex is held for too long. You don't
really need to hold the lock when allocating memory for
elements and when invoking the copy constructor of the
elements.

Which sounds a lot like pre-mature optimization to me. First
get the queue working, then see if there is a performance
problem, and only then, do something about it. (Given that
clients will normally only call functions on the queue when they
have nothing to do, waiting a couple of microseconds longer on
the lock won't impact anything.)
While holding a lock a multithreaded application may be effectively
turned into a single-threaded one. This may be not so important for
single-core machines as only one program is running at any given moment
anyway. However, in case of multicore machines it may seriosly impact the
scaling properties of the application. Of course, this only applies if
the performance is important, if multiple threads are used just for a
more convenient way of organizing the program flow then one can forget
about this.

OTOH, a general inter-thread queue is a candidate for a low-level library
component, which might be used in very different situations, including
ones where the performance is critical. In this regard turning attention
to potential performance bottlenecks is important, and in some situations
holding a lock unnecessarily long can introduce serious bottlenecks.

I agree with you that using a temporary list and splice might be a
pessimization. In my code I have resorted to having a std::deque of
message objects. The message objects themselves are very cheap to copy or
swap, and contain only pointers to larger data structures. If needed, a
deep copy of the pointed larger data structure is also done, but only
before locking the queue.

Regards
Paavo
Oct 18 '08 #14

P: n/a
On Oct 17, 9:12*am, James Kanze <james.ka...@gmail.comwrote:
On Oct 15, 6:02 pm, Maxim Yegorushkin

<maxim.yegorush...@gmail.comwrote:
On Oct 15, 2:36 pm, Nordl÷w <per.nord...@gmail.comwrote:

* * [...]
Apart from that, the mutex is held for too long. You don't
really need to hold the lock when allocating memory for
elements and when invoking the copy constructor of the
elements.

Which sounds a lot like pre-mature optimization to me. *First
get the queue working, then see if there is a performance
problem, and only then, do something about it. *(Given that
clients will normally only call functions on the queue when they
have nothing to do, waiting a couple of microseconds longer on
the lock won't impact anything.)
Here is an improved version (although a bit simplified):
* * void push(T const& t)

Actually, I'd name this "send", and not "push". *Just because
the standard library uses very poor names doesn't mean we have
to.
* * {
* * * * // this avoids an allocation inside the critical section
bellow
* * * * queue_type tmp(&t, &t + 1);
* * * * {
* * * * * * boost::mutex::scoped_lock l(mtx_);
* * * * * * q_.splice(q_.end(), tmp);
* * * * }
* * * * cnd_.notify_one();
* * }

This is unnecessary complexity. *And probably looses runtime
efficiency (not that it's important): his initial version uses
std::deque, which doesn't have to allocate at each
insertion---in fact, in all of the uses I've measured, the queue
tends to hit its maximum size pretty quickly, and there are no
more allocations after that.

Yet another case where premature optimization turns out to be
pessimization.

* * [...]
* *// this function provides only basic exception safety if T's
* *// copy ctor can throw or strong exception safety if T's copy
* *// ctor is nothrow

:-).

In practice, I find that almost all of my inter-thread queues
need to contain polymorphic objects. *Which means that the queue
contains pointers, and that all of the objects will in fact be
dynamically allocated. *The result is that I use std::auto_ptr
in the interface (so the producer can't access the object once it
has been passed off, and the consumer knows to delete it).
I agree with you that holding work elements by value is not most
practical. boost::function<and std::list<were used only for
simplicity here.

As you said, in practice, the work units are dynamically allocated
polymorphic objects. Naturally, the work unit base class is also a
(singly-linked) list node and the queue is implemented as an intrusive
list. This way, once a work unit has been allocated, the operations on
the inter-thread queue do not involve any memory allocations.

Something like this:

struct WorkUnit
{
WorkUnit* next;

WorkUnit()
: next()
{}

virtual ~WorkUnit() = 0;
virtual void execute() = 0;
virtual void release() { delete this; }
};

template<class T>
struct IntrusiveQueue
{
T *head, **tail;

IntrusiveQueue()
: head()
, tail(&head)
{}

void push_back(T* n)
{
*tail = n;
tail = &n->next;
}

T* pop_front()
{
T* n = head;
if(head && !(head = head->next))
tail = &head;
return n;
}
};

--
Max
Oct 20 '08 #15

P: n/a
Nordl÷w wrote:
[...]
> Ah, thanks. I haven't looked at boost's threads yet.
How can I your queue structure in the following code example:
Even if I could parse your sentence (which I fail), I fail to see
why you're asking me this.
[...]
Schobi
Oct 20 '08 #16

P: n/a
Nordl÷w wrote:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-
structures?

/Nordl÷w
November issue of DDJ has a good article by Herb Sutter "Writing
a Generalized Concurrent Queue". Some may consider it full of
premature optimizations, but it's still a good reading. Watch DDJ
site for it to appear online soon.

Andy.
Oct 22 '08 #17

P: n/a
On Oct 17, 10:24 am, Szabolcs Ferenczi <szabolcs.feren...@gmail.com>
wrote:
On Oct 17, 9:43 am, James Kanze <james.ka...@gmail.comwrote:
On Oct 16, 5:42 pm, Szabolcs Ferenczi <szabolcs.feren...@gmail.com>
wrote:
On Oct 15, 3:36 pm, Nordl÷w <per.nord...@gmail.comwrote:
[...]
Are there any resources out there on the Internet on how to
design *thread-safe* *efficient* data- structures?
Sure.http://www.google.nl/search?q=boost+thread+safe+queue=
You have to be very careful with googling in cases like
this. There's an awful lot of junk on the net. Just
looking at the first hit, for example, it's quite clear that
the author doesn't know what he's talking about, and I
suspect that that's true in a large number of cases.
Hmmmm... For me the first hit is a didactic piece by Anthony
Williams:
Implementing a Thread-Safe Queue using Condition Variables ...
In those cases, it might be worth using something like boost::optional
to avoid this requirement ... Tags: threading, thread safe, queue,
condition variable ...http://www.justsoftwaresolutions.co....a-thread-safe-...
Yep. That was the example I was talking about.
Saying that "it's quite clear that the author doesn't know
what he's talking about" is, hmmm..., at least indicates
something about you.
Yes. That I know a bit about programming in a multithreaded
environment, and can spot basic stupidities right away.
I do not want to defend him but if you just read it to the
end, you must have learnt something, I guess.
I learned that he doesn't fully understand the implications.
You should not stop by the first fragment which is just a
starting point illustrating the problem.
When I see threaded code returning a reference into a protected
structure, after having freed the lock, I stop. No one who
understands threading would ever write something like that.
I agree in that he should not have suggest such a bad habit of
handling a shared resource in the front part of his article
or, at least, he should have warned the smattering reader that
it is not the correct way.
The rest just goes on to present what everyone knows anyway.

--
James Kanze (GABI Software) email:ja*********@gmail.com
Conseils en informatique orientÚe objet/
Beratung in objektorientierter Datenverarbeitung
9 place SÚmard, 78210 St.-Cyr-l'╔cole, France, +33 (0)1 30 23 00 34
Oct 22 '08 #18

P: n/a
On Oct 22, 6:41*pm, James Kanze <james.ka...@gmail.comwrote:
On Oct 17, 10:24 am, Szabolcs Ferenczi <szabolcs.feren...@gmail.com>
wrote:
You should not stop by the first fragment which is just a
starting point illustrating the problem.

When I see threaded code returning a reference into a protected
structure, after having freed the lock, I stop. *No one who
understands threading would ever write something like that.
Well, the queue is a single consumer object (the author explicitly
says that), so as long as the consumer doesn't pop, the reference is
never invalidated (it uses a deque internally), at least with all sane
implementations of deque (and I'm fairly sure that c++0x will require
it to work).
I agree in that he should not have suggest such a bad habit of
handling a shared resource in the front part of his article
or, at least, he should have warned the smattering reader that
it is not the correct way.

The rest just goes on to present what everyone knows anyway.
'Everyone' is a fairly strong word. In my experience, many programmers
have no idea of what a condition variable is or how to use it.

--
gpd
Oct 22 '08 #19

P: n/a
On Oct 22, 7:04*pm, gpderetta <gpdere...@gmail.comwrote:
On Oct 22, 6:41*pm, James Kanze <james.ka...@gmail.comwrote:
On Oct 17, 10:24 am, Szabolcs Ferenczi <szabolcs.feren...@gmail.com>
wrote:
You should not stop by the first fragment which is just a
starting point illustrating the problem.
When I see threaded code returning a reference into a
protected structure, after having freed the lock, I stop.
*No one who understands threading would ever write something
like that.
Well, the queue is a single consumer object (the author
explicitly says that), so as long as the consumer doesn't pop,
the reference is never invalidated (it uses a deque
internally), at least with all sane implementations of deque
(and I'm fairly sure that c++0x will require it to work).
In other words, if it works, it works. The author does vaguely
mention something about "single consumer" after presenting the
code, but there's certainly nothing fundamental in his code
which prevents the client from reading from several different
threads. *IF* you're claiming any sort of thread safety
contract which doesn't require external synchronization, you
don't allow pointers and references to external data to escape.
I agree in that he should not have suggest such a bad
habit of handling a shared resource in the front part of
his article or, at least, he should have warned the
smattering reader that it is not the correct way.
The rest just goes on to present what everyone knows anyway.
'Everyone' is a fairly strong word. In my experience, many
programmers have no idea of what a condition variable is or
how to use it.
Yes. I should have said: everyone who knows anything about
programming with threads. From what I've seen, that's really a
minority.

My problem with the article is twofold: first, he presents a
simplistic implementation which is simply too dangerous to
consider. He does mention a vague constraint concerning its
use, but without explaining why, or covering any of the basic
principles. And he then goes on to explain how to use one
particular construct, a condition, again without explaining any
of the basic details. If someone reads just that article,
they're going to mess things up seriously, because they don't
know the basic principles. And if they've learned the basic
principles, say by reading Butenhof, then they already know
everything the article presents.

--
James Kanze (GABI Software) email:ja*********@gmail.com
Conseils en informatique orientÚe objet/
Beratung in objektorientierter Datenverarbeitung
9 place SÚmard, 78210 St.-Cyr-l'╔cole, France, +33 (0)1 30 23 00 34
Oct 22 '08 #20

This discussion thread is closed

Replies have been disabled for this discussion.