1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::fs::File;
6 use std::io;
7 use std::io::BufRead;
8 use std::io::BufReader;
9 use std::io::Cursor;
10 use std::io::Read;
11 use std::io::Write;
12 use std::mem::size_of;
13 use std::os::unix::fs::FileExt;
14 use std::os::unix::io::AsRawFd;
15 use std::sync::Arc;
16
17 use base::Protection;
18
19 use crate::filesystem::FileSystem;
20 use crate::filesystem::ZeroCopyReader;
21 use crate::filesystem::ZeroCopyWriter;
22 use crate::server::Mapper;
23 use crate::server::Reader;
24 use crate::server::Server;
25 use crate::server::Writer;
26 use crate::sys;
27 use crate::Error;
28 use crate::Result;
29
/// Reading side of the `/dev/fuse` endpoint.
struct DevFuseReader {
    /// Buffered handle on `/dev/fuse`. The buffer has to be big enough to hold one complete
    /// FUSE read transaction.
    reader: BufReader<File>,
}
35
36 impl DevFuseReader {
new(reader: BufReader<File>) -> Self37 pub fn new(reader: BufReader<File>) -> Self {
38 DevFuseReader { reader }
39 }
40
drain(&mut self)41 fn drain(&mut self) {
42 self.reader.consume(self.reader.buffer().len());
43 }
44 }
45
46 impl Read for DevFuseReader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>47 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
48 self.reader.read(buf)
49 }
50 }
51
52 impl Reader for DevFuseReader {}
53
54 impl ZeroCopyReader for DevFuseReader {
read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>55 fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
56 let buf = self.reader.fill_buf()?;
57 let end = std::cmp::min(count, buf.len());
58 let written = f.write_at(&buf[..end], off)?;
59 self.reader.consume(written);
60 Ok(written)
61 }
62 }
63
/// Writing side of the `/dev/fuse` endpoint.
struct DevFuseWriter {
    /// Handle on `/dev/fuse` that replies are written to.
    dev_fuse: File,

    /// Staging buffer that lets the header and the data be produced out of order and then
    /// flushed together. The surrounding cursor tracks the current write position.
    write_buf: Cursor<Vec<u8>>,
}
72
73 impl DevFuseWriter {
new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self74 pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
75 debug_assert_eq!(write_buf.position(), 0);
76
77 DevFuseWriter {
78 dev_fuse,
79 write_buf,
80 }
81 }
82 }
83
84 impl Write for DevFuseWriter {
write(&mut self, buf: &[u8]) -> io::Result<usize>85 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
86 self.write_buf.write(buf)
87 }
88
flush(&mut self) -> io::Result<()>89 fn flush(&mut self) -> io::Result<()> {
90 self.dev_fuse.write_all(&self.write_buf.get_ref()[..])?;
91 self.write_buf.set_position(0);
92 self.write_buf.get_mut().clear();
93 Ok(())
94 }
95 }
96
97 impl Writer for DevFuseWriter {
98 type ClosureWriter = Self;
99
write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize> where F: Fn(&mut Self) -> io::Result<usize>,100 fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
101 where
102 F: Fn(&mut Self) -> io::Result<usize>,
103 {
104 // Restore the cursor for idempotent.
105 let original = self.write_buf.position();
106 self.write_buf.set_position(offset as u64);
107 let r = f(self);
108 self.write_buf.set_position(original);
109 r
110 }
111
has_sufficient_buffer(&self, size: u32) -> bool112 fn has_sufficient_buffer(&self, size: u32) -> bool {
113 (self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
114 }
115 }
116
117 impl ZeroCopyWriter for DevFuseWriter {
write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>118 fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
119 let pos = self.write_buf.position() as usize;
120 let end = pos + count;
121 let buf = self.write_buf.get_mut();
122
123 let old_end = buf.len();
124 buf.resize(end, 0);
125 let read = f.read_at(&mut buf[pos..end], off)?;
126
127 let new_end = pos + read;
128 debug_assert!(new_end >= old_end);
129 buf.truncate(new_end);
130 self.write_buf.set_position(new_end as u64);
131 Ok(read)
132 }
133 }
134
/// Stateless `Mapper` used by the `/dev/fuse` worker.
struct DevFuseMapper;
136
137 impl DevFuseMapper {
new() -> Self138 fn new() -> Self {
139 Self {}
140 }
141 }
142
143 impl Mapper for DevFuseMapper {
map( &self, _mem_offset: u64, _size: usize, _fd: &dyn AsRawFd, _file_offset: u64, _prot: Protection, ) -> io::Result<()>144 fn map(
145 &self,
146 _mem_offset: u64,
147 _size: usize,
148 _fd: &dyn AsRawFd,
149 _file_offset: u64,
150 _prot: Protection,
151 ) -> io::Result<()> {
152 Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
153 }
154
unmap(&self, _offset: u64, _size: u64) -> io::Result<()>155 fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
156 Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
157 }
158 }
159
160 /// Start the FUSE message handling loop. Returns when an error happens.
161 ///
162 /// # Arguments
163 ///
164 /// * `dev_fuse` - A `File` object of /dev/fuse
165 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
166 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse. Must be large
167 /// enough (usually equal) to `n` in `MountOption::MaxRead(n)`.
168 ///
169 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, fs: F, ) -> Result<()>170 pub fn start_message_loop<F: FileSystem + Sync>(
171 dev_fuse: File,
172 input_buffer_size: u32,
173 output_buffer_size: u32,
174 fs: F,
175 ) -> Result<()> {
176 let server = Server::new(fs);
177 do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
178 }
179
do_start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, server: &Server<F>, ) -> Result<()>180 fn do_start_message_loop<F: FileSystem + Sync>(
181 dev_fuse: File,
182 input_buffer_size: u32,
183 output_buffer_size: u32,
184 server: &Server<F>,
185 ) -> Result<()> {
186 let mut dev_fuse_reader = {
187 let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
188 let buf_reader = BufReader::with_capacity(
189 input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
190 rfile,
191 );
192 DevFuseReader::new(buf_reader)
193 };
194 let mut dev_fuse_writer = {
195 let wfile = dev_fuse;
196 let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
197 DevFuseWriter::new(wfile, write_buf)
198 };
199 let dev_fuse_mapper = DevFuseMapper::new();
200 loop {
201 server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
202
203 // Since we're reusing the buffer to avoid repeated allocation, drain the possible
204 // residual from the buffer.
205 dev_fuse_reader.drain();
206 }
207 }
208
209 // TODO: Remove worker and this namespace from public
210 pub mod internal {
211 use crossbeam_utils::thread;
212
213 use super::*;
214
215 /// Start the FUSE message handling loops in multiple threads. Returns when an error happens.
216 ///
217 /// # Arguments
218 ///
219 /// * `dev_fuse` - A `File` object of /dev/fuse
220 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
221 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse.
222 ///
223 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop_mt<F: FileSystem + Sync + Send>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, thread_numbers: usize, fs: F, ) -> Result<()>224 pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
225 dev_fuse: File,
226 input_buffer_size: u32,
227 output_buffer_size: u32,
228 thread_numbers: usize,
229 fs: F,
230 ) -> Result<()> {
231 let result = thread::scope(|s| {
232 let server = Arc::new(Server::new(fs));
233 for _ in 0..thread_numbers {
234 let dev_fuse = dev_fuse
235 .try_clone()
236 .map_err(Error::EndpointSetup)
237 .expect("Failed to clone /dev/fuse FD");
238 let server = server.clone();
239 s.spawn(move |_| {
240 do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
241 });
242 }
243 });
244
245 unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
246 }
247 }
248