1 // Copyright 2024 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! Launcher of forwarder_guest
16
17 use anyhow::{anyhow, Context};
18 use clap::Parser;
19 use csv_async::AsyncReader;
20 use debian_service::debian_service_client::DebianServiceClient;
21 use debian_service::{QueueOpeningRequest, ReportVmActivePortsRequest};
22 use futures::stream::StreamExt;
23 use log::{debug, error};
24 use serde::Deserialize;
25 use std::collections::HashSet;
26 use std::process::Stdio;
27 use tokio::io::BufReader;
28 use tokio::process::Command;
29 use tokio::try_join;
30 use tonic::transport::{Channel, Endpoint};
31 use tonic::Request;
32
/// Client and message types generated by tonic from the
/// `com.android.virtualization.terminal.proto` protobuf package.
mod debian_service {
    tonic::include_proto!("com.android.virtualization.terminal.proto");
}
36
/// Lowest port number that `is_forwardable_port` accepts; anything below it is never reported.
// NOTE(review): `PREVILEGED` is a misspelling of `PRIVILEGED`; a rename has to touch every user.
const NON_PREVILEGED_PORT_RANGE_START: i32 = 1024;
/// Port that `is_forwardable_port` always rejects (presumably the port ttyd serves on — confirm).
const TTYD_PORT: i32 = 7681;
/// Value of the `IP` column for rows that are processed; rows with any other value are skipped.
const TCPSTATES_IP_4: i8 = 4;
/// `NEWSTATE` value that removes the row's local port from the set of listening ports.
const TCPSTATES_STATE_CLOSE: &str = "CLOSE";
/// `NEWSTATE` value that adds the row's local port to the set of listening ports.
const TCPSTATES_STATE_LISTEN: &str = "LISTEN";
42
/// One CSV record read from the `tcpstates` output.
///
/// `rename_all = "UPPERCASE"` maps each field to the upper-cased CSV column of the same name
/// (`IP`, `LPORT`, `RPORT`, `NEWSTATE`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
struct TcpStateRow {
    /// IP version column; compared against `TCPSTATES_IP_4`.
    ip: i8,
    /// Local port of the socket the row describes.
    lport: i32,
    /// Remote port of the socket; rows with a value greater than zero are ignored.
    rport: i32,
    /// State the socket moved to; only `LISTEN` and `CLOSE` are acted upon.
    newstate: String,
}
51
52 #[derive(Parser)]
53 /// Flags for running command
54 pub struct Args {
55 /// Host IP address
56 #[arg(long)]
57 #[arg(alias = "host")]
58 host_addr: String,
59 /// grpc port number
60 #[arg(long)]
61 #[arg(alias = "grpc_port")]
62 grpc_port: String,
63 }
64
process_forwarding_request_queue( mut client: DebianServiceClient<Channel>, ) -> Result<(), Box<dyn std::error::Error>>65 async fn process_forwarding_request_queue(
66 mut client: DebianServiceClient<Channel>,
67 ) -> Result<(), Box<dyn std::error::Error>> {
68 let cid = vsock::get_local_cid().context("Failed to get CID of VM")?;
69 let mut res_stream = client
70 .open_forwarding_request_queue(Request::new(QueueOpeningRequest { cid: cid as i32 }))
71 .await?
72 .into_inner();
73
74 while let Some(response) = res_stream.message().await? {
75 let tcp_port = i16::try_from(response.guest_tcp_port)
76 .context("Failed to convert guest_tcp_port as i16")?;
77 let vsock_port = response.vsock_port as u32;
78
79 debug!(
80 "executing forwarder_guest with guest_tcp_port: {:?}, vsock_port: {:?}",
81 &tcp_port, &vsock_port
82 );
83
84 let _ = Command::new("forwarder_guest")
85 .arg("--local")
86 .arg(format!("127.0.0.1:{}", tcp_port))
87 .arg("--remote")
88 .arg(format!("vsock:2:{}", vsock_port))
89 .spawn();
90 }
91 Err(anyhow!("process_forwarding_request_queue is terminated").into())
92 }
93
send_active_ports_report( listening_ports: HashSet<i32>, client: &mut DebianServiceClient<Channel>, ) -> Result<(), Box<dyn std::error::Error>>94 async fn send_active_ports_report(
95 listening_ports: HashSet<i32>,
96 client: &mut DebianServiceClient<Channel>,
97 ) -> Result<(), Box<dyn std::error::Error>> {
98 let res = client
99 .report_vm_active_ports(Request::new(ReportVmActivePortsRequest {
100 ports: listening_ports.into_iter().collect(),
101 }))
102 .await?
103 .into_inner();
104 if res.success {
105 debug!("Successfully reported active ports to the host");
106 } else {
107 error!("Failure response received from the host for reporting active ports");
108 }
109 Ok(())
110 }
111
is_forwardable_port(port: i32) -> bool112 fn is_forwardable_port(port: i32) -> bool {
113 port >= NON_PREVILEGED_PORT_RANGE_START && port != TTYD_PORT
114 }
115
report_active_ports( mut client: DebianServiceClient<Channel>, ) -> Result<(), Box<dyn std::error::Error>>116 async fn report_active_ports(
117 mut client: DebianServiceClient<Channel>,
118 ) -> Result<(), Box<dyn std::error::Error>> {
119 // TODO: we can remove python3 -u when https://github.com/iovisor/bcc/pull/5142 is deployed
120 let mut cmd = Command::new("python3")
121 .arg("-u")
122 .arg("/usr/sbin/tcpstates-bpfcc")
123 .arg("-s")
124 .stdout(Stdio::piped())
125 .spawn()?;
126 let stdout = cmd.stdout.take().context("Failed to get stdout of tcpstates")?;
127 let mut csv_reader = AsyncReader::from_reader(BufReader::new(stdout));
128 let header = csv_reader.headers().await?.clone();
129
130 // TODO(b/340126051): Consider using NETLINK_SOCK_DIAG for the optimization.
131 let listeners = listeners::get_all()?;
132 // TODO(b/340126051): Support distinguished port forwarding for ipv6 as well.
133 let mut listening_ports: HashSet<_> = listeners
134 .iter()
135 .map(|x| x.socket)
136 .filter(|x| x.is_ipv4())
137 .map(|x| x.port().into())
138 .filter(|x| is_forwardable_port(*x))
139 .collect();
140 send_active_ports_report(listening_ports.clone(), &mut client).await?;
141
142 let mut records = csv_reader.records();
143 while let Some(record) = records.next().await {
144 let row: TcpStateRow = record?.deserialize(Some(&header))?;
145 if row.ip != TCPSTATES_IP_4 {
146 continue;
147 }
148 if !is_forwardable_port(row.lport) {
149 continue;
150 }
151 if row.rport > 0 {
152 continue;
153 }
154 match row.newstate.as_str() {
155 TCPSTATES_STATE_LISTEN => {
156 listening_ports.insert(row.lport);
157 }
158 TCPSTATES_STATE_CLOSE => {
159 listening_ports.remove(&row.lport);
160 }
161 _ => continue,
162 }
163 send_active_ports_report(listening_ports.clone(), &mut client).await?;
164 }
165
166 Err(anyhow!("report_active_ports is terminated").into())
167 }
168
169 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>170 async fn main() -> Result<(), Box<dyn std::error::Error>> {
171 env_logger::init();
172 debug!("Starting forwarder_guest_launcher");
173 let args = Args::parse();
174 let addr = format!("https://{}:{}", args.host_addr, args.grpc_port);
175 let channel = Endpoint::from_shared(addr)?.connect().await?;
176 let client = DebianServiceClient::new(channel);
177
178 try_join!(process_forwarding_request_queue(client.clone()), report_active_ports(client))?;
179 Ok(())
180 }
181