xref: /aosp_15_r20/external/pigweed/pw_grpc/public/pw_grpc/grpc_channel_output.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
#include <cstdint>
#include <functional>
#include <optional>

#include "pw_bytes/span.h"
#include "pw_grpc/connection.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/internal/packet.h"
23 
24 namespace pw::grpc {
25 
26 class GrpcChannelOutput : public rpc::ChannelOutput {
27  public:
GrpcChannelOutput()28   GrpcChannelOutput() : pw::rpc::ChannelOutput("grpc") {}
29 
30   class StreamCallbacks {
31    public:
32     // Called when a stream is completed from the server. Called on the same
33     // thread as Send is called on.
34     virtual void OnClose(StreamId) = 0;
35   };
36 
set_callbacks(StreamCallbacks & callbacks)37   void set_callbacks(StreamCallbacks& callbacks) { callbacks_ = callbacks; }
38 
set_connection(Connection & conn)39   void set_connection(Connection& conn) { connection_ = conn; }
40 
Send(ConstByteSpan data)41   Status Send(ConstByteSpan data) override {
42     using pw::rpc::internal::pwpb::PacketType;
43 
44     if (!connection_.has_value()) {
45       return Status::FailedPrecondition();
46     }
47     // TODO: b/319162657 - Avoid this extra decode
48     PW_TRY_ASSIGN(rpc::internal::Packet packet,
49                   rpc::internal::Packet::FromBuffer(data));
50 
51     switch (packet.type()) {
52       case PacketType::kResponse:
53         if (packet.payload().size()) {
54           PW_TRY(connection_->get().SendResponseMessage(packet.call_id(),
55                                                         packet.payload()));
56         }
57         PW_TRY(connection_->get().SendResponseComplete(packet.call_id(),
58                                                        packet.status()));
59         if (callbacks_.has_value()) {
60           callbacks_->get().OnClose(packet.call_id());
61         }
62         break;
63       case PacketType::kServerStream:
64         PW_TRY(connection_->get().SendResponseMessage(packet.call_id(),
65                                                       packet.payload()));
66         break;
67       case PacketType::kServerError:
68         PW_TRY(connection_->get().SendResponseComplete(packet.call_id(),
69                                                        packet.status()));
70         if (callbacks_.has_value()) {
71           callbacks_->get().OnClose(packet.call_id());
72         }
73         break;
74       default:
75         return Status::FailedPrecondition();
76     }
77 
78     return OkStatus();
79   }
80 
81  private:
82   std::optional<std::reference_wrapper<StreamCallbacks>> callbacks_;
83   std::optional<std::reference_wrapper<Connection>> connection_;
84 };
85 
86 }  // namespace pw::grpc
87