xref: /aosp_15_r20/external/pigweed/pw_grpc/test_pw_rpc_server.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <cstring>
16 #include <map>
17 #include <string>
18 #include <string_view>
19 #include <type_traits>
20 
21 #include "pw_allocator/libc_allocator.h"
22 #include "pw_assert/check.h"
23 #include "pw_bytes/byte_builder.h"
24 #include "pw_bytes/span.h"
25 #include "pw_checksum/crc32.h"
26 #include "pw_grpc/connection.h"
27 #include "pw_grpc/examples/echo/echo.rpc.pwpb.h"
28 #include "pw_grpc/grpc_channel_output.h"
29 #include "pw_grpc/pw_rpc_handler.h"
30 #include "pw_log/log.h"
31 #include "pw_result/result.h"
32 #include "pw_rpc/internal/hash.h"
33 #include "pw_rpc/internal/packet.h"
34 #include "pw_rpc_transport/service_registry.h"
35 #include "pw_span/span.h"
36 #include "pw_status/status.h"
37 #include "pw_status/try.h"
38 #include "pw_stream/socket_stream.h"
39 #include "pw_stream/stream.h"
40 #include "pw_string/string.h"
41 #include "pw_thread/test_thread_context.h"
42 #include "pw_thread/thread.h"
43 
44 using pw::grpc::StreamId;
45 
46 namespace {
47 static constexpr size_t kBufferSize = 512;
48 
49 class EchoService
50     : public ::grpc::examples::echo::pw_rpc::pwpb::Echo::Service<EchoService> {
51  public:
UnaryEcho(pw::ConstByteSpan request,pw::rpc::RawUnaryResponder & responder)52   void UnaryEcho(pw::ConstByteSpan request,
53                  pw::rpc::RawUnaryResponder& responder) {
54     auto message =
55         ::grpc::examples::echo::pwpb::EchoRequest::FindMessage(request);
56     if (!message.ok()) {
57       responder.Finish({}, message.status()).IgnoreError();
58     }
59 
60     if (message->size() < 100) {
61       PW_LOG_INFO("UnaryEcho %s", message->data());
62     } else {
63       PW_LOG_INFO("UnaryEcho (len=%zu)", message->size());
64     }
65 
66     quiet_ = message->compare("quiet") == 0;
67     last_unary_responder_ = std::move(responder);
68     if (quiet_) {
69       return;
70     }
71 
72     std::array<std::byte, kBufferSize> mem_writer_buffer_;
73     std::array<std::byte, kBufferSize> encoder_scratch_buffer_;
74     pw::stream::MemoryWriter writer(mem_writer_buffer_);
75     ::grpc::examples::echo::pwpb::EchoResponse::StreamEncoder encoder(
76         writer, encoder_scratch_buffer_);
77 
78     auto checksum = message->rfind("crc32:", 0) == 0;
79     if (checksum) {
80       uint32_t crc32 = pw::checksum::Crc32::Calculate(
81           pw::span(reinterpret_cast<const std::byte*>(message->data()),
82                    message->size()));
83       encoder.Write({.message = std::string_view(std::to_string(crc32))})
84           .IgnoreError();
85     } else {
86       encoder.Write({.message = *message}).IgnoreError();
87     }
88 
89     last_unary_responder_.Finish(writer.WrittenData(), pw::OkStatus())
90         .IgnoreError();
91   }
92 
ServerStreamingEcho(const::grpc::examples::echo::pwpb::EchoRequest::Message & request,ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message> & writer)93   void ServerStreamingEcho(
94       const ::grpc::examples::echo::pwpb::EchoRequest::Message& request,
95       ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>&
96           writer) {
97     PW_LOG_INFO("ServerStreamingEcho %s", request.message.c_str());
98     quiet_ = request.message.compare("quiet") == 0;
99     last_writer_ = std::move(writer);
100     if (quiet_) {
101       PW_LOG_INFO("not writing server streaming echo");
102       return;
103     }
104     for (size_t i = 0; i < 3; ++i) {
105       last_writer_.Write({.message = request.message}).IgnoreError();
106     }
107     last_writer_.Finish(pw::OkStatus()).IgnoreError();
108   }
109 
ClientStreamingEcho(ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,::grpc::examples::echo::pwpb::EchoResponse::Message> & reader)110   void ClientStreamingEcho(
111       ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
112                    ::grpc::examples::echo::pwpb::EchoResponse::Message>&
113           reader) {
114     PW_LOG_INFO("ClientStreamingEcho");
115     last_reader_ = std::move(reader);
116     last_reader_.set_on_next(
117         [this](
118             const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
119           quiet_ = request.message.compare("quiet") == 0;
120           PW_LOG_INFO("ClientStreaming message %s", request.message.c_str());
121         });
122 
123     last_reader_.set_on_completion_requested([this]() {
124       if (quiet_) {
125         return;
126       }
127       last_reader_.Finish({.message = "done"}).IgnoreError();
128     });
129   }
130 
BidirectionalStreamingEcho(ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,::grpc::examples::echo::pwpb::EchoResponse::Message> & reader_writer)131   void BidirectionalStreamingEcho(
132       ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
133                          ::grpc::examples::echo::pwpb::EchoResponse::Message>&
134           reader_writer) {
135     PW_LOG_INFO("BidirectionalStreamingEcho");
136     last_reader_writer_ = std::move(reader_writer);
137     last_reader_writer_.set_on_completion_requested([this]() {
138       if (quiet_) {
139         return;
140       }
141       last_reader_writer_.Finish(pw::OkStatus()).IgnoreError();
142     });
143     last_reader_writer_.set_on_next(
144         [this](
145             const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
146           PW_LOG_INFO("BidiStreaming message %s", request.message.c_str());
147           quiet_ = request.message.compare("quiet") == 0;
148           if (quiet_) {
149             return;
150           }
151           last_reader_writer_.Write({.message = request.message}).IgnoreError();
152         });
153   }
154 
155  private:
156   pw::rpc::RawUnaryResponder last_unary_responder_{};
157   ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>
158       last_writer_{};
159   ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
160                ::grpc::examples::echo::pwpb::EchoResponse::Message>
161       last_reader_{};
162   ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
163                      ::grpc::examples::echo::pwpb::EchoResponse::Message>
164       last_reader_writer_{};
165   bool quiet_ = false;
166 };
167 
168 constexpr uint32_t kTestChannelId = 1;
169 
170 }  // namespace
171 
main(int argc,char * argv[])172 int main(int argc, char* argv[]) {
173   std::vector<std::string> args(argv, argv + argc);
174   int port = 3400;
175   int num_connections = 1;
176 
177   if (args.size() > 1) {
178     if (args[1] == "--help") {
179       PW_LOG_INFO("Usage: [port=3400] [num_connections=1]");
180       PW_LOG_INFO(
181           "  num_connections positional arg sets how many socket connections "
182           "should be processed before exit");
183       exit(0);
184     }
185     port = stoi(args[1]);
186   }
187 
188   if (args.size() > 2) {
189     num_connections = stoi(args[2]);
190   }
191 
192   std::setbuf(stdout, nullptr);  // unbuffered stdout
193 
194   pw::stream::ServerSocket server;
195   pw::grpc::GrpcChannelOutput rpc_egress;
196   std::array<pw::rpc::Channel, 1> tx_channels(
197       {pw::rpc::Channel::Create<kTestChannelId>(&rpc_egress)});
198   pw::rpc::ServiceRegistry service_registry(tx_channels);
199 
200   EchoService echo_service;
201   service_registry.RegisterService(echo_service);
202 
203   pw::grpc::PwRpcHandler handler(kTestChannelId,
204                                  service_registry.client_server().server());
205   rpc_egress.set_callbacks(handler);
206 
207   PW_LOG_INFO("Main.Listen on port=%d", port);
208   if (auto status = server.Listen(port); !status.ok()) {
209     PW_LOG_ERROR("Main.Listen failed code=%d", status.code());
210     return 1;
211   }
212 
213   for (int i = 0; i < num_connections; ++i) {
214     PW_LOG_INFO("Main.Accept");
215     auto socket = server.Accept();
216     if (!socket.ok()) {
217       PW_LOG_ERROR("Main.Accept failed code=%d", socket.status().code());
218       return 1;
219     }
220 
221     PW_LOG_INFO("Main.Run");
222 
223     pw::allocator::LibCAllocator message_assembly_allocator;
224     pw::thread::test::TestThreadContext connection_thread_context;
225     pw::thread::test::TestThreadContext send_thread_context;
226     pw::grpc::ConnectionThread conn(
227         *socket,
228         send_thread_context.options(),
229         handler,
230         [&socket]() { socket->Close(); },
231         &message_assembly_allocator);
232     rpc_egress.set_connection(conn);
233 
234     pw::Thread conn_thread(connection_thread_context.options(), conn);
235     conn_thread.join();
236   }
237 
238   PW_LOG_INFO("Main.Run completed");
239   return 0;
240 }
241