xref: /aosp_15_r20/system/extras/profcollectd/libprofcollectd/scheduler.rs (revision 288bf5226967eb3dac5cce6c939ccc2a7f2b4fe5)
1 //
2 // Copyright (C) 2021 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 //! ProfCollect tracing scheduler.
18 
use std::fs;
use std::mem;
use std::path::Path;
use std::sync::mpsc::{sync_channel, RecvTimeoutError, SyncSender};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};

use crate::config::{get_sampling_period, Config, LOG_FILE, PROFILE_OUTPUT_DIR, TRACE_OUTPUT_DIR};
use crate::trace_provider::{self, TraceProvider};
use anyhow::{anyhow, ensure, Context, Result};
31 
32 pub struct Scheduler {
33     /// Signal to terminate the periodic collection worker thread, None if periodic collection is
34     /// not scheduled.
35     termination_ch: Option<SyncSender<()>>,
36     /// The preferred trace provider for the system.
37     trace_provider: Arc<Mutex<dyn TraceProvider + Send>>,
38     provider_ready_callbacks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send>>>>,
39 }
40 
41 impl Scheduler {
new() -> Result<Self>42     pub fn new() -> Result<Self> {
43         let p = trace_provider::get_trace_provider()?;
44         p.lock().map_err(|e| anyhow!(e.to_string()))?.set_log_file(&LOG_FILE);
45         Ok(Scheduler {
46             termination_ch: None,
47             trace_provider: p,
48             provider_ready_callbacks: Arc::new(Mutex::new(Vec::new())),
49         })
50     }
51 
is_scheduled(&self) -> bool52     fn is_scheduled(&self) -> bool {
53         self.termination_ch.is_some()
54     }
55 
schedule_periodic(&mut self, config: &Config) -> Result<()>56     pub fn schedule_periodic(&mut self, config: &Config) -> Result<()> {
57         ensure!(!self.is_scheduled(), "Already scheduled.");
58 
59         let (sender, receiver) = sync_channel(1);
60         self.termination_ch = Some(sender);
61 
62         // Clone config and trace_provider ARC for the worker thread.
63         let config = config.clone();
64         let trace_provider = self.trace_provider.clone();
65 
66         thread::spawn(move || {
67             loop {
68                 match receiver.recv_timeout(config.collection_interval) {
69                     Ok(_) => break,
70                     Err(_) => {
71                         // Did not receive a termination signal, initiate trace event.
72                         if check_space_limit(&TRACE_OUTPUT_DIR, &config).unwrap() {
73                             trace_provider.lock().unwrap().trace_system(
74                                 &TRACE_OUTPUT_DIR,
75                                 "periodic",
76                                 &get_sampling_period(),
77                                 &config.binary_filter,
78                             );
79                         }
80                     }
81                 }
82             }
83         });
84         Ok(())
85     }
86 
terminate_periodic(&mut self) -> Result<()>87     pub fn terminate_periodic(&mut self) -> Result<()> {
88         self.termination_ch
89             .as_ref()
90             .ok_or_else(|| anyhow!("Not scheduled"))?
91             .send(())
92             .context("Scheduler worker disappeared.")?;
93         self.termination_ch = None;
94         Ok(())
95     }
96 
trace_system(&self, config: &Config, tag: &str) -> Result<()>97     pub fn trace_system(&self, config: &Config, tag: &str) -> Result<()> {
98         let trace_provider = self.trace_provider.clone();
99         if check_space_limit(&TRACE_OUTPUT_DIR, config)? {
100             trace_provider.lock().unwrap().trace_system(
101                 &TRACE_OUTPUT_DIR,
102                 tag,
103                 &get_sampling_period(),
104                 &config.binary_filter,
105             );
106         }
107         Ok(())
108     }
109 
trace_process( &self, config: &Config, tag: &str, processes: &str, samplng_period: f32, ) -> Result<()>110     pub fn trace_process(
111         &self,
112         config: &Config,
113         tag: &str,
114         processes: &str,
115         samplng_period: f32,
116     ) -> Result<()> {
117         let trace_provider = self.trace_provider.clone();
118         let duration = match samplng_period {
119             0.0 => get_sampling_period(),
120             _ => Duration::from_millis(samplng_period as u64),
121         };
122         if check_space_limit(&TRACE_OUTPUT_DIR, config)? {
123             trace_provider.lock().unwrap().trace_process(
124                 &TRACE_OUTPUT_DIR,
125                 tag,
126                 &duration,
127                 processes,
128             );
129         }
130         Ok(())
131     }
132 
process(&self, config: &Config) -> Result<()>133     pub fn process(&self, config: &Config) -> Result<()> {
134         let trace_provider = self.trace_provider.clone();
135         trace_provider
136             .lock()
137             .unwrap()
138             .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR, &config.binary_filter)
139             .context("Failed to process profiles.")?;
140         Ok(())
141     }
142 
get_trace_provider_name(&self) -> &'static str143     pub fn get_trace_provider_name(&self) -> &'static str {
144         self.trace_provider.lock().unwrap().get_name()
145     }
146 
is_provider_ready(&self) -> bool147     pub fn is_provider_ready(&self) -> bool {
148         self.trace_provider.lock().unwrap().is_ready()
149     }
150 
register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>)151     pub fn register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>) {
152         let mut locked_callbacks = self.provider_ready_callbacks.lock().unwrap();
153         locked_callbacks.push(cb);
154         if locked_callbacks.len() == 1 {
155             self.start_thread_waiting_for_provider_ready();
156         }
157     }
158 
start_thread_waiting_for_provider_ready(&self)159     fn start_thread_waiting_for_provider_ready(&self) {
160         let provider = self.trace_provider.clone();
161         let callbacks = self.provider_ready_callbacks.clone();
162 
163         thread::spawn(move || {
164             let start_time = Instant::now();
165             loop {
166                 let elapsed = Instant::now().duration_since(start_time);
167                 if provider.lock().unwrap().is_ready() {
168                     break;
169                 }
170                 // Decide check period based on how long we have waited:
171                 // For the first 10s waiting, check every 100ms (likely to work on EVT devices).
172                 // For the first 10m waiting, check every 10s (likely to work on DVT devices).
173                 // For others, check every 10m.
174                 let sleep_duration = if elapsed < Duration::from_secs(10) {
175                     Duration::from_millis(100)
176                 } else if elapsed < Duration::from_secs(60 * 10) {
177                     Duration::from_secs(10)
178                 } else {
179                     Duration::from_secs(60 * 10)
180                 };
181                 thread::sleep(sleep_duration);
182             }
183 
184             let mut locked_callbacks = callbacks.lock().unwrap();
185             let v = mem::take(&mut *locked_callbacks);
186             for cb in v {
187                 cb();
188             }
189         });
190     }
191 
clear_trace_log(&self) -> Result<()>192     pub fn clear_trace_log(&self) -> Result<()> {
193         let provider = self.trace_provider.lock().map_err(|e| anyhow!(e.to_string()))?;
194         provider.reset_log_file();
195         let mut result = Ok(());
196         if LOG_FILE.exists() {
197             result = fs::remove_file(*LOG_FILE).map_err(|e| anyhow!(e));
198         }
199         provider.set_log_file(&LOG_FILE);
200         result
201     }
202 }
203 
204 /// Run if space usage is under limit.
check_space_limit(path: &Path, config: &Config) -> Result<bool>205 fn check_space_limit(path: &Path, config: &Config) -> Result<bool> {
206     // Returns the size of a directory, non-recursive.
207     let dir_size = |path| -> Result<u64> {
208         fs::read_dir(path)?.try_fold(0, |acc, file| {
209             let metadata = file?.metadata()?;
210             let size = if metadata.is_file() { metadata.len() } else { 0 };
211             Ok(acc + size)
212         })
213     };
214 
215     if dir_size(path)? > config.max_trace_limit_mb * 1024 * 1024 {
216         log::error!("trace storage exhausted.");
217         return Ok(false);
218     }
219     Ok(true)
220 }
221