1 // Copyright (C) 2019 Austin Beer
2 // Copyright (C) 2019 Vicente J. Botet Escriba
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 
8 #include <boost/config.hpp>
9 #if ! defined  BOOST_NO_CXX11_DECLTYPE
10 #define BOOST_RESULT_OF_USE_DECLTYPE
11 #endif
12 
13 #define BOOST_THREAD_VERSION 4
14 #define BOOST_THREAD_PROVIDES_EXECUTORS
15 
16 #include <boost/thread.hpp>
17 #include <boost/chrono.hpp>
18 #include <boost/thread/concurrent_queues/sync_timed_queue.hpp>
19 
20 #include <boost/core/lightweight_test.hpp>
21 #include "../../../timming.hpp"
22 
// Bring the Boost.Chrono names used below (steady_clock, milliseconds,
// seconds, hours) into scope.
using namespace boost::chrono;

// Queue type under test: a Boost sync_timed_queue holding int values.
typedef boost::concurrent::sync_timed_queue<int> sync_tq;

// Number of elements the producer pushes and each consumer pulls per test.
const int cnt = 5;
28 
call_push(sync_tq * q,const steady_clock::time_point start)29 void call_push(sync_tq* q, const steady_clock::time_point start)
30 {
31     // push elements onto the queue every 500 milliseconds but with a decreasing delay each time
32     for (int i = 0; i < cnt; ++i)
33     {
34         boost::this_thread::sleep_until(start + milliseconds(i * 500));
35         const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
36         q->push(i, expected);
37     }
38 }
39 
call_pull(sync_tq * q,const steady_clock::time_point start)40 void call_pull(sync_tq* q, const steady_clock::time_point start)
41 {
42     // pull elements off of the queue (earliest element first)
43     for (int i = cnt - 1; i >= 0; --i)
44     {
45         int j;
46         q->pull(j);
47         BOOST_TEST_EQ(i, j);
48         const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
49         BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
50         BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
51     }
52 }
53 
call_pull_until(sync_tq * q,const steady_clock::time_point start)54 void call_pull_until(sync_tq* q, const steady_clock::time_point start)
55 {
56     // pull elements off of the queue (earliest element first)
57     for (int i = cnt - 1; i >= 0; --i)
58     {
59         int j;
60         q->pull_until(steady_clock::now() + hours(1), j);
61         BOOST_TEST_EQ(i, j);
62         const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
63         BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
64         BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
65     }
66 }
67 
call_pull_for(sync_tq * q,const steady_clock::time_point start)68 void call_pull_for(sync_tq* q, const steady_clock::time_point start)
69 {
70     // pull elements off of the queue (earliest element first)
71     for (int i = cnt - 1; i >= 0; --i)
72     {
73         int j;
74         q->pull_for(hours(1), j);
75         BOOST_TEST_EQ(i, j);
76         const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
77         BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
78         BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
79     }
80 }
81 
test_push_while_pull()82 void test_push_while_pull()
83 {
84     sync_tq tq;
85     BOOST_TEST(tq.empty());
86     boost::thread_group tg;
87     const steady_clock::time_point start = steady_clock::now();
88     tg.create_thread(boost::bind(call_push, &tq, start));
89     tg.create_thread(boost::bind(call_pull, &tq, start));
90     tg.join_all();
91     BOOST_TEST(tq.empty());
92 }
93 
test_push_while_pull_until()94 void test_push_while_pull_until()
95 {
96     sync_tq tq;
97     BOOST_TEST(tq.empty());
98     boost::thread_group tg;
99     const steady_clock::time_point start = steady_clock::now();
100     tg.create_thread(boost::bind(call_push, &tq, start));
101     tg.create_thread(boost::bind(call_pull_until, &tq, start));
102     tg.join_all();
103     BOOST_TEST(tq.empty());
104 }
105 
test_push_while_pull_for()106 void test_push_while_pull_for()
107 {
108     sync_tq tq;
109     BOOST_TEST(tq.empty());
110     boost::thread_group tg;
111     const steady_clock::time_point start = steady_clock::now();
112     tg.create_thread(boost::bind(call_push, &tq, start));
113     tg.create_thread(boost::bind(call_pull_for, &tq, start));
114     tg.join_all();
115     BOOST_TEST(tq.empty());
116 }
117 
main()118 int main()
119 {
120     test_push_while_pull();
121     test_push_while_pull_until();
122     test_push_while_pull_for();
123     return boost::report_errors();
124 }
125