xref: /aosp_15_r20/external/crosvm/fuse/src/worker.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::fs::File;
6 use std::io;
7 use std::io::BufRead;
8 use std::io::BufReader;
9 use std::io::Cursor;
10 use std::io::Read;
11 use std::io::Write;
12 use std::mem::size_of;
13 use std::os::unix::fs::FileExt;
14 use std::os::unix::io::AsRawFd;
15 use std::sync::Arc;
16 
17 use base::Protection;
18 
19 use crate::filesystem::FileSystem;
20 use crate::filesystem::ZeroCopyReader;
21 use crate::filesystem::ZeroCopyWriter;
22 use crate::server::Mapper;
23 use crate::server::Reader;
24 use crate::server::Server;
25 use crate::server::Writer;
26 use crate::sys;
27 use crate::Error;
28 use crate::Result;
29 
/// Read side of the `/dev/fuse` endpoint.
struct DevFuseReader {
    // Buffered handle on /dev/fuse. The buffer has to be big enough to hold one complete FUSE
    // read transaction.
    reader: BufReader<File>,
}

impl DevFuseReader {
    /// Wraps an already configured buffered reader over `/dev/fuse`.
    pub fn new(reader: BufReader<File>) -> Self {
        Self { reader }
    }

    /// Discards every byte that is currently sitting in the internal buffer.
    fn drain(&mut self) {
        let pending = self.reader.buffer().len();
        self.reader.consume(pending);
    }
}
45 
46 impl Read for DevFuseReader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>47     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
48         self.reader.read(buf)
49     }
50 }
51 
52 impl Reader for DevFuseReader {}
53 
54 impl ZeroCopyReader for DevFuseReader {
read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>55     fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
56         let buf = self.reader.fill_buf()?;
57         let end = std::cmp::min(count, buf.len());
58         let written = f.write_at(&buf[..end], off)?;
59         self.reader.consume(written);
60         Ok(written)
61     }
62 }
63 
/// Write side of the `/dev/fuse` endpoint.
struct DevFuseWriter {
    // Handle on /dev/fuse used for writing.
    dev_fuse: File,

    // Staging buffer. Header and payload may be produced out of order and are then sent in one
    // go. The surrounding cursor keeps track of the current write position.
    write_buf: Cursor<Vec<u8>>,
}

impl DevFuseWriter {
    /// Builds a writer from the `/dev/fuse` handle and a staging buffer.
    ///
    /// The cursor is expected to be at position 0; this is checked in debug builds only.
    pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
        debug_assert_eq!(write_buf.position(), 0);

        Self {
            dev_fuse,
            write_buf,
        }
    }
}
83 
84 impl Write for DevFuseWriter {
write(&mut self, buf: &[u8]) -> io::Result<usize>85     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
86         self.write_buf.write(buf)
87     }
88 
flush(&mut self) -> io::Result<()>89     fn flush(&mut self) -> io::Result<()> {
90         self.dev_fuse.write_all(&self.write_buf.get_ref()[..])?;
91         self.write_buf.set_position(0);
92         self.write_buf.get_mut().clear();
93         Ok(())
94     }
95 }
96 
97 impl Writer for DevFuseWriter {
98     type ClosureWriter = Self;
99 
write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize> where F: Fn(&mut Self) -> io::Result<usize>,100     fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
101     where
102         F: Fn(&mut Self) -> io::Result<usize>,
103     {
104         // Restore the cursor for idempotent.
105         let original = self.write_buf.position();
106         self.write_buf.set_position(offset as u64);
107         let r = f(self);
108         self.write_buf.set_position(original);
109         r
110     }
111 
has_sufficient_buffer(&self, size: u32) -> bool112     fn has_sufficient_buffer(&self, size: u32) -> bool {
113         (self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
114     }
115 }
116 
117 impl ZeroCopyWriter for DevFuseWriter {
write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>118     fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
119         let pos = self.write_buf.position() as usize;
120         let end = pos + count;
121         let buf = self.write_buf.get_mut();
122 
123         let old_end = buf.len();
124         buf.resize(end, 0);
125         let read = f.read_at(&mut buf[pos..end], off)?;
126 
127         let new_end = pos + read;
128         debug_assert!(new_end >= old_end);
129         buf.truncate(new_end);
130         self.write_buf.set_position(new_end as u64);
131         Ok(read)
132     }
133 }
134 
/// Stateless `Mapper` placeholder used by the `/dev/fuse` message loop.
struct DevFuseMapper;

impl DevFuseMapper {
    /// Creates the mapper; it carries no data.
    fn new() -> Self {
        DevFuseMapper
    }
}
142 
143 impl Mapper for DevFuseMapper {
map( &self, _mem_offset: u64, _size: usize, _fd: &dyn AsRawFd, _file_offset: u64, _prot: Protection, ) -> io::Result<()>144     fn map(
145         &self,
146         _mem_offset: u64,
147         _size: usize,
148         _fd: &dyn AsRawFd,
149         _file_offset: u64,
150         _prot: Protection,
151     ) -> io::Result<()> {
152         Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
153     }
154 
unmap(&self, _offset: u64, _size: u64) -> io::Result<()>155     fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
156         Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
157     }
158 }
159 
160 /// Start the FUSE message handling loop. Returns when an error happens.
161 ///
162 /// # Arguments
163 ///
164 /// * `dev_fuse` - A `File` object of /dev/fuse
165 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
166 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse. Must be large
167 ///   enough (usually equal) to `n` in `MountOption::MaxRead(n)`.
168 ///
169 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, fs: F, ) -> Result<()>170 pub fn start_message_loop<F: FileSystem + Sync>(
171     dev_fuse: File,
172     input_buffer_size: u32,
173     output_buffer_size: u32,
174     fs: F,
175 ) -> Result<()> {
176     let server = Server::new(fs);
177     do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
178 }
179 
do_start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, server: &Server<F>, ) -> Result<()>180 fn do_start_message_loop<F: FileSystem + Sync>(
181     dev_fuse: File,
182     input_buffer_size: u32,
183     output_buffer_size: u32,
184     server: &Server<F>,
185 ) -> Result<()> {
186     let mut dev_fuse_reader = {
187         let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
188         let buf_reader = BufReader::with_capacity(
189             input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
190             rfile,
191         );
192         DevFuseReader::new(buf_reader)
193     };
194     let mut dev_fuse_writer = {
195         let wfile = dev_fuse;
196         let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
197         DevFuseWriter::new(wfile, write_buf)
198     };
199     let dev_fuse_mapper = DevFuseMapper::new();
200     loop {
201         server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
202 
203         // Since we're reusing the buffer to avoid repeated allocation, drain the possible
204         // residual from the buffer.
205         dev_fuse_reader.drain();
206     }
207 }
208 
209 // TODO: Remove worker and this namespace from public
210 pub mod internal {
211     use crossbeam_utils::thread;
212 
213     use super::*;
214 
215     /// Start the FUSE message handling loops in multiple threads. Returns when an error happens.
216     ///
217     /// # Arguments
218     ///
219     /// * `dev_fuse` - A `File` object of /dev/fuse
220     /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
221     /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse.
222     ///
223     /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop_mt<F: FileSystem + Sync + Send>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, thread_numbers: usize, fs: F, ) -> Result<()>224     pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
225         dev_fuse: File,
226         input_buffer_size: u32,
227         output_buffer_size: u32,
228         thread_numbers: usize,
229         fs: F,
230     ) -> Result<()> {
231         let result = thread::scope(|s| {
232             let server = Arc::new(Server::new(fs));
233             for _ in 0..thread_numbers {
234                 let dev_fuse = dev_fuse
235                     .try_clone()
236                     .map_err(Error::EndpointSetup)
237                     .expect("Failed to clone /dev/fuse FD");
238                 let server = server.clone();
239                 s.spawn(move |_| {
240                     do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
241                 });
242             }
243         });
244 
245         unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
246     }
247 }
248