1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15
16 #include <cassert>
17 #include <cstddef>
18 #include <limits>
19 #include <utility>
20
21 #include "pw_containers/intrusive_list.h"
22 #include "pw_function/function.h"
23 #include "pw_rpc/channel.h"
24 #include "pw_rpc/internal/call_context.h"
25 #include "pw_rpc/internal/lock.h"
26 #include "pw_rpc/internal/method.h"
27 #include "pw_rpc/internal/packet.h"
28 #include "pw_rpc/method_type.h"
29 #include "pw_rpc/service.h"
30 #include "pw_rpc/writer.h"
31 #include "pw_span/span.h"
32 #include "pw_status/status.h"
33 #include "pw_status/status_with_size.h"
34 #include "pw_sync/lock_annotations.h"
35
36 namespace pw::rpc {
37 namespace internal {
38
39 class Endpoint;
40 class LockedEndpoint;
41 class Packet;
42
43 // Whether a call object is associated with a server or a client.
44 enum CallType : bool { kServerCall, kClientCall };
45
46 // Whether callbacks that take a proto use the raw data directly or decode it
47 // to a struct. The RPC lock is held when invoking callbacks that decode to a
48 // struct.
49 enum CallbackProtoType : bool { kRawProto, kProtoStruct };
50
51 // Immutable properties of a call object. These do not change after an active
52 // call is initialized.
53 //
54 // Bits
55 // 0-1: MethodType
56 // 2: CallType
57 // 3: Bool for whether callbacks decode to proto structs
58 //
59 class CallProperties {
60 public:
CallProperties()61 constexpr CallProperties() : bits_(0u) {}
62
CallProperties(MethodType method_type,CallType call_type,CallbackProtoType callback_proto_type)63 constexpr CallProperties(MethodType method_type,
64 CallType call_type,
65 CallbackProtoType callback_proto_type)
66 : bits_(static_cast<uint8_t>(
67 (static_cast<uint8_t>(method_type) << 0) |
68 (static_cast<uint8_t>(call_type) << 2) |
69 (static_cast<uint8_t>(callback_proto_type) << 3))) {}
70
71 constexpr CallProperties(const CallProperties&) = default;
72
73 constexpr CallProperties& operator=(const CallProperties&) = default;
74
method_type()75 constexpr MethodType method_type() const {
76 return static_cast<MethodType>(bits_ & 0b0011u);
77 }
78
call_type()79 constexpr CallType call_type() const {
80 return static_cast<CallType>((bits_ & 0b0100u) >> 2);
81 }
82
callback_proto_type()83 constexpr CallbackProtoType callback_proto_type() const {
84 return static_cast<CallbackProtoType>((bits_ & 0b1000u) >> 3);
85 }
86
87 private:
88 uint8_t bits_;
89 };
90
91 // Unrequested RPCs always use this call ID. When a subsequent request
92 // or response is sent with a matching channel + service + method,
93 // it will match a calls with this ID if one exists.
94 inline constexpr uint32_t kOpenCallId = std::numeric_limits<uint32_t>::max();
95
96 // Legacy clients and servers didn't make use of call IDs at all, and will send
97 // unrequested responses with an "empty" (zero) call ID.
98 inline constexpr uint32_t kLegacyOpenCallId = 0;
99
100 // Internal RPC Call class. The Call is used to respond to any type of RPC.
101 // Public classes like ServerWriters inherit from it with private inheritance
102 // and provide a public API for their use case. The Call's public API is used by
103 // the Server and Client classes.
104 //
105 // Private inheritance is used in place of composition or more complex
106 // inheritance hierarchy so that these objects all inherit from a common
107 // IntrusiveList::Item object. Private inheritance also gives the derived class
108 // full control over their interfaces.
109 //
110 // IMPLEMENTATION NOTE:
111 //
112 // Subclasses of `Call` must include a destructor which calls
113 // `DestroyServerCall` or `DestroyClientCall` (as appropriate) if the subclass
114 // contains any fields which might be referenced by the call's callbacks. This
115 // ensures that the callbacks do not reference fields which may have already
116 // been destroyed.
117 //
118 // At the top level, `ServerCall` and `ClientCall` invoke `DestroyServerCall`
119 // `DestroyClientCall` respectively to perform cleanup in the case where no
120 // subclass carries additional state.
121 class Call : public IntrusiveList<Call>::Item, private rpc::Writer {
122 public:
123 Call(const Call&) = delete;
124
125 // Move support is provided to derived classes through the MoveFrom function.
126 Call(Call&&) = delete;
127
128 Call& operator=(const Call&) = delete;
129 Call& operator=(Call&&) = delete;
130
~Call()131 ~Call() {
132 // Ensure that calls have already been closed and unregistered.
133 // See class IMPLEMENTATION NOTE for further details.
134 PW_DASSERT((state_ & kHasBeenDestroyed) != 0);
135 PW_DASSERT(!active_locked() && !CallbacksAreRunning());
136 }
137
138 // True if the Call is active and ready to send responses.
active()139 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(rpc_lock()) {
140 RpcLockGuard lock;
141 return active_locked();
142 }
143
active_locked()144 [[nodiscard]] bool active_locked() const
145 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
146 return (state_ & kActive) != 0;
147 }
148
awaiting_cleanup()149 [[nodiscard]] bool awaiting_cleanup() const
150 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
151 return awaiting_cleanup_ != OkStatus().code();
152 }
153
id()154 uint32_t id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return id_; }
155
set_id(uint32_t id)156 void set_id(uint32_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { id_ = id; }
157
158 // Public function for accessing the channel ID of this call. Set to 0 when
159 // the call is closed.
channel_id()160 uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
161 RpcLockGuard lock;
162 return channel_id_locked();
163 }
164
channel_id_locked()165 uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
166 return channel_id_;
167 }
168
service_id()169 uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
170 return service_id_;
171 }
172
method_id()173 uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
174 return method_id_;
175 }
176
177 // Return whether this is a server or client call.
type()178 CallType type() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
179 return properties_.call_type();
180 }
181
182 // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
183 // status from sending the packet, or FAILED_PRECONDITION if the Call is not
184 // active.
CloseAndSendResponse(ConstByteSpan response,Status status)185 Status CloseAndSendResponse(ConstByteSpan response, Status status)
186 PW_LOCKS_EXCLUDED(rpc_lock()) {
187 RpcLockGuard lock;
188 return CloseAndSendResponseLocked(response, status);
189 }
190
CloseAndSendResponseCallback(const Function<StatusWithSize (ByteSpan)> & callback,Status status)191 Status CloseAndSendResponseCallback(
192 const Function<StatusWithSize(ByteSpan)>& callback, Status status)
193 PW_LOCKS_EXCLUDED(rpc_lock()) {
194 RpcLockGuard lock;
195 return CloseAndSendResponseCallbackLocked(callback, status);
196 }
197
CloseAndSendResponseLocked(ConstByteSpan response,Status status)198 Status CloseAndSendResponseLocked(ConstByteSpan response, Status status)
199 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
200 return CloseAndSendFinalPacketLocked(
201 pwpb::PacketType::RESPONSE, response, status);
202 }
203
204 Status CloseAndSendResponseCallbackLocked(
205 const Function<StatusWithSize(ByteSpan)>& callback, Status status)
206 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
207
CloseAndSendResponse(Status status)208 Status CloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
209 return CloseAndSendResponse({}, status);
210 }
211
CloseAndSendServerErrorLocked(Status error)212 Status CloseAndSendServerErrorLocked(Status error)
213 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
214 return CloseAndSendFinalPacketLocked(
215 pwpb::PacketType::SERVER_ERROR, {}, error);
216 }
217
218 // Closes the Call and sends a RESPONSE packet, if the RESPONSE packet failed
219 // to send , keep the call alive and return error. This API allows user to
220 // resend RESPONSE packet when transmission failed.
TryCloseAndSendResponse(ConstByteSpan response,Status status)221 Status TryCloseAndSendResponse(ConstByteSpan response, Status status)
222 PW_LOCKS_EXCLUDED(rpc_lock()) {
223 RpcLockGuard lock;
224 return TryCloseAndSendResponseLocked(response, status);
225 }
226
TryCloseAndSendResponseCallback(const Function<StatusWithSize (ByteSpan)> & callback,Status status)227 Status TryCloseAndSendResponseCallback(
228 const Function<StatusWithSize(ByteSpan)>& callback, Status status)
229 PW_LOCKS_EXCLUDED(rpc_lock()) {
230 RpcLockGuard lock;
231 return TryCloseAndSendResponseCallbackLocked(callback, status);
232 }
233
TryCloseAndSendResponseLocked(ConstByteSpan response,Status status)234 Status TryCloseAndSendResponseLocked(ConstByteSpan response, Status status)
235 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
236 return TryCloseAndSendFinalPacketLocked(
237 pwpb::PacketType::RESPONSE, response, status);
238 }
239
240 Status TryCloseAndSendResponseCallbackLocked(
241 const Function<StatusWithSize(ByteSpan)>& callback, Status status)
242 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
243
TryCloseAndSendResponse(Status status)244 Status TryCloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
245 return TryCloseAndSendResponse({}, status);
246 }
247
TryCloseAndSendServerErrorLocked(Status error)248 Status TryCloseAndSendServerErrorLocked(Status error)
249 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
250 return TryCloseAndSendFinalPacketLocked(
251 pwpb::PacketType::SERVER_ERROR, {}, error);
252 }
253
254 // Public function that indicates that the client requests completion of the
255 // RPC, but is still active and listening to responses. For client streaming
256 // and bi-directional streaming RPCs, this also closes the client stream. If
257 // PW_RPC_COMPLETION_REQUEST_CALLBACK is enabled and
258 // on_client_requested_completion callback is set using the
259 // set_on_completion_requested_if_enabled, then the callback will be invoked
260 // on the server side. The server may then take an appropriate action to
261 // cleanup and stop server streaming.
RequestCompletion()262 Status RequestCompletion() PW_LOCKS_EXCLUDED(rpc_lock()) {
263 RpcLockGuard lock;
264 return RequestCompletionLocked();
265 }
266
267 // Internal function that closes the client stream (if applicable) and sends
268 // CLIENT_REQUEST_COMPLETION packet to request call completion.
RequestCompletionLocked()269 Status RequestCompletionLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
270 MarkStreamCompleted();
271 return SendPacket(pwpb::PacketType::CLIENT_REQUEST_COMPLETION, {}, {});
272 }
273
274 // Sends a payload in either a server or client stream packet.
Write(ConstByteSpan payload)275 Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
276 RpcLockGuard lock;
277 return WriteLocked(payload);
278 }
279
280 /// Provides a buffer into which to encode an RPC server or client stream
281 /// payload, then sends it.
282 ///
283 /// @param callback Callback function invoked with the allocated payload
284 /// buffer. The callback should return an OK status with the size of the
285 /// encoded payload if it was successfully written, or an error status
286 /// otherwise.
Write(const Function<StatusWithSize (ByteSpan)> & callback)287 Status Write(const Function<StatusWithSize(ByteSpan)>& callback)
288 PW_LOCKS_EXCLUDED(rpc_lock()) {
289 RpcLockGuard lock;
290 return WriteCallbackLocked(callback);
291 }
292
293 Status WriteLocked(ConstByteSpan payload)
294 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
295
296 Status WriteCallbackLocked(const Function<StatusWithSize(ByteSpan)>& callback)
297 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
298
299 // Sends the initial request for a client call. If the request fails, the call
300 // is closed.
SendInitialClientRequest(ConstByteSpan payload)301 void SendInitialClientRequest(ConstByteSpan payload)
302 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
303 if (const Status status = SendPacket(pwpb::PacketType::REQUEST, payload);
304 !status.ok()) {
305 CloseAndMarkForCleanup(status);
306 }
307 }
308
309 void CloseAndMarkForCleanup(Status error)
310 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
311
312 // Whenever a payload arrives (in a server/client stream or in a response),
313 // call the on_next_ callback.
314 // Precondition: rpc_lock() must be held.
315 void HandlePayload(ConstByteSpan payload) PW_UNLOCK_FUNCTION(rpc_lock());
316
317 // Handles an error condition for the call. This closes the call and calls the
318 // on_error callback, if set.
HandleError(Status status)319 void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
320 UnregisterAndMarkClosed();
321 CallOnError(status);
322 }
323
324 // Closes the RPC, but does NOT unregister the call or call on_error. The
325 // call must be moved to the endpoint's to_cleanup_ list and have its
326 // CleanUp() method called at a later time. Only for use by the Endpoint.
CloseAndMarkForCleanupFromEndpoint(Status error)327 void CloseAndMarkForCleanupFromEndpoint(Status error)
328 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
329 MarkClosed();
330 awaiting_cleanup_ = error.code();
331 }
332
333 // Clears the awaiting_cleanup_ variable and calls the on_error callback. Only
334 // for use by the Endpoint, which will unlist the call.
CleanUpFromEndpoint()335 void CleanUpFromEndpoint() PW_UNLOCK_FUNCTION(rpc_lock()) {
336 const Status status(static_cast<Status::Code>(awaiting_cleanup_));
337 awaiting_cleanup_ = OkStatus().code();
338 CallOnError(status);
339 }
340
has_client_stream()341 bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
342 return HasClientStream(properties_.method_type());
343 }
344
has_server_stream()345 bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
346 return HasServerStream(properties_.method_type());
347 }
348
349 // Returns true if the client has already requested completion.
client_requested_completion()350 bool client_requested_completion() const
351 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
352 return (state_ & kClientRequestedCompletion) != 0;
353 }
354
355 // Closes a call without doing anything else. Called from the Endpoint
356 // destructor.
CloseFromDeletedEndpoint()357 void CloseFromDeletedEndpoint() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
358 MarkClosed();
359 awaiting_cleanup_ = OkStatus().code();
360 endpoint_ = nullptr;
361 }
362
363 // Logs detailed info about this call at INFO level. NOT for production use!
364 void DebugLog() const;
365
366 protected:
367 // Creates an inactive Call.
Call()368 constexpr Call()
369 : endpoint_{},
370 channel_id_{},
371 id_{},
372 service_id_{},
373 method_id_{},
374 state_{},
375 awaiting_cleanup_{},
376 callbacks_executing_{},
377 properties_{} {}
378
379 // Creates an active server-side Call.
380 Call(const LockedCallContext& context, CallProperties properties)
381 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
382
383 // Creates an active client-side Call.
384 Call(LockedEndpoint& client,
385 uint32_t channel_id,
386 uint32_t service_id,
387 uint32_t method_id,
388 CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
389
390 // Closes the call and waits for their callbacks to complete so destructors
391 // can run safely.
392 void DestroyServerCall() PW_LOCKS_EXCLUDED(rpc_lock());
393 void DestroyClientCall() PW_LOCKS_EXCLUDED(rpc_lock());
394
CallbackStarted()395 void CallbackStarted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
396 callbacks_executing_ += 1;
397 }
398
CallbackFinished()399 void CallbackFinished() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
400 callbacks_executing_ -= 1;
401 }
402
403 // This call must be in a closed state when this is called.
404 void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
405
endpoint()406 Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
407 return *endpoint_;
408 }
409
410 // Public function that sets the on_next function in the raw API.
set_on_next(Function<void (ConstByteSpan)> && on_next)411 void set_on_next(Function<void(ConstByteSpan)>&& on_next)
412 PW_LOCKS_EXCLUDED(rpc_lock()) {
413 RpcLockGuard lock;
414 set_on_next_locked(std::move(on_next));
415 }
416
417 // Internal function that sets on_next.
set_on_next_locked(Function<void (ConstByteSpan)> && on_next)418 void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next)
419 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
420 on_next_ = std::move(on_next);
421 }
422
423 // Public function that sets the on_error callback.
set_on_error(Function<void (Status)> && on_error)424 void set_on_error(Function<void(Status)>&& on_error)
425 PW_LOCKS_EXCLUDED(rpc_lock()) {
426 RpcLockGuard lock;
427 set_on_error_locked(std::move(on_error));
428 }
429
430 // Internal function that sets on_error.
set_on_error_locked(Function<void (Status)> && on_error)431 void set_on_error_locked(Function<void(Status)>&& on_error)
432 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
433 on_error_ = std::move(on_error);
434 }
435
MarkStreamCompleted()436 void MarkStreamCompleted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
437 state_ |= kClientRequestedCompletion;
438 }
439
440 // Closes a client call. Sends a CLIENT_REQUEST_COMPLETION for client /
441 // bidirectional streaming RPCs if not sent yet.
442 void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
443
444 // Closes a server call.
CloseAndSendResponseLocked(Status status)445 Status CloseAndSendResponseLocked(Status status)
446 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
447 return CloseAndSendFinalPacketLocked(
448 pwpb::PacketType::RESPONSE, {}, status);
449 }
450
451 // Cancels an RPC. Public function for client calls only.
Cancel()452 Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) {
453 RpcLockGuard lock;
454 return CloseAndSendFinalPacketLocked(
455 pwpb::PacketType::CLIENT_ERROR, {}, Status::Cancelled());
456 }
457
458 // Unregisters the RPC from the endpoint & marks as closed. The call may be
459 // active or inactive when this is called.
460 void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
461
462 // Define conversions to the generic server/client RPC writer class.
as_writer()463 constexpr Writer& as_writer() { return *this; }
as_writer()464 constexpr const Writer& as_writer() const { return *this; }
465
466 // Indicates if the on_next and unary on_completed callbacks are internal
467 // wrappers that decode the raw proto before invoking the user's callback. If
468 // they are, the lock must be held when they are invoked.
hold_lock_while_invoking_callback_with_payload()469 bool hold_lock_while_invoking_callback_with_payload() const
470 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
471 return properties_.callback_proto_type() == kProtoStruct;
472 }
473
474 // Decodes a raw protobuf into a proto struct (pwpb or Nanopb) and invokes the
475 // pwpb or Nanopb version of the on_next callback.
476 //
477 // This must ONLY be called from derived classes the wrap the on_next
478 // callback. These classes MUST indicate that they call calls in their
479 // constructor.
480 template <typename Decoder, typename ProtoStruct>
DecodeToStructAndInvokeOnNext(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &)> & proto_on_next)481 void DecodeToStructAndInvokeOnNext(
482 ConstByteSpan payload,
483 const Decoder& decoder,
484 Function<void(const ProtoStruct&)>& proto_on_next)
485 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
486 if (proto_on_next == nullptr) {
487 return;
488 }
489
490 ProtoStruct proto_struct{};
491
492 if (!decoder.Decode(payload, proto_struct).ok()) {
493 CloseAndMarkForCleanup(Status::DataLoss());
494 return;
495 }
496
497 const uint32_t original_id = id();
498 auto proto_on_next_local = std::move(proto_on_next);
499
500 rpc_lock().unlock();
501 proto_on_next_local(proto_struct);
502 rpc_lock().lock();
503
504 // Restore the original callback if the original call is still active and
505 // the callback has not been replaced.
506 // NOLINTNEXTLINE(bugprone-use-after-move)
507 if (active_locked() && id() == original_id && proto_on_next == nullptr) {
508 proto_on_next = std::move(proto_on_next_local);
509 }
510 }
511
512 // The call is already unregistered and closed.
513 template <typename Decoder, typename ProtoStruct>
DecodeToStructAndInvokeOnCompleted(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &,Status)> & proto_on_completed,Status status)514 void DecodeToStructAndInvokeOnCompleted(
515 ConstByteSpan payload,
516 const Decoder& decoder,
517 Function<void(const ProtoStruct&, Status)>& proto_on_completed,
518 Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
519 // Always move proto_on_completed so it goes out of scope in this function.
520 auto proto_on_completed_local = std::move(proto_on_completed);
521
522 // Move on_error in case an error occurs.
523 auto on_error_local = std::move(on_error_);
524
525 // Release the lock before decoding, since decoder is a global.
526 rpc_lock().unlock();
527
528 if (proto_on_completed_local == nullptr) {
529 return;
530 }
531
532 ProtoStruct proto_struct{};
533 if (decoder.Decode(payload, proto_struct).ok()) {
534 proto_on_completed_local(proto_struct, status);
535 } else if (on_error_local != nullptr) {
536 on_error_local(Status::DataLoss());
537 }
538 }
539
540 // An active call cannot be moved if its callbacks are running. This function
541 // must be called on the call being moved before updating any state.
542 static void WaitUntilReadyForMove(Call& destination, Call& source)
543 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
544
545 private:
546 friend class rpc::Writer;
547
548 enum State : uint8_t {
549 kActive = 0b001,
550 kClientRequestedCompletion = 0b010,
551 kHasBeenDestroyed = 0b100,
552 };
553
554 // Common constructor for server & client calls.
555 Call(LockedEndpoint& endpoint,
556 uint32_t id,
557 uint32_t channel_id,
558 uint32_t service_id,
559 uint32_t method_id,
560 CallProperties properties);
561
562 Packet MakePacket(pwpb::PacketType type,
563 ConstByteSpan payload,
564 Status status = OkStatus()) const
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock ())565 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
566 return Packet(type,
567 channel_id_locked(),
568 service_id(),
569 method_id(),
570 id_,
571 payload,
572 status);
573 }
574
575 // Marks a call object closed without doing anything else. The call is not
576 // removed from the calls list and no callbacks are called.
MarkClosed()577 void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
578 channel_id_ = Channel::kUnassignedChannelId;
579 id_ = 0;
580 state_ = kClientRequestedCompletion;
581 }
582
583 // Calls the on_error callback without closing the RPC. This is used when the
584 // call has already completed.
585 void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock());
586
587 // If required, removes this call from the endpoint's to_cleanup_ list and
588 // calls CleanUp(). Returns true if cleanup was required, which means the lock
589 // was released.
590 bool CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
591
592 // Sends a payload with the specified type. The payload may either be in a
593 // previously acquired buffer or in a standalone buffer.
594 //
595 // Returns FAILED_PRECONDITION if the call is not active().
596 Status SendPacket(pwpb::PacketType type,
597 ConstByteSpan payload,
598 Status status = OkStatus())
599 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
600
601 Status CloseAndSendFinalPacketLocked(pwpb::PacketType type,
602 ConstByteSpan response,
603 Status status)
604 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
605
606 Status TryCloseAndSendFinalPacketLocked(pwpb::PacketType type,
607 ConstByteSpan response,
608 Status status)
609 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
610
CallbacksAreRunning()611 bool CallbacksAreRunning() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
612 return callbacks_executing_ != 0u;
613 }
614
615 // Waits for callbacks to complete so that a call object can be destroyed.
616 void WaitForCallbacksToComplete() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
617
618 Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
619 uint32_t channel_id_ PW_GUARDED_BY(rpc_lock());
620 uint32_t id_ PW_GUARDED_BY(rpc_lock());
621 uint32_t service_id_ PW_GUARDED_BY(rpc_lock());
622 uint32_t method_id_ PW_GUARDED_BY(rpc_lock());
623
624 // State of call and client stream.
625 //
626 // bit 0: call is active
627 // bit 1: client stream is active
628 // bit 2: call has been destroyed
629 uint8_t state_ PW_GUARDED_BY(rpc_lock());
630
631 // If non-OK, indicates that the call was closed and needs to have its
632 // on_error called with this Status code. Uses a uint8_t for compactness.
633 uint8_t awaiting_cleanup_ PW_GUARDED_BY(rpc_lock());
634
635 // Tracks how many of this call's callbacks are running. Must be 0 for the
636 // call to be destroyed.
637 uint8_t callbacks_executing_ PW_GUARDED_BY(rpc_lock());
638
639 CallProperties properties_ PW_GUARDED_BY(rpc_lock());
640
641 // Called when the RPC is terminated due to an error.
642 Function<void(Status error)> on_error_ PW_GUARDED_BY(rpc_lock());
643
644 // Called when a request is received. Only used for RPCs with client streams.
645 // The raw payload buffer is passed to the callback.
646 Function<void(ConstByteSpan payload)> on_next_ PW_GUARDED_BY(rpc_lock());
647 };
648
649 } // namespace internal
650
active()651 inline bool Writer::active() const {
652 return static_cast<const internal::Call*>(this)->active();
653 }
654
channel_id()655 inline uint32_t Writer::channel_id() const {
656 return static_cast<const internal::Call*>(this)->channel_id();
657 }
658
Write(ConstByteSpan payload)659 inline Status Writer::Write(ConstByteSpan payload) {
660 return static_cast<internal::Call*>(this)->Write(payload);
661 }
662
Write(const Function<StatusWithSize (ByteSpan)> & callback)663 inline Status Writer::Write(
664 const Function<StatusWithSize(ByteSpan)>& callback) {
665 return static_cast<internal::Call*>(this)->Write(callback);
666 }
667
668 } // namespace pw::rpc
669