xref: /aosp_15_r20/external/pigweed/pw_transfer/integration_test/server.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 // Simple RPC server with the transfer service registered. Reads HDLC frames
16 // with RPC packets through a socket. This server has a single resource ID that
17 // is available, and data must be written to the server before data can be read
18 // from the resource ID.
19 //
20 // Usage:
21 //
22 //   integration_test_server 3300 <<< "resource_id: 12 file: '/tmp/gotbytes'"
23 
24 #include <sys/socket.h>
25 
26 #include <chrono>
27 #include <cstddef>
28 #include <cstdlib>
29 #include <deque>
30 #include <map>
31 #include <memory>
32 #include <string>
33 #include <thread>
34 #include <utility>
35 #include <variant>
36 #include <vector>
37 
38 #include "google/protobuf/text_format.h"
39 #include "pw_assert/check.h"
40 #include "pw_chrono/system_clock.h"
41 #include "pw_log/log.h"
42 #include "pw_rpc_system_server/rpc_server.h"
43 #include "pw_rpc_system_server/socket.h"
44 #include "pw_stream/std_file_stream.h"
45 #include "pw_thread/thread.h"
46 #include "pw_thread_stl/options.h"
47 #include "pw_transfer/integration_test/config.pb.h"
48 #include "pw_transfer/transfer.h"
49 
50 namespace pw::transfer {
51 namespace {
52 
53 using stream::MemoryReader;
54 using stream::MemoryWriter;
55 
56 // This is the maximum size of the socket send buffers. Ideally, this is set
57 // to the lowest allowed value to minimize buffering between the proxy and
58 // clients so rate limiting causes the client to block and wait for the
59 // integration test proxy to drain rather than allowing OS buffers to backlog
60 // large quantities of data.
61 //
62 // Note that the OS may chose to not strictly follow this requested buffer size.
63 // Still, setting this value to be as small as possible does reduce bufer sizes
64 // significantly enough to better reflect typical inter-device communication.
65 //
66 // For this to be effective, servers should also configure their sockets to a
67 // smaller receive buffer size.
68 constexpr int kMaxSocketSendBufferSize = 1;
69 
70 class FileTransferHandler final : public ReadWriteHandler {
71  public:
FileTransferHandler(uint32_t resource_id,std::deque<std::string> && sources,std::deque<std::string> && destinations,std::string default_source_path,std::string default_destination_path,bool offsettable)72   FileTransferHandler(uint32_t resource_id,
73                       std::deque<std::string>&& sources,
74                       std::deque<std::string>&& destinations,
75                       std::string default_source_path,
76                       std::string default_destination_path,
77                       bool offsettable)
78       : ReadWriteHandler(resource_id),
79         sources_(sources),
80         destinations_(destinations),
81         default_source_path_(default_source_path),
82         default_destination_path_(default_destination_path),
83         offsettable(offsettable) {}
84 
85   ~FileTransferHandler() = default;
86 
PrepareRead()87   Status PrepareRead() final {
88     if (sources_.empty() && default_source_path_.length() == 0) {
89       PW_LOG_ERROR("Source paths exhausted");
90       return Status::ResourceExhausted();
91     }
92 
93     std::string path;
94     if (!sources_.empty()) {
95       path = sources_.front();
96       sources_.pop_front();
97     } else {
98       path = default_source_path_;
99     }
100 
101     PW_LOG_DEBUG("Preparing read for file %s", path.c_str());
102     set_reader(stream_.emplace<stream::StdFileReader>(path.c_str()));
103     return OkStatus();
104   }
105 
PrepareRead(uint32_t offset)106   Status PrepareRead(uint32_t offset) final {
107     if (!offsettable) {
108       return Status::Unimplemented();
109     }
110 
111     if (Status status = PrepareRead(); !status.ok()) {
112       return status;
113     }
114 
115     if (offset >
116         std::get<stream::StdFileReader>(stream_).ConservativeReadLimit()) {
117       return Status::ResourceExhausted();
118     }
119 
120     return std::get<stream::StdFileReader>(stream_).Seek(offset);
121   }
122 
FinalizeRead(Status)123   void FinalizeRead(Status) final {
124     std::get<stream::StdFileReader>(stream_).Close();
125   }
126 
PrepareWrite()127   Status PrepareWrite() final {
128     if (destinations_.empty() && default_destination_path_.length() == 0) {
129       PW_LOG_ERROR("Destination paths exhausted");
130       return Status::ResourceExhausted();
131     }
132 
133     std::string path;
134     if (!destinations_.empty()) {
135       path = destinations_.front();
136       destinations_.pop_front();
137     } else {
138       path = default_destination_path_;
139     }
140 
141     PW_LOG_DEBUG("Preparing write for file %s", path.c_str());
142     set_writer(stream_.emplace<stream::StdFileWriter>(path.c_str()));
143     return OkStatus();
144   }
145 
PrepareWrite(uint32_t offset)146   Status PrepareWrite(uint32_t offset) final {
147     if (!offsettable) {
148       return Status::Unimplemented();
149     }
150 
151     if (Status status = PrepareWrite(); !status.ok()) {
152       return status;
153     }
154 
155     // It does not appear possible to hit this limit
156     if (offset >
157         std::get<stream::StdFileWriter>(stream_).ConservativeWriteLimit()) {
158       return Status::ResourceExhausted();
159     }
160 
161     return std::get<stream::StdFileWriter>(stream_).Seek(offset);
162   }
163 
FinalizeWrite(Status)164   Status FinalizeWrite(Status) final {
165     std::get<stream::StdFileWriter>(stream_).Close();
166     return OkStatus();
167   }
168 
169  private:
170   std::deque<std::string> sources_;
171   std::deque<std::string> destinations_;
172   std::string default_source_path_;
173   std::string default_destination_path_;
174   std::variant<std::monostate, stream::StdFileReader, stream::StdFileWriter>
175       stream_;
176   bool offsettable;
177 };
178 
RunServer(int socket_port,ServerConfig config)179 void RunServer(int socket_port, ServerConfig config) {
180   std::vector<std::byte> chunk_buffer(config.chunk_size_bytes());
181   std::vector<std::byte> encode_buffer(config.chunk_size_bytes());
182   transfer::Thread<4, 4> transfer_thread(chunk_buffer, encode_buffer);
183   TransferService transfer_service(
184       transfer_thread,
185       config.pending_bytes(),
186       std::chrono::seconds(config.chunk_timeout_seconds()),
187       config.transfer_service_retries(),
188       config.extend_window_divisor());
189 
190   rpc::system_server::set_socket_port(socket_port);
191 
192   rpc::system_server::Init();
193   rpc::system_server::Server().RegisterService(transfer_service);
194 
195   // Start transfer thread.
196   pw::Thread transfer_thread_handle(thread::stl::Options(), transfer_thread);
197 
198   int retval =
199       rpc::system_server::SetServerSockOpt(SOL_SOCKET,
200                                            SO_SNDBUF,
201                                            &kMaxSocketSendBufferSize,
202                                            sizeof(kMaxSocketSendBufferSize));
203   PW_CHECK_INT_EQ(retval,
204                   0,
205                   "Failed to configure socket send buffer size with errno=%d",
206                   errno);
207 
208   std::vector<std::unique_ptr<FileTransferHandler>> handlers;
209   for (const auto& resource : config.resources()) {
210     uint32_t id = resource.first;
211 
212     std::deque<std::string> source_paths(resource.second.source_paths().begin(),
213                                          resource.second.source_paths().end());
214     std::deque<std::string> destination_paths(
215         resource.second.destination_paths().begin(),
216         resource.second.destination_paths().end());
217 
218     auto handler = std::make_unique<FileTransferHandler>(
219         id,
220         std::move(source_paths),
221         std::move(destination_paths),
222         resource.second.default_source_path(),
223         resource.second.default_destination_path(),
224         resource.second.offsettable());
225 
226     transfer_service.RegisterHandler(*handler);
227     handlers.push_back(std::move(handler));
228   }
229 
230   PW_LOG_INFO("Starting pw_rpc server");
231   PW_CHECK_OK(rpc::system_server::Start());
232 
233   // Unregister transfer handler before cleaning up the thread since doing so
234   // requires the transfer thread to be running.
235   for (auto& handler : handlers) {
236     transfer_service.UnregisterHandler(*handler);
237   }
238 
239   // End transfer thread.
240   transfer_thread.Terminate();
241   transfer_thread_handle.join();
242 }
243 
244 }  // namespace
245 }  // namespace pw::transfer
246 
main(int argc,char * argv[])247 int main(int argc, char* argv[]) {
248   if (argc != 2) {
249     PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
250     return 1;
251   }
252 
253   int port = std::atoi(argv[1]);
254   PW_CHECK_UINT_GT(port, 0, "Invalid port!");
255 
256   std::string config_string;
257   std::string line;
258   while (std::getline(std::cin, line)) {
259     config_string = config_string + line + '\n';
260   }
261   pw::transfer::ServerConfig config;
262 
263   bool ok =
264       google::protobuf::TextFormat::ParseFromString(config_string, &config);
265   if (!ok) {
266     PW_LOG_INFO("Failed to parse config: %s", config_string.c_str());
267     PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
268     return 1;
269   } else {
270     PW_LOG_INFO("Server loaded config:\n%s", config.DebugString().c_str());
271   }
272 
273   pw::transfer::RunServer(port, config);
274   return 0;
275 }
276