1 // Copyright 2020, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! Async task tests.
16 use super::{AsyncTask, Shelf};
17 use std::sync::{
18 mpsc::{channel, sync_channel, RecvTimeoutError},
19 Arc,
20 };
21 use std::time::Duration;
22
/// Exercises the `Shelf` accessors with values of several distinct types.
///
/// The assertions pin the following behavior: `put` hands back the value of
/// the same type that was stored before (if any), values of different types
/// sit side by side, `get_mut` yields a default value for a type that was
/// never stored, and the `*_downcast_*` accessors report `None` for a type
/// that is absent.
#[test]
fn test_shelf() {
    // A locally defined type, so the shelf holds something besides std types.
    #[derive(Debug, PartialEq, Eq)]
    struct Elf {
        pub name: String,
    }

    let mut shelf = Shelf::default();

    // The first String finds an empty slot...
    assert!(shelf.put(String::from("A string")).is_none());
    // ...and the second one displaces it.
    assert_eq!(shelf.put(String::from("Another string")), Some(String::from("A string")));

    // A value of a different type does not displace anything.
    assert!(shelf.put(Elf { name: String::from("Glorfindel") }).is_none());

    // The most recent String is still retrievable.
    assert_eq!(shelf.get_downcast_ref::<String>().unwrap(), "Another string");

    // So is the Elf, and it can be modified in place.
    let elf = shelf.get_downcast_mut::<Elf>().unwrap();
    assert_eq!(elf.name, "Glorfindel");
    elf.name = String::from("Celeborn");

    // Removing the Elf yields the modified value, and only once.
    let removed = shelf.remove_downcast_ref::<Elf>().unwrap();
    assert_eq!(removed.name, "Celeborn");
    assert!(shelf.remove_downcast_ref::<Elf>().is_none());

    // No u64 was ever stored, so `get_mut` starts from the default value.
    let counter = shelf.get_mut::<u64>();
    assert_eq!(*counter, 0);
    *counter = 42;
    // The write is visible through a subsequent lookup.
    assert_eq!(*shelf.get_downcast_ref::<u64>().unwrap(), 42);

    // No i32 has ever been near the shelf, so every downcast accessor comes up empty.
    assert!(shelf.get_downcast_ref::<i32>().is_none());
    assert!(shelf.get_downcast_mut::<i32>().is_none());
    assert!(shelf.remove_downcast_ref::<i32>().is_none());
}
72
/// Checks the order in which queued jobs run: all high-priority jobs first,
/// then the low-priority ones, each group in the order it was queued.
///
/// A gate job blocks until every other job has been queued, so the recorded
/// trace does not depend on how quickly the jobs are picked up.
#[test]
fn test_async_task() {
    let task = AsyncTask::default();

    // Gate job: waits for the go signal, then seeds the shelf with an empty
    // trace vector for the following jobs to append to.
    let (gate_tx, gate_rx) = channel();
    task.queue_hi(move |shelf| {
        gate_rx.recv().unwrap();
        shelf.put(Vec::<String>::new());
    });

    // Interleave low- and high-priority jobs; each records its own label.
    for n in 0..3 {
        task.queue_lo(move |shelf| {
            shelf.get_mut::<Vec<String>>().push(format!("L{}", n));
        });
        task.queue_hi(move |shelf| {
            shelf.get_mut::<Vec<String>>().push(format!("H{}", n));
        });
    }

    // The last low-priority job reports the accumulated trace back to the test.
    let (report_tx, report_rx) = channel();
    task.queue_lo(move |shelf| {
        let recorded = shelf.get_downcast_ref::<Vec<String>>().unwrap();
        report_tx.send(recorded.clone()).unwrap();
    });

    // Open the gate and wait for the report.
    gate_tx.send(()).unwrap();
    let observed = report_rx.recv().unwrap();

    assert_eq!(observed, ["H0", "H1", "H2", "L0", "L1", "L2"]);
}
113
/// Checks that a running job can itself queue another job.
///
/// This confirms that jobs are not invoked while any internal `AsyncTask`
/// lock is held; the test only finishes once the nested job has run.
#[test]
fn test_async_task_chain() {
    let task = Arc::new(AsyncTask::default());
    // Second handle, moved into the outer job so it can queue the inner one.
    let inner_handle = Arc::clone(&task);
    let (done_tx, done_rx) = channel();

    task.queue_hi(move |_shelf| {
        inner_handle.queue_lo(move |_shelf| {
            done_tx.send(()).unwrap();
        });
    });

    // Blocks until the nested low-priority job has signalled completion.
    done_rx.recv().unwrap();
}
128
/// Checks that a panic inside a queued job surfaces as a panic in the test.
///
/// The test is marked `#[should_panic]`, so it passes only if it panics.
#[test]
#[should_panic]
fn test_async_task_panic() {
    let task = AsyncTask::default();

    // First job: blows up.
    task.queue_hi(|_shelf| {
        panic!("Panic from queued job");
    });

    // Follow-up job, queued to ensure that the async thread gets joined.
    let (finished_tx, finished_rx) = channel();
    task.queue_hi(move |_shelf| {
        finished_tx.send(()).unwrap();
    });
    finished_rx.recv().unwrap();
}
143
/// Checks when a callback registered with `add_idle` runs.
///
/// The callback must stay silent while queued jobs are still pending, fire
/// shortly after the last one completes, not fire again while nothing new is
/// queued, and fire once more after a later job has completed.
#[test]
fn test_async_task_idle() {
    // How long each deliberately slow job sleeps.
    const BUSY: Duration = Duration::from_millis(500);
    // How long the idle callback gets to fire once the last job is done.
    const PROMPT: Duration = Duration::from_millis(50);

    let task = AsyncTask::new(Duration::from_secs(3));
    // A SyncSender is needed here because it is Send+Sync.
    let (idle_tx, idle_rx) = sync_channel::<()>(3);
    task.add_idle(move |_shelf| {
        idle_tx.send(()).unwrap();
    });

    // Queue a mix of slow low- and high-priority jobs.
    for _ in 0..3 {
        task.queue_lo(|_shelf| {
            std::thread::sleep(BUSY);
        });
        task.queue_hi(|_shelf| {
            std::thread::sleep(BUSY);
        });
    }
    // A final low-priority job signals that the backlog has drained.
    let (drained_tx, drained_rx) = channel();
    task.queue_lo(move |_shelf| {
        drained_tx.send(()).unwrap();
    });

    // While the backlog is being worked through, the idle callback stays silent.
    assert_eq!(idle_rx.recv_timeout(Duration::from_secs(1)), Err(RecvTimeoutError::Timeout));
    drained_rx.recv().unwrap();
    // With the last low-priority job done, the idle callback should fire
    // pretty much immediately.
    idle_rx.recv_timeout(PROMPT).unwrap();

    // It does not run a second time, even after waiting for a while.
    assert_eq!(idle_rx.recv_timeout(Duration::from_secs(3)), Err(RecvTimeoutError::Timeout));

    // More work gives the task another chance to go idle.
    let (second_tx, second_rx) = channel();
    task.queue_hi(move |_shelf| {
        std::thread::sleep(BUSY);
        second_tx.send(()).unwrap();
    });
    // The slow high-priority job is still running, so no idle callback yet.
    assert_eq!(idle_rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
    second_rx.recv().unwrap();
    idle_rx.recv_timeout(PROMPT).unwrap();
}
199
/// Checks that several idle callbacks all run, in the order they were added.
///
/// Three callbacks each send their index over a channel; after a slow initial
/// job has finished, the test expects to have received `0, 1, 2` in that order.
#[test]
fn test_async_task_multiple_idle() {
    // Short poll used both for the "nothing yet" check and for draining results.
    const POLL: Duration = Duration::from_millis(1);

    let task = AsyncTask::new(Duration::from_secs(3));
    let (idle_tx, idle_rx) = sync_channel::<i32>(5);

    // A slow high-priority job starts things off.
    task.queue_hi(|_shelf| {
        std::thread::sleep(Duration::from_millis(500));
    });

    // Register the idle callbacks, each with its own clone of the sender.
    for index in 0..3 {
        let tx = idle_tx.clone();
        task.add_idle(move |_shelf| {
            tx.send(index).unwrap();
        });
    }

    // Nothing arrives immediately.
    assert_eq!(idle_rx.recv_timeout(POLL), Err(RecvTimeoutError::Timeout));
    // Give the initial job time to finish; the idle callbacks should have run by then.
    std::thread::sleep(Duration::from_secs(1));

    // Drain whatever arrived, stopping at the first receive that fails.
    let received: Vec<i32> = std::iter::from_fn(|| idle_rx.recv_timeout(POLL).ok()).collect();
    assert_eq!(received, [0, 1, 2]);
}
231
/// Checks that an idle callback which queues a new job gets called repeatedly.
///
/// The callback queues a short low-priority job and reports a counter kept on
/// the shelf. Nothing is reported until a first regular job is queued; after
/// that the test expects the counter values 0, 1, 2, 3 in succession.
#[test]
fn test_async_task_idle_queues_job() {
    let task = Arc::new(AsyncTask::new(Duration::from_secs(1)));
    let (idle_tx, idle_rx) = sync_channel::<i32>(100);

    // The idle callback queues a low-priority job and reports its call count.
    let requeue_handle = Arc::clone(&task);
    task.add_idle(move |shelf| {
        requeue_handle.queue_lo(|_shelf| {
            // Slow things down so the channel doesn't fill up.
            std::thread::sleep(Duration::from_millis(50));
        });
        let calls = shelf.get_mut::<i32>();
        idle_tx.send(*calls).unwrap();
        *calls += 1;
    });

    // Nothing is reported before any job has been queued.
    assert_eq!(idle_rx.recv_timeout(Duration::from_millis(1500)), Err(RecvTimeoutError::Timeout));

    // Queueing a normal job sets things in motion.
    task.queue_hi(|_shelf| {});
    assert_eq!(0, idle_rx.recv_timeout(Duration::from_millis(200)).unwrap());

    // The job queued by the idle callback completes, the task goes idle
    // again, and so the callback keeps being invoked.
    for expected in 1..=3 {
        assert_eq!(expected, idle_rx.recv_timeout(Duration::from_millis(100)).unwrap());
    }
}
265
/// Checks that a panic inside an idle callback surfaces as a panic in the test.
///
/// The test is marked `#[should_panic]`, so it passes only if it panics.
#[test]
#[should_panic]
fn test_async_task_idle_panic() {
    let task = AsyncTask::new(Duration::from_secs(1));
    let (idle_tx, idle_rx) = sync_channel::<()>(3);

    // Idle callback that announces itself and then panics.
    task.add_idle(move |_shelf| {
        idle_tx.send(()).unwrap();
        panic!("Panic from idle callback");
    });

    // An empty job triggers idleness and the ensuing panic.
    task.queue_hi(|_shelf| {});
    idle_rx.recv().unwrap();

    // Queue another job afterwards to ensure that the async thread gets
    // joined and the panic detected.
    let (finished_tx, finished_rx) = channel();
    task.queue_hi(move |_shelf| {
        finished_tx.send(()).unwrap();
    });
    finished_rx.recv().unwrap();
}
288