xref: /aosp_15_r20/external/pigweed/pw_rpc/public/pw_rpc/internal/call.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <cassert>
17 #include <cstddef>
18 #include <limits>
19 #include <utility>
20 
21 #include "pw_containers/intrusive_list.h"
22 #include "pw_function/function.h"
23 #include "pw_rpc/channel.h"
24 #include "pw_rpc/internal/call_context.h"
25 #include "pw_rpc/internal/lock.h"
26 #include "pw_rpc/internal/method.h"
27 #include "pw_rpc/internal/packet.h"
28 #include "pw_rpc/method_type.h"
29 #include "pw_rpc/service.h"
30 #include "pw_rpc/writer.h"
31 #include "pw_span/span.h"
32 #include "pw_status/status.h"
33 #include "pw_status/status_with_size.h"
34 #include "pw_sync/lock_annotations.h"
35 
36 namespace pw::rpc {
37 namespace internal {
38 
39 class Endpoint;
40 class LockedEndpoint;
41 class Packet;
42 
43 // Whether a call object is associated with a server or a client.
44 enum CallType : bool { kServerCall, kClientCall };
45 
46 // Whether callbacks that take a proto use the raw data directly or decode it
47 // to a struct. The RPC lock is held when invoking callbacks that decode to a
48 // struct.
49 enum CallbackProtoType : bool { kRawProto, kProtoStruct };
50 
51 // Immutable properties of a call object. These do not change after an active
52 // call is initialized.
53 //
54 // Bits
55 //     0-1: MethodType
56 //       2: CallType
57 //       3: Bool for whether callbacks decode to proto structs
58 //
59 class CallProperties {
60  public:
CallProperties()61   constexpr CallProperties() : bits_(0u) {}
62 
CallProperties(MethodType method_type,CallType call_type,CallbackProtoType callback_proto_type)63   constexpr CallProperties(MethodType method_type,
64                            CallType call_type,
65                            CallbackProtoType callback_proto_type)
66       : bits_(static_cast<uint8_t>(
67             (static_cast<uint8_t>(method_type) << 0) |
68             (static_cast<uint8_t>(call_type) << 2) |
69             (static_cast<uint8_t>(callback_proto_type) << 3))) {}
70 
71   constexpr CallProperties(const CallProperties&) = default;
72 
73   constexpr CallProperties& operator=(const CallProperties&) = default;
74 
method_type()75   constexpr MethodType method_type() const {
76     return static_cast<MethodType>(bits_ & 0b0011u);
77   }
78 
call_type()79   constexpr CallType call_type() const {
80     return static_cast<CallType>((bits_ & 0b0100u) >> 2);
81   }
82 
callback_proto_type()83   constexpr CallbackProtoType callback_proto_type() const {
84     return static_cast<CallbackProtoType>((bits_ & 0b1000u) >> 3);
85   }
86 
87  private:
88   uint8_t bits_;
89 };
90 
91 // Unrequested RPCs always use this call ID. When a subsequent request
92 // or response is sent with a matching channel + service + method,
93 // it will match a calls with this ID if one exists.
94 inline constexpr uint32_t kOpenCallId = std::numeric_limits<uint32_t>::max();
95 
96 // Legacy clients and servers didn't make use of call IDs at all, and will send
97 // unrequested responses with an "empty" (zero) call ID.
98 inline constexpr uint32_t kLegacyOpenCallId = 0;
99 
100 // Internal RPC Call class. The Call is used to respond to any type of RPC.
101 // Public classes like ServerWriters inherit from it with private inheritance
102 // and provide a public API for their use case. The Call's public API is used by
103 // the Server and Client classes.
104 //
105 // Private inheritance is used in place of composition or more complex
106 // inheritance hierarchy so that these objects all inherit from a common
107 // IntrusiveList::Item object. Private inheritance also gives the derived class
108 // full control over their interfaces.
109 //
110 // IMPLEMENTATION NOTE:
111 //
112 // Subclasses of `Call` must include a destructor which calls
113 // `DestroyServerCall` or `DestroyClientCall` (as appropriate) if the subclass
114 // contains any fields which might be referenced by the call's callbacks. This
115 // ensures that the callbacks do not reference fields which may have already
116 // been destroyed.
117 //
118 // At the top level, `ServerCall` and `ClientCall` invoke `DestroyServerCall`
119 // `DestroyClientCall` respectively to perform cleanup in the case where no
120 // subclass carries additional state.
121 class Call : public IntrusiveList<Call>::Item, private rpc::Writer {
122  public:
123   Call(const Call&) = delete;
124 
125   // Move support is provided to derived classes through the MoveFrom function.
126   Call(Call&&) = delete;
127 
128   Call& operator=(const Call&) = delete;
129   Call& operator=(Call&&) = delete;
130 
~Call()131   ~Call() {
132     // Ensure that calls have already been closed and unregistered.
133     // See class IMPLEMENTATION NOTE for further details.
134     PW_DASSERT((state_ & kHasBeenDestroyed) != 0);
135     PW_DASSERT(!active_locked() && !CallbacksAreRunning());
136   }
137 
138   // True if the Call is active and ready to send responses.
active()139   [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(rpc_lock()) {
140     RpcLockGuard lock;
141     return active_locked();
142   }
143 
active_locked()144   [[nodiscard]] bool active_locked() const
145       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
146     return (state_ & kActive) != 0;
147   }
148 
awaiting_cleanup()149   [[nodiscard]] bool awaiting_cleanup() const
150       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
151     return awaiting_cleanup_ != OkStatus().code();
152   }
153 
id()154   uint32_t id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return id_; }
155 
set_id(uint32_t id)156   void set_id(uint32_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { id_ = id; }
157 
158   // Public function for accessing the channel ID of this call. Set to 0 when
159   // the call is closed.
channel_id()160   uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
161     RpcLockGuard lock;
162     return channel_id_locked();
163   }
164 
channel_id_locked()165   uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
166     return channel_id_;
167   }
168 
service_id()169   uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
170     return service_id_;
171   }
172 
method_id()173   uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
174     return method_id_;
175   }
176 
177   // Return whether this is a server or client call.
type()178   CallType type() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
179     return properties_.call_type();
180   }
181 
182   // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
183   // status from sending the packet, or FAILED_PRECONDITION if the Call is not
184   // active.
CloseAndSendResponse(ConstByteSpan response,Status status)185   Status CloseAndSendResponse(ConstByteSpan response, Status status)
186       PW_LOCKS_EXCLUDED(rpc_lock()) {
187     RpcLockGuard lock;
188     return CloseAndSendResponseLocked(response, status);
189   }
190 
CloseAndSendResponseCallback(const Function<StatusWithSize (ByteSpan)> & callback,Status status)191   Status CloseAndSendResponseCallback(
192       const Function<StatusWithSize(ByteSpan)>& callback, Status status)
193       PW_LOCKS_EXCLUDED(rpc_lock()) {
194     RpcLockGuard lock;
195     return CloseAndSendResponseCallbackLocked(callback, status);
196   }
197 
CloseAndSendResponseLocked(ConstByteSpan response,Status status)198   Status CloseAndSendResponseLocked(ConstByteSpan response, Status status)
199       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
200     return CloseAndSendFinalPacketLocked(
201         pwpb::PacketType::RESPONSE, response, status);
202   }
203 
204   Status CloseAndSendResponseCallbackLocked(
205       const Function<StatusWithSize(ByteSpan)>& callback, Status status)
206       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
207 
CloseAndSendResponse(Status status)208   Status CloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
209     return CloseAndSendResponse({}, status);
210   }
211 
CloseAndSendServerErrorLocked(Status error)212   Status CloseAndSendServerErrorLocked(Status error)
213       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
214     return CloseAndSendFinalPacketLocked(
215         pwpb::PacketType::SERVER_ERROR, {}, error);
216   }
217 
218   // Closes the Call and sends a RESPONSE packet, if the RESPONSE packet failed
219   // to send , keep the call alive and return error. This API allows user to
220   // resend RESPONSE packet when transmission failed.
TryCloseAndSendResponse(ConstByteSpan response,Status status)221   Status TryCloseAndSendResponse(ConstByteSpan response, Status status)
222       PW_LOCKS_EXCLUDED(rpc_lock()) {
223     RpcLockGuard lock;
224     return TryCloseAndSendResponseLocked(response, status);
225   }
226 
TryCloseAndSendResponseCallback(const Function<StatusWithSize (ByteSpan)> & callback,Status status)227   Status TryCloseAndSendResponseCallback(
228       const Function<StatusWithSize(ByteSpan)>& callback, Status status)
229       PW_LOCKS_EXCLUDED(rpc_lock()) {
230     RpcLockGuard lock;
231     return TryCloseAndSendResponseCallbackLocked(callback, status);
232   }
233 
TryCloseAndSendResponseLocked(ConstByteSpan response,Status status)234   Status TryCloseAndSendResponseLocked(ConstByteSpan response, Status status)
235       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
236     return TryCloseAndSendFinalPacketLocked(
237         pwpb::PacketType::RESPONSE, response, status);
238   }
239 
240   Status TryCloseAndSendResponseCallbackLocked(
241       const Function<StatusWithSize(ByteSpan)>& callback, Status status)
242       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
243 
TryCloseAndSendResponse(Status status)244   Status TryCloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
245     return TryCloseAndSendResponse({}, status);
246   }
247 
TryCloseAndSendServerErrorLocked(Status error)248   Status TryCloseAndSendServerErrorLocked(Status error)
249       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
250     return TryCloseAndSendFinalPacketLocked(
251         pwpb::PacketType::SERVER_ERROR, {}, error);
252   }
253 
254   // Public function that indicates that the client requests completion of the
255   // RPC, but is still active and listening to responses. For client streaming
256   // and bi-directional streaming RPCs, this also closes the client stream. If
257   // PW_RPC_COMPLETION_REQUEST_CALLBACK is enabled and
258   // on_client_requested_completion callback is set using the
259   // set_on_completion_requested_if_enabled, then the callback will be invoked
260   // on the server side. The server may then take an appropriate action to
261   // cleanup and stop server streaming.
RequestCompletion()262   Status RequestCompletion() PW_LOCKS_EXCLUDED(rpc_lock()) {
263     RpcLockGuard lock;
264     return RequestCompletionLocked();
265   }
266 
267   // Internal function that closes the client stream (if applicable) and sends
268   // CLIENT_REQUEST_COMPLETION packet to request call completion.
RequestCompletionLocked()269   Status RequestCompletionLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
270     MarkStreamCompleted();
271     return SendPacket(pwpb::PacketType::CLIENT_REQUEST_COMPLETION, {}, {});
272   }
273 
274   // Sends a payload in either a server or client stream packet.
Write(ConstByteSpan payload)275   Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
276     RpcLockGuard lock;
277     return WriteLocked(payload);
278   }
279 
280   /// Provides a buffer into which to encode an RPC server or client stream
281   /// payload, then sends it.
282   ///
283   /// @param callback Callback function invoked with the allocated payload
284   ///   buffer. The callback should return an OK status with the size of the
285   ///   encoded payload if it was successfully written, or an error status
286   ///   otherwise.
Write(const Function<StatusWithSize (ByteSpan)> & callback)287   Status Write(const Function<StatusWithSize(ByteSpan)>& callback)
288       PW_LOCKS_EXCLUDED(rpc_lock()) {
289     RpcLockGuard lock;
290     return WriteCallbackLocked(callback);
291   }
292 
293   Status WriteLocked(ConstByteSpan payload)
294       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
295 
296   Status WriteCallbackLocked(const Function<StatusWithSize(ByteSpan)>& callback)
297       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
298 
299   // Sends the initial request for a client call. If the request fails, the call
300   // is closed.
SendInitialClientRequest(ConstByteSpan payload)301   void SendInitialClientRequest(ConstByteSpan payload)
302       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
303     if (const Status status = SendPacket(pwpb::PacketType::REQUEST, payload);
304         !status.ok()) {
305       CloseAndMarkForCleanup(status);
306     }
307   }
308 
309   void CloseAndMarkForCleanup(Status error)
310       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
311 
312   // Whenever a payload arrives (in a server/client stream or in a response),
313   // call the on_next_ callback.
314   // Precondition: rpc_lock() must be held.
315   void HandlePayload(ConstByteSpan payload) PW_UNLOCK_FUNCTION(rpc_lock());
316 
317   // Handles an error condition for the call. This closes the call and calls the
318   // on_error callback, if set.
HandleError(Status status)319   void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
320     UnregisterAndMarkClosed();
321     CallOnError(status);
322   }
323 
324   // Closes the RPC, but does NOT unregister the call or call on_error. The
325   // call must be moved to the endpoint's to_cleanup_ list and have its
326   // CleanUp() method called at a later time. Only for use by the Endpoint.
CloseAndMarkForCleanupFromEndpoint(Status error)327   void CloseAndMarkForCleanupFromEndpoint(Status error)
328       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
329     MarkClosed();
330     awaiting_cleanup_ = error.code();
331   }
332 
333   // Clears the awaiting_cleanup_ variable and calls the on_error callback. Only
334   // for use by the Endpoint, which will unlist the call.
CleanUpFromEndpoint()335   void CleanUpFromEndpoint() PW_UNLOCK_FUNCTION(rpc_lock()) {
336     const Status status(static_cast<Status::Code>(awaiting_cleanup_));
337     awaiting_cleanup_ = OkStatus().code();
338     CallOnError(status);
339   }
340 
has_client_stream()341   bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
342     return HasClientStream(properties_.method_type());
343   }
344 
has_server_stream()345   bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
346     return HasServerStream(properties_.method_type());
347   }
348 
349   // Returns true if the client has already requested completion.
client_requested_completion()350   bool client_requested_completion() const
351       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
352     return (state_ & kClientRequestedCompletion) != 0;
353   }
354 
355   // Closes a call without doing anything else. Called from the Endpoint
356   // destructor.
CloseFromDeletedEndpoint()357   void CloseFromDeletedEndpoint() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
358     MarkClosed();
359     awaiting_cleanup_ = OkStatus().code();
360     endpoint_ = nullptr;
361   }
362 
363   // Logs detailed info about this call at INFO level. NOT for production use!
364   void DebugLog() const;
365 
366  protected:
367   // Creates an inactive Call.
Call()368   constexpr Call()
369       : endpoint_{},
370         channel_id_{},
371         id_{},
372         service_id_{},
373         method_id_{},
374         state_{},
375         awaiting_cleanup_{},
376         callbacks_executing_{},
377         properties_{} {}
378 
379   // Creates an active server-side Call.
380   Call(const LockedCallContext& context, CallProperties properties)
381       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
382 
383   // Creates an active client-side Call.
384   Call(LockedEndpoint& client,
385        uint32_t channel_id,
386        uint32_t service_id,
387        uint32_t method_id,
388        CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
389 
390   // Closes the call and waits for their callbacks to complete so destructors
391   // can run safely.
392   void DestroyServerCall() PW_LOCKS_EXCLUDED(rpc_lock());
393   void DestroyClientCall() PW_LOCKS_EXCLUDED(rpc_lock());
394 
CallbackStarted()395   void CallbackStarted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
396     callbacks_executing_ += 1;
397   }
398 
CallbackFinished()399   void CallbackFinished() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
400     callbacks_executing_ -= 1;
401   }
402 
403   // This call must be in a closed state when this is called.
404   void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
405 
endpoint()406   Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
407     return *endpoint_;
408   }
409 
410   // Public function that sets the on_next function in the raw API.
set_on_next(Function<void (ConstByteSpan)> && on_next)411   void set_on_next(Function<void(ConstByteSpan)>&& on_next)
412       PW_LOCKS_EXCLUDED(rpc_lock()) {
413     RpcLockGuard lock;
414     set_on_next_locked(std::move(on_next));
415   }
416 
417   // Internal function that sets on_next.
set_on_next_locked(Function<void (ConstByteSpan)> && on_next)418   void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next)
419       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
420     on_next_ = std::move(on_next);
421   }
422 
423   // Public function that sets the on_error callback.
set_on_error(Function<void (Status)> && on_error)424   void set_on_error(Function<void(Status)>&& on_error)
425       PW_LOCKS_EXCLUDED(rpc_lock()) {
426     RpcLockGuard lock;
427     set_on_error_locked(std::move(on_error));
428   }
429 
430   // Internal function that sets on_error.
set_on_error_locked(Function<void (Status)> && on_error)431   void set_on_error_locked(Function<void(Status)>&& on_error)
432       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
433     on_error_ = std::move(on_error);
434   }
435 
MarkStreamCompleted()436   void MarkStreamCompleted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
437     state_ |= kClientRequestedCompletion;
438   }
439 
440   // Closes a client call. Sends a CLIENT_REQUEST_COMPLETION for client /
441   // bidirectional streaming RPCs if not sent yet.
442   void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
443 
444   // Closes a server call.
CloseAndSendResponseLocked(Status status)445   Status CloseAndSendResponseLocked(Status status)
446       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
447     return CloseAndSendFinalPacketLocked(
448         pwpb::PacketType::RESPONSE, {}, status);
449   }
450 
451   // Cancels an RPC. Public function for client calls only.
Cancel()452   Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) {
453     RpcLockGuard lock;
454     return CloseAndSendFinalPacketLocked(
455         pwpb::PacketType::CLIENT_ERROR, {}, Status::Cancelled());
456   }
457 
458   // Unregisters the RPC from the endpoint & marks as closed. The call may be
459   // active or inactive when this is called.
460   void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
461 
462   // Define conversions to the generic server/client RPC writer class.
as_writer()463   constexpr Writer& as_writer() { return *this; }
as_writer()464   constexpr const Writer& as_writer() const { return *this; }
465 
466   // Indicates if the on_next and unary on_completed callbacks are internal
467   // wrappers that decode the raw proto before invoking the user's callback. If
468   // they are, the lock must be held when they are invoked.
hold_lock_while_invoking_callback_with_payload()469   bool hold_lock_while_invoking_callback_with_payload() const
470       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
471     return properties_.callback_proto_type() == kProtoStruct;
472   }
473 
474   // Decodes a raw protobuf into a proto struct (pwpb or Nanopb) and invokes the
475   // pwpb or Nanopb version of the on_next callback.
476   //
477   // This must ONLY be called from derived classes the wrap the on_next
478   // callback. These classes MUST indicate that they call calls in their
479   // constructor.
480   template <typename Decoder, typename ProtoStruct>
DecodeToStructAndInvokeOnNext(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &)> & proto_on_next)481   void DecodeToStructAndInvokeOnNext(
482       ConstByteSpan payload,
483       const Decoder& decoder,
484       Function<void(const ProtoStruct&)>& proto_on_next)
485       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
486     if (proto_on_next == nullptr) {
487       return;
488     }
489 
490     ProtoStruct proto_struct{};
491 
492     if (!decoder.Decode(payload, proto_struct).ok()) {
493       CloseAndMarkForCleanup(Status::DataLoss());
494       return;
495     }
496 
497     const uint32_t original_id = id();
498     auto proto_on_next_local = std::move(proto_on_next);
499 
500     rpc_lock().unlock();
501     proto_on_next_local(proto_struct);
502     rpc_lock().lock();
503 
504     // Restore the original callback if the original call is still active and
505     // the callback has not been replaced.
506     // NOLINTNEXTLINE(bugprone-use-after-move)
507     if (active_locked() && id() == original_id && proto_on_next == nullptr) {
508       proto_on_next = std::move(proto_on_next_local);
509     }
510   }
511 
512   // The call is already unregistered and closed.
513   template <typename Decoder, typename ProtoStruct>
DecodeToStructAndInvokeOnCompleted(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &,Status)> & proto_on_completed,Status status)514   void DecodeToStructAndInvokeOnCompleted(
515       ConstByteSpan payload,
516       const Decoder& decoder,
517       Function<void(const ProtoStruct&, Status)>& proto_on_completed,
518       Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
519     // Always move proto_on_completed so it goes out of scope in this function.
520     auto proto_on_completed_local = std::move(proto_on_completed);
521 
522     // Move on_error in case an error occurs.
523     auto on_error_local = std::move(on_error_);
524 
525     // Release the lock before decoding, since decoder is a global.
526     rpc_lock().unlock();
527 
528     if (proto_on_completed_local == nullptr) {
529       return;
530     }
531 
532     ProtoStruct proto_struct{};
533     if (decoder.Decode(payload, proto_struct).ok()) {
534       proto_on_completed_local(proto_struct, status);
535     } else if (on_error_local != nullptr) {
536       on_error_local(Status::DataLoss());
537     }
538   }
539 
540   // An active call cannot be moved if its callbacks are running. This function
541   // must be called on the call being moved before updating any state.
542   static void WaitUntilReadyForMove(Call& destination, Call& source)
543       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
544 
545  private:
546   friend class rpc::Writer;
547 
548   enum State : uint8_t {
549     kActive = 0b001,
550     kClientRequestedCompletion = 0b010,
551     kHasBeenDestroyed = 0b100,
552   };
553 
554   // Common constructor for server & client calls.
555   Call(LockedEndpoint& endpoint,
556        uint32_t id,
557        uint32_t channel_id,
558        uint32_t service_id,
559        uint32_t method_id,
560        CallProperties properties);
561 
562   Packet MakePacket(pwpb::PacketType type,
563                     ConstByteSpan payload,
564                     Status status = OkStatus()) const
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock ())565       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
566     return Packet(type,
567                   channel_id_locked(),
568                   service_id(),
569                   method_id(),
570                   id_,
571                   payload,
572                   status);
573   }
574 
575   // Marks a call object closed without doing anything else. The call is not
576   // removed from the calls list and no callbacks are called.
MarkClosed()577   void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
578     channel_id_ = Channel::kUnassignedChannelId;
579     id_ = 0;
580     state_ = kClientRequestedCompletion;
581   }
582 
583   // Calls the on_error callback without closing the RPC. This is used when the
584   // call has already completed.
585   void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock());
586 
587   // If required, removes this call from the endpoint's to_cleanup_ list and
588   // calls CleanUp(). Returns true if cleanup was required, which means the lock
589   // was released.
590   bool CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
591 
592   // Sends a payload with the specified type. The payload may either be in a
593   // previously acquired buffer or in a standalone buffer.
594   //
595   // Returns FAILED_PRECONDITION if the call is not active().
596   Status SendPacket(pwpb::PacketType type,
597                     ConstByteSpan payload,
598                     Status status = OkStatus())
599       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
600 
601   Status CloseAndSendFinalPacketLocked(pwpb::PacketType type,
602                                        ConstByteSpan response,
603                                        Status status)
604       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
605 
606   Status TryCloseAndSendFinalPacketLocked(pwpb::PacketType type,
607                                           ConstByteSpan response,
608                                           Status status)
609       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
610 
CallbacksAreRunning()611   bool CallbacksAreRunning() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
612     return callbacks_executing_ != 0u;
613   }
614 
615   // Waits for callbacks to complete so that a call object can be destroyed.
616   void WaitForCallbacksToComplete() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
617 
618   Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
619   uint32_t channel_id_ PW_GUARDED_BY(rpc_lock());
620   uint32_t id_ PW_GUARDED_BY(rpc_lock());
621   uint32_t service_id_ PW_GUARDED_BY(rpc_lock());
622   uint32_t method_id_ PW_GUARDED_BY(rpc_lock());
623 
624   // State of call and client stream.
625   //
626   //   bit 0: call is active
627   //   bit 1: client stream is active
628   //   bit 2: call has been destroyed
629   uint8_t state_ PW_GUARDED_BY(rpc_lock());
630 
631   // If non-OK, indicates that the call was closed and needs to have its
632   // on_error called with this Status code. Uses a uint8_t for compactness.
633   uint8_t awaiting_cleanup_ PW_GUARDED_BY(rpc_lock());
634 
635   // Tracks how many of this call's callbacks are running. Must be 0 for the
636   // call to be destroyed.
637   uint8_t callbacks_executing_ PW_GUARDED_BY(rpc_lock());
638 
639   CallProperties properties_ PW_GUARDED_BY(rpc_lock());
640 
641   // Called when the RPC is terminated due to an error.
642   Function<void(Status error)> on_error_ PW_GUARDED_BY(rpc_lock());
643 
644   // Called when a request is received. Only used for RPCs with client streams.
645   // The raw payload buffer is passed to the callback.
646   Function<void(ConstByteSpan payload)> on_next_ PW_GUARDED_BY(rpc_lock());
647 };
648 
649 }  // namespace internal
650 
active()651 inline bool Writer::active() const {
652   return static_cast<const internal::Call*>(this)->active();
653 }
654 
channel_id()655 inline uint32_t Writer::channel_id() const {
656   return static_cast<const internal::Call*>(this)->channel_id();
657 }
658 
Write(ConstByteSpan payload)659 inline Status Writer::Write(ConstByteSpan payload) {
660   return static_cast<internal::Call*>(this)->Write(payload);
661 }
662 
Write(const Function<StatusWithSize (ByteSpan)> & callback)663 inline Status Writer::Write(
664     const Function<StatusWithSize(ByteSpan)>& callback) {
665   return static_cast<internal::Call*>(this)->Write(callback);
666 }
667 
668 }  // namespace pw::rpc
669