1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include <cstring>
16 #include <map>
17 #include <string>
18 #include <string_view>
19 #include <type_traits>
20
21 #include "pw_allocator/libc_allocator.h"
22 #include "pw_assert/check.h"
23 #include "pw_bytes/byte_builder.h"
24 #include "pw_bytes/span.h"
25 #include "pw_checksum/crc32.h"
26 #include "pw_grpc/connection.h"
27 #include "pw_grpc/examples/echo/echo.rpc.pwpb.h"
28 #include "pw_grpc/grpc_channel_output.h"
29 #include "pw_grpc/pw_rpc_handler.h"
30 #include "pw_log/log.h"
31 #include "pw_result/result.h"
32 #include "pw_rpc/internal/hash.h"
33 #include "pw_rpc/internal/packet.h"
34 #include "pw_rpc_transport/service_registry.h"
35 #include "pw_span/span.h"
36 #include "pw_status/status.h"
37 #include "pw_status/try.h"
38 #include "pw_stream/socket_stream.h"
39 #include "pw_stream/stream.h"
40 #include "pw_string/string.h"
41 #include "pw_thread/test_thread_context.h"
42 #include "pw_thread/thread.h"
43
44 using pw::grpc::StreamId;
45
46 namespace {
47 static constexpr size_t kBufferSize = 512;
48
49 class EchoService
50 : public ::grpc::examples::echo::pw_rpc::pwpb::Echo::Service<EchoService> {
51 public:
UnaryEcho(pw::ConstByteSpan request,pw::rpc::RawUnaryResponder & responder)52 void UnaryEcho(pw::ConstByteSpan request,
53 pw::rpc::RawUnaryResponder& responder) {
54 auto message =
55 ::grpc::examples::echo::pwpb::EchoRequest::FindMessage(request);
56 if (!message.ok()) {
57 responder.Finish({}, message.status()).IgnoreError();
58 }
59
60 if (message->size() < 100) {
61 PW_LOG_INFO("UnaryEcho %s", message->data());
62 } else {
63 PW_LOG_INFO("UnaryEcho (len=%zu)", message->size());
64 }
65
66 quiet_ = message->compare("quiet") == 0;
67 last_unary_responder_ = std::move(responder);
68 if (quiet_) {
69 return;
70 }
71
72 std::array<std::byte, kBufferSize> mem_writer_buffer_;
73 std::array<std::byte, kBufferSize> encoder_scratch_buffer_;
74 pw::stream::MemoryWriter writer(mem_writer_buffer_);
75 ::grpc::examples::echo::pwpb::EchoResponse::StreamEncoder encoder(
76 writer, encoder_scratch_buffer_);
77
78 auto checksum = message->rfind("crc32:", 0) == 0;
79 if (checksum) {
80 uint32_t crc32 = pw::checksum::Crc32::Calculate(
81 pw::span(reinterpret_cast<const std::byte*>(message->data()),
82 message->size()));
83 encoder.Write({.message = std::string_view(std::to_string(crc32))})
84 .IgnoreError();
85 } else {
86 encoder.Write({.message = *message}).IgnoreError();
87 }
88
89 last_unary_responder_.Finish(writer.WrittenData(), pw::OkStatus())
90 .IgnoreError();
91 }
92
ServerStreamingEcho(const::grpc::examples::echo::pwpb::EchoRequest::Message & request,ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message> & writer)93 void ServerStreamingEcho(
94 const ::grpc::examples::echo::pwpb::EchoRequest::Message& request,
95 ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>&
96 writer) {
97 PW_LOG_INFO("ServerStreamingEcho %s", request.message.c_str());
98 quiet_ = request.message.compare("quiet") == 0;
99 last_writer_ = std::move(writer);
100 if (quiet_) {
101 PW_LOG_INFO("not writing server streaming echo");
102 return;
103 }
104 for (size_t i = 0; i < 3; ++i) {
105 last_writer_.Write({.message = request.message}).IgnoreError();
106 }
107 last_writer_.Finish(pw::OkStatus()).IgnoreError();
108 }
109
ClientStreamingEcho(ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,::grpc::examples::echo::pwpb::EchoResponse::Message> & reader)110 void ClientStreamingEcho(
111 ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
112 ::grpc::examples::echo::pwpb::EchoResponse::Message>&
113 reader) {
114 PW_LOG_INFO("ClientStreamingEcho");
115 last_reader_ = std::move(reader);
116 last_reader_.set_on_next(
117 [this](
118 const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
119 quiet_ = request.message.compare("quiet") == 0;
120 PW_LOG_INFO("ClientStreaming message %s", request.message.c_str());
121 });
122
123 last_reader_.set_on_completion_requested([this]() {
124 if (quiet_) {
125 return;
126 }
127 last_reader_.Finish({.message = "done"}).IgnoreError();
128 });
129 }
130
BidirectionalStreamingEcho(ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,::grpc::examples::echo::pwpb::EchoResponse::Message> & reader_writer)131 void BidirectionalStreamingEcho(
132 ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
133 ::grpc::examples::echo::pwpb::EchoResponse::Message>&
134 reader_writer) {
135 PW_LOG_INFO("BidirectionalStreamingEcho");
136 last_reader_writer_ = std::move(reader_writer);
137 last_reader_writer_.set_on_completion_requested([this]() {
138 if (quiet_) {
139 return;
140 }
141 last_reader_writer_.Finish(pw::OkStatus()).IgnoreError();
142 });
143 last_reader_writer_.set_on_next(
144 [this](
145 const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
146 PW_LOG_INFO("BidiStreaming message %s", request.message.c_str());
147 quiet_ = request.message.compare("quiet") == 0;
148 if (quiet_) {
149 return;
150 }
151 last_reader_writer_.Write({.message = request.message}).IgnoreError();
152 });
153 }
154
155 private:
156 pw::rpc::RawUnaryResponder last_unary_responder_{};
157 ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>
158 last_writer_{};
159 ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
160 ::grpc::examples::echo::pwpb::EchoResponse::Message>
161 last_reader_{};
162 ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
163 ::grpc::examples::echo::pwpb::EchoResponse::Message>
164 last_reader_writer_{};
165 bool quiet_ = false;
166 };
167
168 constexpr uint32_t kTestChannelId = 1;
169
170 } // namespace
171
main(int argc,char * argv[])172 int main(int argc, char* argv[]) {
173 std::vector<std::string> args(argv, argv + argc);
174 int port = 3400;
175 int num_connections = 1;
176
177 if (args.size() > 1) {
178 if (args[1] == "--help") {
179 PW_LOG_INFO("Usage: [port=3400] [num_connections=1]");
180 PW_LOG_INFO(
181 " num_connections positional arg sets how many socket connections "
182 "should be processed before exit");
183 exit(0);
184 }
185 port = stoi(args[1]);
186 }
187
188 if (args.size() > 2) {
189 num_connections = stoi(args[2]);
190 }
191
192 std::setbuf(stdout, nullptr); // unbuffered stdout
193
194 pw::stream::ServerSocket server;
195 pw::grpc::GrpcChannelOutput rpc_egress;
196 std::array<pw::rpc::Channel, 1> tx_channels(
197 {pw::rpc::Channel::Create<kTestChannelId>(&rpc_egress)});
198 pw::rpc::ServiceRegistry service_registry(tx_channels);
199
200 EchoService echo_service;
201 service_registry.RegisterService(echo_service);
202
203 pw::grpc::PwRpcHandler handler(kTestChannelId,
204 service_registry.client_server().server());
205 rpc_egress.set_callbacks(handler);
206
207 PW_LOG_INFO("Main.Listen on port=%d", port);
208 if (auto status = server.Listen(port); !status.ok()) {
209 PW_LOG_ERROR("Main.Listen failed code=%d", status.code());
210 return 1;
211 }
212
213 for (int i = 0; i < num_connections; ++i) {
214 PW_LOG_INFO("Main.Accept");
215 auto socket = server.Accept();
216 if (!socket.ok()) {
217 PW_LOG_ERROR("Main.Accept failed code=%d", socket.status().code());
218 return 1;
219 }
220
221 PW_LOG_INFO("Main.Run");
222
223 pw::allocator::LibCAllocator message_assembly_allocator;
224 pw::thread::test::TestThreadContext connection_thread_context;
225 pw::thread::test::TestThreadContext send_thread_context;
226 pw::grpc::ConnectionThread conn(
227 *socket,
228 send_thread_context.options(),
229 handler,
230 [&socket]() { socket->Close(); },
231 &message_assembly_allocator);
232 rpc_egress.set_connection(conn);
233
234 pw::Thread conn_thread(connection_thread_context.options(), conn);
235 conn_thread.join();
236 }
237
238 PW_LOG_INFO("Main.Run completed");
239 return 0;
240 }
241