xref: /aosp_15_r20/external/pigweed/pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 // This file defines the ServerReaderWriter, ServerReader, ServerWriter, and
16 // UnaryResponder classes for the pw_protobuf RPC interface. These classes are
17 // used for bidirectional, client, and server streaming, and unary RPCs.
18 #pragma once
19 
20 #include "pw_bytes/span.h"
21 #include "pw_function/function.h"
22 #include "pw_rpc/channel.h"
23 #include "pw_rpc/internal/lock.h"
24 #include "pw_rpc/internal/method_info.h"
25 #include "pw_rpc/internal/method_lookup.h"
26 #include "pw_rpc/internal/server_call.h"
27 #include "pw_rpc/method_type.h"
28 #include "pw_rpc/pwpb/internal/common.h"
29 #include "pw_rpc/server.h"
30 
31 namespace pw::rpc {
32 namespace internal {
33 
// The reader/writer classes declared later in this header name the two types
// below in friend declarations, so they are forward-declared here.
class PwpbMethod;

namespace test {

template <typename, typename, uint32_t>
class InvocationContext;

}  // namespace test
41 
42 // internal::PwpbServerCall extends internal::ServerCall by adding a method
43 // serializer/deserializer that is initialized based on the method context.
44 class PwpbServerCall : public ServerCall {
45  public:
46   // Allow construction using a call context and method type which creates
47   // a working server call.
48   PwpbServerCall(const LockedCallContext& context, MethodType type)
49       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
50 
~PwpbServerCall()51   ~PwpbServerCall() { DestroyServerCall(); }
52 
53   // Sends a unary response.
54   // Returns the following Status codes:
55   //
56   //   OK - the response was successfully sent
57   //   FAILED_PRECONDITION - the writer is closed
58   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf protobuf
59   //   other errors - the ChannelOutput failed to send the packet; the error
60   //       codes are determined by the ChannelOutput implementation
61   //
62   template <typename Response>
63   Status SendUnaryResponse(const Response& response, Status status = OkStatus())
PW_LOCKS_EXCLUDED(rpc_lock ())64       PW_LOCKS_EXCLUDED(rpc_lock()) {
65     RpcLockGuard lock;
66     if (!active_locked()) {
67       return Status::FailedPrecondition();
68     }
69 
70     Result<ByteSpan> buffer =
71         EncodeToPayloadBuffer(response, serde_->response());
72     if (!buffer.ok()) {
73       return CloseAndSendServerErrorLocked(Status::Internal());
74     }
75 
76     return CloseAndSendResponseLocked(*buffer, status);
77   }
78 
79   template <typename Response>
80   Status TrySendUnaryResponse(const Response& response,
81                               Status status = OkStatus())
PW_LOCKS_EXCLUDED(rpc_lock ())82       PW_LOCKS_EXCLUDED(rpc_lock()) {
83     RpcLockGuard lock;
84     if (!active_locked()) {
85       return Status::FailedPrecondition();
86     }
87 
88     Result<ByteSpan> buffer =
89         EncodeToPayloadBuffer(response, serde_->response());
90     if (!buffer.ok()) {
91       return TryCloseAndSendServerErrorLocked(Status::Internal());
92     }
93 
94     return TryCloseAndSendResponseLocked(*buffer, status);
95   }
96 
97  protected:
98   // Derived classes allow default construction so that users can declare a
99   // variable into which to move server reader/writers from RPC calls.
PwpbServerCall()100   constexpr PwpbServerCall() : serde_(nullptr) {}
101 
102   // Give access to the serializer/deserializer object for converting requests
103   // and responses between the wire format and pw_protobuf structs.
serde()104   const PwpbMethodSerde& serde() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
105     return *serde_;
106   }
107 
108   // Allow derived classes to be constructed moving another instance.
PW_LOCKS_EXCLUDED(rpc_lock ())109   PwpbServerCall(PwpbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) {
110     *this = std::move(other);
111   }
112 
113   // Allow derived classes to use move assignment from another instance.
114   PwpbServerCall& operator=(PwpbServerCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())115       PW_LOCKS_EXCLUDED(rpc_lock()) {
116     RpcLockGuard lock;
117     MovePwpbServerCallFrom(other);
118     return *this;
119   }
120 
121   // Implement moving by copying the serde pointer.
MovePwpbServerCallFrom(PwpbServerCall & other)122   void MovePwpbServerCallFrom(PwpbServerCall& other)
123       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
124     MoveServerCallFrom(other);
125     serde_ = other.serde_;
126   }
127 
128   // Sends a streamed response.
129   // Returns the following Status codes:
130   //
131   //   OK - the response was successfully sent
132   //   FAILED_PRECONDITION - the writer is closed
133   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf protobuf
134   //   other errors - the ChannelOutput failed to send the packet; the error
135   //       codes are determined by the ChannelOutput implementation
136   //
137   template <typename Response>
SendStreamResponse(const Response & response)138   Status SendStreamResponse(const Response& response)
139       PW_LOCKS_EXCLUDED(rpc_lock()) {
140     RpcLockGuard lock;
141     return PwpbSendStream(*this, response, serde_);
142   }
143 
144  private:
145   const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
146 };
147 
148 // internal::BasePwpbServerReader extends internal::PwpbServerCall further by
149 // adding an on_next callback templated on the request type.
150 template <typename Request>
151 class BasePwpbServerReader : public PwpbServerCall {
152  public:
BasePwpbServerReader(const LockedCallContext & context,MethodType type)153   BasePwpbServerReader(const LockedCallContext& context, MethodType type)
154       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
155       : PwpbServerCall(context, type) {}
156 
~BasePwpbServerReader()157   ~BasePwpbServerReader() { DestroyServerCall(); }
158 
159  protected:
160   // Allow default construction so that users can declare a variable into
161   // which to move server reader/writers from RPC calls.
162   constexpr BasePwpbServerReader() = default;
163 
164   // Allow derived classes to be constructed moving another instance.
165   BasePwpbServerReader(BasePwpbServerReader&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())166       PW_LOCKS_EXCLUDED(rpc_lock()) {
167     *this = std::move(other);
168   }
169 
170   // Allow derived classes to use move assignment from another instance.
171   BasePwpbServerReader& operator=(BasePwpbServerReader&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())172       PW_LOCKS_EXCLUDED(rpc_lock()) {
173     RpcLockGuard lock;
174     MoveBasePwpbServerReaderFrom(other);
175     return *this;
176   }
177 
178   // Implement moving by copying the on_next function.
MoveBasePwpbServerReaderFrom(BasePwpbServerReader & other)179   void MoveBasePwpbServerReaderFrom(BasePwpbServerReader& other)
180       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
181     MovePwpbServerCallFrom(other);
182     set_pwpb_on_next_locked(std::move(other.pwpb_on_next_));
183   }
184 
set_on_next(Function<void (const Request & request)> && on_next)185   void set_on_next(Function<void(const Request& request)>&& on_next)
186       PW_LOCKS_EXCLUDED(rpc_lock()) {
187     RpcLockGuard lock;
188     set_pwpb_on_next_locked(std::move(on_next));
189   }
190 
191  private:
set_pwpb_on_next_locked(Function<void (const Request & request)> && on_next)192   void set_pwpb_on_next_locked(Function<void(const Request& request)>&& on_next)
193       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
194     pwpb_on_next_ = std::move(on_next);
195 
196     Call::set_on_next_locked(
197         [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
198           DecodeToStructAndInvokeOnNext(
199               payload, serde().request(), pwpb_on_next_);
200         });
201   }
202 
203   Function<void(const Request&)> pwpb_on_next_ PW_GUARDED_BY(rpc_lock());
204 };
205 
206 }  // namespace internal
207 
208 // The PwpbServerReaderWriter is used to send and receive typed messages in a
209 // pw_protobuf bidirectional streaming RPC.
210 //
211 // These classes use private inheritance to hide the internal::Call API while
212 // allow direct use of its public and protected functions.
213 template <typename Request, typename Response>
214 class PwpbServerReaderWriter : private internal::BasePwpbServerReader<Request> {
215  public:
216   // Creates a PwpbServerReaderWriter that is ready to send responses for a
217   // particular RPC. This can be used for testing or to send responses to an RPC
218   // that has not been started by a client.
219   template <auto kMethod, typename ServiceImpl>
Open(Server & server,uint32_t channel_id,ServiceImpl & service)220   [[nodiscard]] static PwpbServerReaderWriter Open(Server& server,
221                                                    uint32_t channel_id,
222                                                    ServiceImpl& service)
223       PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
224     using MethodInfo = internal::MethodInfo<kMethod>;
225     static_assert(std::is_same_v<Request, typename MethodInfo::Request>,
226                   "The request type of a PwpbServerReaderWriter must match "
227                   "the method.");
228     static_assert(std::is_same_v<Response, typename MethodInfo::Response>,
229                   "The response type of a PwpbServerReaderWriter must match "
230                   "the method.");
231     return server.OpenCall<PwpbServerReaderWriter,
232                            kMethod,
233                            MethodType::kBidirectionalStreaming>(
234         channel_id,
235         service,
236         internal::MethodLookup::GetPwpbMethod<ServiceImpl,
237                                               MethodInfo::kMethodId>());
238   }
239 
240   // Allow default construction so that users can declare a variable into
241   // which to move server reader/writers from RPC calls.
242   constexpr PwpbServerReaderWriter() = default;
243 
244   PwpbServerReaderWriter(PwpbServerReaderWriter&&) = default;
245   PwpbServerReaderWriter& operator=(PwpbServerReaderWriter&&) = default;
246 
247   using internal::Call::active;
248   using internal::Call::channel_id;
249 
250   // Functions for setting RPC event callbacks.
251   using internal::Call::set_on_error;
252   using internal::BasePwpbServerReader<Request>::set_on_next;
253   using internal::ServerCall::set_on_completion_requested;
254   using internal::ServerCall::set_on_completion_requested_if_enabled;
255 
256   // Writes a response. Returns the following Status codes:
257   //
258   //   OK - the response was successfully sent
259   //   FAILED_PRECONDITION - the writer is closed
260   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf message
261   //   other errors - the ChannelOutput failed to send the packet; the error
262   //       codes are determined by the ChannelOutput implementation
263   //
Write(const Response & response)264   Status Write(const Response& response) {
265     return internal::PwpbServerCall::SendStreamResponse(response);
266   }
267 
268   Status Finish(Status status = OkStatus()) {
269     return internal::Call::CloseAndSendResponse(status);
270   }
271 
272   Status TryFinish(Status status = OkStatus()) {
273     return internal::Call::TryCloseAndSendResponse(status);
274   }
275 
276  private:
277   friend class internal::PwpbMethod;
278   friend class Server;
279 
280   template <typename, typename, uint32_t>
281   friend class internal::test::InvocationContext;
282 
283   PwpbServerReaderWriter(const internal::LockedCallContext& context,
284                          MethodType type = MethodType::kBidirectionalStreaming)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())285       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
286       : internal::BasePwpbServerReader<Request>(context, type) {}
287 };
288 
289 // The PwpbServerReader is used to receive typed messages and send a typed
290 // response in a pw_protobuf client streaming RPC.
291 //
292 // These classes use private inheritance to hide the internal::Call API while
293 // allow direct use of its public and protected functions.
294 template <typename Request, typename Response>
295 class PwpbServerReader : private internal::BasePwpbServerReader<Request> {
296  public:
297   // Creates a PwpbServerReader that is ready to send a response to a particular
298   // RPC. This can be used for testing or to finish an RPC that has not been
299   // started by the client.
300   template <auto kMethod, typename ServiceImpl>
Open(Server & server,uint32_t channel_id,ServiceImpl & service)301   [[nodiscard]] static PwpbServerReader Open(Server& server,
302                                              uint32_t channel_id,
303                                              ServiceImpl& service)
304       PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
305     using MethodInfo = internal::MethodInfo<kMethod>;
306     static_assert(std::is_same_v<Request, typename MethodInfo::Request>,
307                   "The request type of a PwpbServerReader must match "
308                   "the method.");
309     static_assert(std::is_same_v<Response, typename MethodInfo::Response>,
310                   "The response type of a PwpbServerReader must match "
311                   "the method.");
312     return server
313         .OpenCall<PwpbServerReader, kMethod, MethodType::kClientStreaming>(
314             channel_id,
315             service,
316             internal::MethodLookup::GetPwpbMethod<ServiceImpl,
317                                                   MethodInfo::kMethodId>());
318   }
319 
320   // Allow default construction so that users can declare a variable into
321   // which to move server reader/writers from RPC calls.
322   constexpr PwpbServerReader() = default;
323 
324   PwpbServerReader(PwpbServerReader&&) = default;
325   PwpbServerReader& operator=(PwpbServerReader&&) = default;
326 
327   using internal::Call::active;
328   using internal::Call::channel_id;
329 
330   // Functions for setting RPC event callbacks.
331   using internal::Call::set_on_error;
332   using internal::BasePwpbServerReader<Request>::set_on_next;
333   using internal::ServerCall::set_on_completion_requested;
334   using internal::ServerCall::set_on_completion_requested_if_enabled;
335 
336   // Sends the response. Returns the following Status codes:
337   //
338   //   OK - the response was successfully sent
339   //   FAILED_PRECONDITION - the writer is closed
340   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf message
341   //   other errors - the ChannelOutput failed to send the packet; the error
342   //       codes are determined by the ChannelOutput implementation
343   //
344   Status Finish(const Response& response, Status status = OkStatus()) {
345     return internal::PwpbServerCall::SendUnaryResponse(response, status);
346   }
347 
348   Status TryFinish(const Response& response, Status status = OkStatus()) {
349     return internal::PwpbServerCall::TrySendUnaryResponse(response, status);
350   }
351 
352  private:
353   friend class internal::PwpbMethod;
354   friend class Server;
355 
356   template <typename, typename, uint32_t>
357   friend class internal::test::InvocationContext;
358 
359   PwpbServerReader(const internal::LockedCallContext& context)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())360       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
361       : internal::BasePwpbServerReader<Request>(context,
362                                                 MethodType::kClientStreaming) {}
363 };
364 
365 // The PwpbServerWriter is used to send typed responses in a pw_protobuf server
366 // streaming RPC.
367 //
368 // These classes use private inheritance to hide the internal::Call API while
369 // allow direct use of its public and protected functions.
370 template <typename Response>
371 class PwpbServerWriter : private internal::PwpbServerCall {
372  public:
373   // Creates a PwpbServerWriter that is ready to send responses for a particular
374   // RPC. This can be used for testing or to send responses to an RPC that has
375   // not been started by a client.
376   template <auto kMethod, typename ServiceImpl>
Open(Server & server,uint32_t channel_id,ServiceImpl & service)377   [[nodiscard]] static PwpbServerWriter Open(Server& server,
378                                              uint32_t channel_id,
379                                              ServiceImpl& service)
380       PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
381     using MethodInfo = internal::MethodInfo<kMethod>;
382     static_assert(std::is_same_v<Response, typename MethodInfo::Response>,
383                   "The response type of a PwpbServerWriter must match "
384                   "the method.");
385     return server
386         .OpenCall<PwpbServerWriter, kMethod, MethodType::kServerStreaming>(
387             channel_id,
388             service,
389             internal::MethodLookup::GetPwpbMethod<ServiceImpl,
390                                                   MethodInfo::kMethodId>());
391   }
392 
393   // Allow default construction so that users can declare a variable into
394   // which to move server reader/writers from RPC calls.
395   constexpr PwpbServerWriter() = default;
396 
397   PwpbServerWriter(PwpbServerWriter&&) = default;
398   PwpbServerWriter& operator=(PwpbServerWriter&&) = default;
399 
400   using internal::Call::active;
401   using internal::Call::channel_id;
402 
403   // Functions for setting RPC event callbacks.
404   using internal::Call::set_on_error;
405   using internal::ServerCall::set_on_completion_requested;
406   using internal::ServerCall::set_on_completion_requested_if_enabled;
407 
408   // Writes a response. Returns the following Status codes:
409   //
410   //   OK - the response was successfully sent
411   //   FAILED_PRECONDITION - the writer is closed
412   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf message
413   //   other errors - the ChannelOutput failed to send the packet; the error
414   //       codes are determined by the ChannelOutput implementation
415   //
Write(const Response & response)416   Status Write(const Response& response) {
417     return internal::PwpbServerCall::SendStreamResponse(response);
418   }
419 
420   Status Finish(Status status = OkStatus()) {
421     return internal::Call::CloseAndSendResponse(status);
422   }
423 
424   Status TryFinish(Status status = OkStatus()) {
425     return internal::Call::TryCloseAndSendResponse(status);
426   }
427 
428  private:
429   friend class internal::PwpbMethod;
430   friend class Server;
431 
432   template <typename, typename, uint32_t>
433   friend class internal::test::InvocationContext;
434 
435   PwpbServerWriter(const internal::LockedCallContext& context)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())436       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
437       : internal::PwpbServerCall(context, MethodType::kServerStreaming) {}
438 };
439 
440 // The PwpbUnaryResponder is used to send a typed response in a pw_protobuf
441 // unary RPC.
442 //
443 // These classes use private inheritance to hide the internal::Call API while
444 // allow direct use of its public and protected functions.
445 template <typename Response>
446 class PwpbUnaryResponder : private internal::PwpbServerCall {
447  public:
448   // Creates a PwpbUnaryResponder that is ready to send responses for a
449   // particular RPC. This can be used for testing or to send responses to an
450   // RPC that has not been started by a client.
451   template <auto kMethod, typename ServiceImpl>
Open(Server & server,uint32_t channel_id,ServiceImpl & service)452   [[nodiscard]] static PwpbUnaryResponder Open(Server& server,
453                                                uint32_t channel_id,
454                                                ServiceImpl& service)
455       PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
456     using MethodInfo = internal::MethodInfo<kMethod>;
457     static_assert(std::is_same_v<Response, typename MethodInfo::Response>,
458                   "The response type of a PwpbUnaryResponder must match "
459                   "the method.");
460     return server
461         .OpenCall<PwpbUnaryResponder<Response>, kMethod, MethodType::kUnary>(
462             channel_id,
463             service,
464             internal::MethodLookup::GetPwpbMethod<ServiceImpl,
465                                                   MethodInfo::kMethodId>());
466   }
467 
468   // Allow default construction so that users can declare a variable into
469   // which to move server reader/writers from RPC calls.
470   constexpr PwpbUnaryResponder() = default;
471 
472   PwpbUnaryResponder(PwpbUnaryResponder&&) = default;
473   PwpbUnaryResponder& operator=(PwpbUnaryResponder&&) = default;
474 
475   using internal::ServerCall::active;
476   using internal::ServerCall::channel_id;
477 
478   // Functions for setting RPC event callbacks.
479   using internal::Call::set_on_error;
480 
481   // Sends the response. Returns the following Status codes:
482   //
483   //   OK - the response was successfully sent
484   //   FAILED_PRECONDITION - the writer is closed
485   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf message
486   //   other errors - the ChannelOutput failed to send the packet; the error
487   //       codes are determined by the ChannelOutput implementation
488   //
489   Status Finish(const Response& response, Status status = OkStatus()) {
490     return internal::PwpbServerCall::SendUnaryResponse(response, status);
491   }
492 
493   Status TryFinish(const Response& response, Status status = OkStatus()) {
494     return internal::PwpbServerCall::TrySendUnaryResponse(response, status);
495   }
496 
497  private:
498   friend class internal::PwpbMethod;
499   friend class Server;
500 
501   template <typename, typename, uint32_t>
502   friend class internal::test::InvocationContext;
503 
504   PwpbUnaryResponder(const internal::LockedCallContext& context)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())505       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
506       : internal::PwpbServerCall(context, MethodType::kUnary) {}
507 };
508 
509 }  // namespace pw::rpc
510