1 // Copyright (C) 2019 Austin Beer
2 // Copyright (C) 2019 Vicente J. Botet Escriba
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #include <boost/config.hpp>
9 #if ! defined BOOST_NO_CXX11_DECLTYPE
10 #define BOOST_RESULT_OF_USE_DECLTYPE
11 #endif
12
13 #define BOOST_THREAD_VERSION 4
14 #define BOOST_THREAD_PROVIDES_EXECUTORS
15
16 #include <boost/thread.hpp>
17 #include <boost/chrono.hpp>
18 #include <boost/thread/concurrent_queues/sync_timed_queue.hpp>
19
20 #include <boost/core/lightweight_test.hpp>
21 #include "../../../timming.hpp"
22
23 using namespace boost::chrono;
24
25 typedef boost::concurrent::sync_timed_queue<int> sync_tq;
26
27 const int cnt = 5;
28
call_push(sync_tq * q,const steady_clock::time_point start)29 void call_push(sync_tq* q, const steady_clock::time_point start)
30 {
31 // push elements onto the queue every 500 milliseconds but with a decreasing delay each time
32 for (int i = 0; i < cnt; ++i)
33 {
34 boost::this_thread::sleep_until(start + milliseconds(i * 500));
35 const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
36 q->push(i, expected);
37 }
38 }
39
call_pull(sync_tq * q,const steady_clock::time_point start)40 void call_pull(sync_tq* q, const steady_clock::time_point start)
41 {
42 // pull elements off of the queue (earliest element first)
43 for (int i = cnt - 1; i >= 0; --i)
44 {
45 int j;
46 q->pull(j);
47 BOOST_TEST_EQ(i, j);
48 const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
49 BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
50 BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
51 }
52 }
53
call_pull_until(sync_tq * q,const steady_clock::time_point start)54 void call_pull_until(sync_tq* q, const steady_clock::time_point start)
55 {
56 // pull elements off of the queue (earliest element first)
57 for (int i = cnt - 1; i >= 0; --i)
58 {
59 int j;
60 q->pull_until(steady_clock::now() + hours(1), j);
61 BOOST_TEST_EQ(i, j);
62 const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
63 BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
64 BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
65 }
66 }
67
call_pull_for(sync_tq * q,const steady_clock::time_point start)68 void call_pull_for(sync_tq* q, const steady_clock::time_point start)
69 {
70 // pull elements off of the queue (earliest element first)
71 for (int i = cnt - 1; i >= 0; --i)
72 {
73 int j;
74 q->pull_for(hours(1), j);
75 BOOST_TEST_EQ(i, j);
76 const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
77 BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
78 BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
79 }
80 }
81
test_push_while_pull()82 void test_push_while_pull()
83 {
84 sync_tq tq;
85 BOOST_TEST(tq.empty());
86 boost::thread_group tg;
87 const steady_clock::time_point start = steady_clock::now();
88 tg.create_thread(boost::bind(call_push, &tq, start));
89 tg.create_thread(boost::bind(call_pull, &tq, start));
90 tg.join_all();
91 BOOST_TEST(tq.empty());
92 }
93
test_push_while_pull_until()94 void test_push_while_pull_until()
95 {
96 sync_tq tq;
97 BOOST_TEST(tq.empty());
98 boost::thread_group tg;
99 const steady_clock::time_point start = steady_clock::now();
100 tg.create_thread(boost::bind(call_push, &tq, start));
101 tg.create_thread(boost::bind(call_pull_until, &tq, start));
102 tg.join_all();
103 BOOST_TEST(tq.empty());
104 }
105
test_push_while_pull_for()106 void test_push_while_pull_for()
107 {
108 sync_tq tq;
109 BOOST_TEST(tq.empty());
110 boost::thread_group tg;
111 const steady_clock::time_point start = steady_clock::now();
112 tg.create_thread(boost::bind(call_push, &tq, start));
113 tg.create_thread(boost::bind(call_pull_for, &tq, start));
114 tg.join_all();
115 BOOST_TEST(tq.empty());
116 }
117
main()118 int main()
119 {
120 test_push_while_pull();
121 test_push_while_pull_until();
122 test_push_while_pull_for();
123 return boost::report_errors();
124 }
125