1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cstdint> 17 #include <optional> 18 19 #include "pw_bytes/span.h" 20 #include "pw_grpc/connection.h" 21 #include "pw_rpc/channel.h" 22 #include "pw_rpc/internal/packet.h" 23 24 namespace pw::grpc { 25 26 class GrpcChannelOutput : public rpc::ChannelOutput { 27 public: GrpcChannelOutput()28 GrpcChannelOutput() : pw::rpc::ChannelOutput("grpc") {} 29 30 class StreamCallbacks { 31 public: 32 // Called when a stream is completed from the server. Called on the same 33 // thread as Send is called on. 34 virtual void OnClose(StreamId) = 0; 35 }; 36 set_callbacks(StreamCallbacks & callbacks)37 void set_callbacks(StreamCallbacks& callbacks) { callbacks_ = callbacks; } 38 set_connection(Connection & conn)39 void set_connection(Connection& conn) { connection_ = conn; } 40 Send(ConstByteSpan data)41 Status Send(ConstByteSpan data) override { 42 using pw::rpc::internal::pwpb::PacketType; 43 44 if (!connection_.has_value()) { 45 return Status::FailedPrecondition(); 46 } 47 // TODO: b/319162657 - Avoid this extra decode 48 PW_TRY_ASSIGN(rpc::internal::Packet packet, 49 rpc::internal::Packet::FromBuffer(data)); 50 51 switch (packet.type()) { 52 case PacketType::kResponse: 53 if (packet.payload().size()) { 54 PW_TRY(connection_->get().SendResponseMessage(packet.call_id(), 55 packet.payload())); 56 } 57 PW_TRY(connection_->get().SendResponseComplete(packet.call_id(), 58 packet.status())); 59 if (callbacks_.has_value()) { 60 callbacks_->get().OnClose(packet.call_id()); 61 } 62 break; 63 case PacketType::kServerStream: 64 PW_TRY(connection_->get().SendResponseMessage(packet.call_id(), 65 packet.payload())); 66 break; 67 case PacketType::kServerError: 68 PW_TRY(connection_->get().SendResponseComplete(packet.call_id(), 69 packet.status())); 70 if (callbacks_.has_value()) { 71 callbacks_->get().OnClose(packet.call_id()); 72 } 73 break; 74 default: 75 return Status::FailedPrecondition(); 76 } 77 78 return OkStatus(); 79 } 80 81 private: 82 std::optional<std::reference_wrapper<StreamCallbacks>> callbacks_; 83 std::optional<std::reference_wrapper<Connection>> connection_; 84 }; 85 86 } // namespace pw::grpc 87