xref: /aosp_15_r20/external/pigweed/pw_rpc/call.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/internal/call.h"
16 
17 #include "pw_assert/check.h"
18 #include "pw_bytes/span.h"
19 #include "pw_log/log.h"
20 #include "pw_preprocessor/util.h"
21 #include "pw_rpc/channel.h"
22 #include "pw_rpc/client.h"
23 #include "pw_rpc/internal/encoding_buffer.h"
24 #include "pw_rpc/internal/endpoint.h"
25 #include "pw_rpc/internal/method.h"
26 #include "pw_rpc/internal/packet.pwpb.h"
27 #include "pw_rpc/server.h"
28 #include "pw_status/status_with_size.h"
29 #include "pw_status/try.h"
30 
31 // If the callback timeout is enabled, count the number of iterations of the
32 // waiting loop and crash if it exceeds PW_RPC_CALLBACK_TIMEOUT_TICKS.
33 #if PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
34 #define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
35   iterations += 1;                                      \
36   PW_CHECK(                                                                  \
37       iterations < PW_RPC_CALLBACK_TIMEOUT_TICKS,                            \
38       "A callback for RPC %u:%08x/%08x has not finished after "              \
39       PW_STRINGIFY(PW_RPC_CALLBACK_TIMEOUT_TICKS)                            \
40       " ticks. This may indicate that an RPC callback attempted to "         \
41       timeout_source                                                         \
42       " its own call object, which is not permitted. Fix this condition or " \
43       "change the value of PW_RPC_CALLBACK_TIMEOUT_TICKS to avoid this "     \
44       "crash. See https://pigweed.dev/pw_rpc"                                \
45       "#destructors-moves-wait-for-callbacks-to-complete for details.",      \
46       static_cast<unsigned>((call).channel_id_),                             \
47       static_cast<unsigned>((call).service_id_),                             \
48       static_cast<unsigned>((call).method_id_))
49 #else
50 #define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
51   static_cast<void>(iterations)
52 #endif  // PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
53 
54 namespace pw::rpc::internal {
55 
56 using pwpb::PacketType;
57 
EncodeCallbackToPayloadBuffer(const Function<StatusWithSize (ByteSpan)> & callback)58 Result<ConstByteSpan> EncodeCallbackToPayloadBuffer(
59     const Function<StatusWithSize(ByteSpan)>& callback)
60     PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
61   if (callback == nullptr) {
62     return Status::InvalidArgument();
63   }
64 
65   ByteSpan payload_buffer =
66       encoding_buffer.AllocatePayloadBuffer(MaxSafePayloadSize());
67   PW_TRY_ASSIGN(const size_t payload_size, callback(payload_buffer));
68 
69   return payload_buffer.first(payload_size);
70 }
71 
72 // Creates an active server-side Call.
Call(const LockedCallContext & context,CallProperties properties)73 Call::Call(const LockedCallContext& context, CallProperties properties)
74     : Call(context.server().ClaimLocked(),
75            context.call_id(),
76            context.channel_id(),
77            UnwrapServiceId(context.service().service_id()),
78            context.method().id(),
79            properties) {}
80 
81 // Creates an active client-side call, assigning it a new ID.
Call(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)82 Call::Call(LockedEndpoint& client,
83            uint32_t channel_id,
84            uint32_t service_id,
85            uint32_t method_id,
86            CallProperties properties)
87     : Call(client,
88            client.NewCallId(),
89            channel_id,
90            service_id,
91            method_id,
92            properties) {}
93 
Call(LockedEndpoint & endpoint_ref,uint32_t call_id,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)94 Call::Call(LockedEndpoint& endpoint_ref,
95            uint32_t call_id,
96            uint32_t channel_id,
97            uint32_t service_id,
98            uint32_t method_id,
99            CallProperties properties)
100     : endpoint_(&endpoint_ref),
101       channel_id_(channel_id),
102       id_(call_id),
103       service_id_(service_id),
104       method_id_(method_id),
105       // Note: Bit kActive set to 1 and kClientRequestedCompletion is set to 0.
106       state_(kActive),
107       awaiting_cleanup_(OkStatus().code()),
108       callbacks_executing_(0),
109       properties_(properties) {
110   PW_CHECK_UINT_NE(channel_id,
111                    Channel::kUnassignedChannelId,
112                    "Calls cannot be created with channel ID 0 "
113                    "(Channel::kUnassignedChannelId)");
114   endpoint().RegisterCall(*this);
115 }
116 
DestroyServerCall()117 void Call::DestroyServerCall() {
118   RpcLockGuard lock;
119   // Any errors are logged in Channel::Send.
120   CloseAndSendResponseLocked(OkStatus()).IgnoreError();
121   WaitForCallbacksToComplete();
122   state_ |= kHasBeenDestroyed;
123 }
124 
DestroyClientCall()125 void Call::DestroyClientCall() {
126   RpcLockGuard lock;
127   CloseClientCall();
128   WaitForCallbacksToComplete();
129   state_ |= kHasBeenDestroyed;
130 }
131 
WaitForCallbacksToComplete()132 void Call::WaitForCallbacksToComplete() {
133   do {
134     int iterations = 0;
135     while (CallbacksAreRunning()) {
136       PW_RPC_CHECK_FOR_DEADLOCK("destroy", *this);
137       YieldRpcLock();
138     }
139 
140   } while (CleanUpIfRequired());
141 }
142 
MoveFrom(Call & other)143 void Call::MoveFrom(Call& other) {
144   PW_DCHECK(!active_locked());
145   PW_DCHECK(!awaiting_cleanup() && !other.awaiting_cleanup());
146 
147   // An active call with an executing callback cannot be moved. Derived call
148   // classes must wait for callbacks to finish before calling MoveFrom.
149   PW_DCHECK(!other.active_locked() || !other.CallbacksAreRunning());
150 
151   // Copy all members from the other call.
152   endpoint_ = other.endpoint_;
153   channel_id_ = other.channel_id_;
154   id_ = other.id_;
155   service_id_ = other.service_id_;
156   method_id_ = other.method_id_;
157 
158   state_ = other.state_;
159 
160   // No need to move awaiting_cleanup_, since it is 0 in both calls here.
161 
162   properties_ = other.properties_;
163 
164   // callbacks_executing_ is not moved since it is associated with the object in
165   // memory, not the call.
166 
167   on_error_ = std::move(other.on_error_);
168   on_next_ = std::move(other.on_next_);
169 
170   if (other.active_locked()) {
171     // Mark the other call inactive, unregister it, and register this one.
172     other.MarkClosed();
173     endpoint().UnregisterCall(other);
174     endpoint().RegisterUniqueCall(*this);
175   }
176 }
177 
WaitUntilReadyForMove(Call & destination,Call & source)178 void Call::WaitUntilReadyForMove(Call& destination, Call& source) {
179   do {
180     // Wait for the source's callbacks to finish if it is active.
181     int iterations = 0;
182     while (source.active_locked() && source.CallbacksAreRunning()) {
183       PW_RPC_CHECK_FOR_DEADLOCK("move", source);
184       YieldRpcLock();
185     }
186 
187     // At this point, no callbacks are running in the source call. If cleanup
188     // is required for the destination call, perform it and retry since
189     // cleanup releases and reacquires the RPC lock.
190   } while (source.CleanUpIfRequired() || destination.CleanUpIfRequired());
191 }
192 
CallOnError(Status error)193 void Call::CallOnError(Status error) {
194   auto on_error_local = std::move(on_error_);
195 
196   CallbackStarted();
197 
198   rpc_lock().unlock();
199   if (on_error_local) {
200     on_error_local(error);
201   }
202 
203   // This mutex lock could be avoided by making callbacks_executing_ atomic.
204   RpcLockGuard lock;
205   CallbackFinished();
206 }
207 
CleanUpIfRequired()208 bool Call::CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
209   if (!awaiting_cleanup()) {
210     return false;
211   }
212   endpoint_->CleanUpCall(*this);
213   rpc_lock().lock();
214   return true;
215 }
216 
SendPacket(PacketType type,ConstByteSpan payload,Status status)217 Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
218   if (!active_locked()) {
219     encoding_buffer.ReleaseIfAllocated();
220     return Status::FailedPrecondition();
221   }
222 
223   ChannelBase* channel = endpoint_->GetInternalChannel(channel_id_);
224   if (channel == nullptr) {
225     encoding_buffer.ReleaseIfAllocated();
226     return Status::Unavailable();
227   }
228   return channel->Send(MakePacket(type, payload, status));
229 }
230 
CloseAndSendResponseCallbackLocked(const Function<StatusWithSize (ByteSpan)> & callback,Status status)231 Status Call::CloseAndSendResponseCallbackLocked(
232     const Function<StatusWithSize(ByteSpan)>& callback, Status status) {
233   PW_TRY_ASSIGN(ConstByteSpan payload, EncodeCallbackToPayloadBuffer(callback));
234   return CloseAndSendFinalPacketLocked(
235       pwpb::PacketType::RESPONSE, payload, status);
236 }
237 
TryCloseAndSendResponseCallbackLocked(const Function<StatusWithSize (ByteSpan)> & callback,Status status)238 Status Call::TryCloseAndSendResponseCallbackLocked(
239     const Function<StatusWithSize(ByteSpan)>& callback, Status status) {
240   PW_TRY_ASSIGN(ConstByteSpan payload, EncodeCallbackToPayloadBuffer(callback));
241   return TryCloseAndSendFinalPacketLocked(
242       pwpb::PacketType::RESPONSE, payload, status);
243 }
244 
CloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)245 Status Call::CloseAndSendFinalPacketLocked(PacketType type,
246                                            ConstByteSpan response,
247                                            Status status) {
248   const Status send_status = SendPacket(type, response, status);
249   UnregisterAndMarkClosed();
250   return send_status;
251 }
252 
TryCloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)253 Status Call::TryCloseAndSendFinalPacketLocked(PacketType type,
254                                               ConstByteSpan response,
255                                               Status status) {
256   const Status send_status = SendPacket(type, response, status);
257   // Only close the call if the final packet gets sent out successfully.
258   if (send_status.ok()) {
259     UnregisterAndMarkClosed();
260   }
261   return send_status;
262 }
263 
WriteLocked(ConstByteSpan payload)264 Status Call::WriteLocked(ConstByteSpan payload) {
265   return SendPacket(properties_.call_type() == kServerCall
266                         ? PacketType::SERVER_STREAM
267                         : PacketType::CLIENT_STREAM,
268                     payload);
269 }
270 
WriteCallbackLocked(const Function<StatusWithSize (ByteSpan)> & callback)271 Status Call::WriteCallbackLocked(
272     const Function<StatusWithSize(ByteSpan)>& callback) {
273   PW_TRY_ASSIGN(ConstByteSpan payload, EncodeCallbackToPayloadBuffer(callback));
274   return SendPacket(properties_.call_type() == kServerCall
275                         ? PacketType::SERVER_STREAM
276                         : PacketType::CLIENT_STREAM,
277                     payload);
278 }
279 
280 // This definition is in the .cc file because the Endpoint class is not defined
281 // in the Call header, due to circular dependencies between the two.
CloseAndMarkForCleanup(Status error)282 void Call::CloseAndMarkForCleanup(Status error) {
283   endpoint_->CloseCallAndMarkForCleanup(*this, error);
284 }
285 
HandlePayload(ConstByteSpan payload)286 void Call::HandlePayload(ConstByteSpan payload) {
287   // pw_rpc only supports handling packets for a particular RPC one at a time.
288   // Check if any callbacks are running and drop the packet if they are.
289   //
290   // The on_next callback cannot support multiple packets at once since it is
291   // moved before it is invoked. on_error and on_completed are only called
292   // after the call is closed.
293   if (CallbacksAreRunning()) {
294     PW_LOG_WARN(
295         "Received stream packet for %u:%08x/%08x before the callback for a "
296         "previous packet completed! This packet will be dropped. This can be "
297         "avoided by handling packets for a particular RPC on only one thread.",
298         static_cast<unsigned>(channel_id_),
299         static_cast<unsigned>(service_id_),
300         static_cast<unsigned>(method_id_));
301     rpc_lock().unlock();
302     return;
303   }
304 
305   if (on_next_ == nullptr) {
306     rpc_lock().unlock();
307     return;
308   }
309 
310   const uint32_t original_id = id();
311   auto on_next_local = std::move(on_next_);
312   CallbackStarted();
313 
314   if (hold_lock_while_invoking_callback_with_payload()) {
315     on_next_local(payload);
316   } else {
317     rpc_lock().unlock();
318     on_next_local(payload);
319     rpc_lock().lock();
320   }
321 
322   CallbackFinished();
323 
324   // Restore the original callback if the original call is still active and
325   // the callback has not been replaced.
326   // NOLINTNEXTLINE(bugprone-use-after-move)
327   if (active_locked() && id() == original_id && on_next_ == nullptr) {
328     on_next_ = std::move(on_next_local);
329   }
330 
331   // The call could have been reinitialized and cleaned up already by another
332   // thread that acquired the rpc_lock() while on_next_local was executing
333   // without lock held.
334   if (endpoint_ != nullptr) {
335     // Clean up calls in case decoding failed.
336     endpoint_->CleanUpCalls();
337   } else {
338     rpc_lock().unlock();
339   }
340 }
341 
CloseClientCall()342 void Call::CloseClientCall() {
343   // When a client call is closed, for bidirectional and client streaming RPCs,
344   // the server may be waiting for client stream messages, so we need to notify
345   // the server that the client has requested for completion and no further
346   // requests should be expected from the client. For unary and server streaming
347   // RPCs, since the client is not sending messages, server does not need to be
348   // notified.
349   if (has_client_stream() && !client_requested_completion()) {
350     RequestCompletionLocked().IgnoreError();
351   }
352   UnregisterAndMarkClosed();
353 }
354 
UnregisterAndMarkClosed()355 void Call::UnregisterAndMarkClosed() {
356   if (active_locked()) {
357     endpoint().UnregisterCall(*this);
358     MarkClosed();
359   }
360 }
361 
DebugLog() const362 void Call::DebugLog() const PW_NO_LOCK_SAFETY_ANALYSIS {
363   PW_LOG_INFO(
364       "Call %p\n"
365       "\tEndpoint: %p\n"
366       "\tCall ID:  %8u\n"
367       "\tChannel:  %8u\n"
368       "\tService:  %08x\n"
369       "\tMethod:   %08x\n"
370       "\tState:    %8x\n"
371       "\tCleanup:  %8s\n"
372       "\tBusy CBs: %8x\n"
373       "\tType:     %8d\n"
374       "\tClient:   %8d\n"
375       "\tWrapped:  %8d\n"
376       "\ton_error: %8d\n"
377       "\ton_next:  %8d\n",
378       static_cast<const void*>(this),
379       static_cast<const void*>(endpoint_),
380       static_cast<unsigned>(id_),
381       static_cast<unsigned>(channel_id_),
382       static_cast<unsigned>(service_id_),
383       static_cast<unsigned>(method_id_),
384       static_cast<int>(state_),
385       Status(static_cast<Status::Code>(awaiting_cleanup_)).str(),
386       static_cast<int>(callbacks_executing_),
387       static_cast<int>(properties_.method_type()),
388       static_cast<int>(properties_.call_type()),
389       static_cast<int>(hold_lock_while_invoking_callback_with_payload()),
390       static_cast<int>(on_error_ == nullptr),
391       static_cast<int>(on_next_ == nullptr));
392 }
393 
394 }  // namespace pw::rpc::internal
395