xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/grpclb/load_balancer_api.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/load_balancing/grpclb/load_balancer_api.h"
22 
23 #include <string.h>
24 
25 #include <algorithm>
26 
27 #include "google/protobuf/duration.upb.h"
28 #include "google/protobuf/timestamp.upb.h"
29 #include "upb/base/string_view.h"
30 
31 #include <grpc/support/log.h>
32 #include <grpc/support/time.h>
33 
34 #include "src/core/lib/gprpp/memory.h"
35 #include "src/proto/grpc/lb/v1/load_balancer.upb.h"
36 
37 namespace grpc_core {
38 
operator ==(const GrpcLbServer & other) const39 bool GrpcLbServer::operator==(const GrpcLbServer& other) const {
40   if (ip_size != other.ip_size) return false;
41   int r = memcmp(ip_addr, other.ip_addr, ip_size);
42   if (r != 0) return false;
43   if (port != other.port) return false;
44   r = strncmp(load_balance_token, other.load_balance_token,
45               sizeof(load_balance_token));
46   if (r != 0) return false;
47   return drop == other.drop;
48 }
49 
50 namespace {
51 
grpc_grpclb_request_encode(const grpc_lb_v1_LoadBalanceRequest * request,upb_Arena * arena)52 grpc_slice grpc_grpclb_request_encode(
53     const grpc_lb_v1_LoadBalanceRequest* request, upb_Arena* arena) {
54   size_t buf_length;
55   char* buf =
56       grpc_lb_v1_LoadBalanceRequest_serialize(request, arena, &buf_length);
57   return grpc_slice_from_copied_buffer(buf, buf_length);
58 }
59 
60 }  // namespace
61 
GrpcLbRequestCreate(absl::string_view lb_service_name,upb_Arena * arena)62 grpc_slice GrpcLbRequestCreate(absl::string_view lb_service_name,
63                                upb_Arena* arena) {
64   grpc_lb_v1_LoadBalanceRequest* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
65   grpc_lb_v1_InitialLoadBalanceRequest* initial_request =
66       grpc_lb_v1_LoadBalanceRequest_mutable_initial_request(req, arena);
67   size_t name_len = std::min(lb_service_name.size(),
68                              size_t{GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH});
69   grpc_lb_v1_InitialLoadBalanceRequest_set_name(
70       initial_request,
71       upb_StringView_FromDataAndSize(lb_service_name.data(), name_len));
72   return grpc_grpclb_request_encode(req, arena);
73 }
74 
75 namespace {
76 
google_protobuf_Timestamp_assign(google_protobuf_Timestamp * timestamp,const gpr_timespec & value)77 void google_protobuf_Timestamp_assign(google_protobuf_Timestamp* timestamp,
78                                       const gpr_timespec& value) {
79   google_protobuf_Timestamp_set_seconds(timestamp, value.tv_sec);
80   google_protobuf_Timestamp_set_nanos(timestamp, value.tv_nsec);
81 }
82 
83 }  // namespace
84 
GrpcLbLoadReportRequestCreate(int64_t num_calls_started,int64_t num_calls_finished,int64_t num_calls_finished_with_client_failed_to_send,int64_t num_calls_finished_known_received,const GrpcLbClientStats::DroppedCallCounts * drop_token_counts,upb_Arena * arena)85 grpc_slice GrpcLbLoadReportRequestCreate(
86     int64_t num_calls_started, int64_t num_calls_finished,
87     int64_t num_calls_finished_with_client_failed_to_send,
88     int64_t num_calls_finished_known_received,
89     const GrpcLbClientStats::DroppedCallCounts* drop_token_counts,
90     upb_Arena* arena) {
91   grpc_lb_v1_LoadBalanceRequest* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
92   grpc_lb_v1_ClientStats* req_stats =
93       grpc_lb_v1_LoadBalanceRequest_mutable_client_stats(req, arena);
94   google_protobuf_Timestamp_assign(
95       grpc_lb_v1_ClientStats_mutable_timestamp(req_stats, arena),
96       gpr_now(GPR_CLOCK_REALTIME));
97   grpc_lb_v1_ClientStats_set_num_calls_started(req_stats, num_calls_started);
98   grpc_lb_v1_ClientStats_set_num_calls_finished(req_stats, num_calls_finished);
99   grpc_lb_v1_ClientStats_set_num_calls_finished_with_client_failed_to_send(
100       req_stats, num_calls_finished_with_client_failed_to_send);
101   grpc_lb_v1_ClientStats_set_num_calls_finished_known_received(
102       req_stats, num_calls_finished_known_received);
103   if (drop_token_counts != nullptr) {
104     for (size_t i = 0; i < drop_token_counts->size(); ++i) {
105       const GrpcLbClientStats::DropTokenCount& cur = (*drop_token_counts)[i];
106       grpc_lb_v1_ClientStatsPerToken* cur_msg =
107           grpc_lb_v1_ClientStats_add_calls_finished_with_drop(req_stats, arena);
108       const size_t token_len = strlen(cur.token.get());
109       char* token = reinterpret_cast<char*>(upb_Arena_Malloc(arena, token_len));
110       memcpy(token, cur.token.get(), token_len);
111       grpc_lb_v1_ClientStatsPerToken_set_load_balance_token(
112           cur_msg, upb_StringView_FromDataAndSize(token, token_len));
113       grpc_lb_v1_ClientStatsPerToken_set_num_calls(cur_msg, cur.count);
114     }
115   }
116   return grpc_grpclb_request_encode(req, arena);
117 }
118 
119 namespace {
120 
ParseServerList(const grpc_lb_v1_LoadBalanceResponse & response,std::vector<GrpcLbServer> * server_list)121 bool ParseServerList(const grpc_lb_v1_LoadBalanceResponse& response,
122                      std::vector<GrpcLbServer>* server_list) {
123   // Determine the number of servers.
124   const grpc_lb_v1_ServerList* server_list_msg =
125       grpc_lb_v1_LoadBalanceResponse_server_list(&response);
126   if (server_list_msg == nullptr) return false;
127   size_t server_count = 0;
128   const grpc_lb_v1_Server* const* servers =
129       grpc_lb_v1_ServerList_servers(server_list_msg, &server_count);
130   // Populate servers.
131   if (server_count > 0) {
132     server_list->reserve(server_count);
133     for (size_t i = 0; i < server_count; ++i) {
134       GrpcLbServer& cur = *server_list->emplace(server_list->end());
135       upb_StringView address = grpc_lb_v1_Server_ip_address(servers[i]);
136       if (address.size == 0) {
137         ;  // Nothing to do because cur->ip_address is an empty string.
138       } else if (address.size <= GRPC_GRPCLB_SERVER_IP_ADDRESS_MAX_SIZE) {
139         cur.ip_size = static_cast<int32_t>(address.size);
140         memcpy(cur.ip_addr, address.data, address.size);
141       }
142       cur.port = grpc_lb_v1_Server_port(servers[i]);
143       upb_StringView token = grpc_lb_v1_Server_load_balance_token(servers[i]);
144       if (token.size == 0) {
145         ;  // Nothing to do because cur->load_balance_token is an empty string.
146       } else if (token.size <= GRPC_GRPCLB_SERVER_LOAD_BALANCE_TOKEN_MAX_SIZE) {
147         memcpy(cur.load_balance_token, token.data, token.size);
148       } else {
149         gpr_log(GPR_ERROR,
150                 "grpc_lb_v1_LoadBalanceResponse has too long token. len=%zu",
151                 token.size);
152       }
153       cur.drop = grpc_lb_v1_Server_drop(servers[i]);
154     }
155   }
156   return true;
157 }
158 
ParseDuration(const google_protobuf_Duration * duration_pb)159 Duration ParseDuration(const google_protobuf_Duration* duration_pb) {
160   return Duration::FromSecondsAndNanoseconds(
161       google_protobuf_Duration_seconds(duration_pb),
162       google_protobuf_Duration_nanos(duration_pb));
163 }
164 
165 }  // namespace
166 
GrpcLbResponseParse(const grpc_slice & serialized_response,upb_Arena * arena,GrpcLbResponse * result)167 bool GrpcLbResponseParse(const grpc_slice& serialized_response,
168                          upb_Arena* arena, GrpcLbResponse* result) {
169   grpc_lb_v1_LoadBalanceResponse* response =
170       grpc_lb_v1_LoadBalanceResponse_parse(
171           reinterpret_cast<const char*>(
172               GRPC_SLICE_START_PTR(serialized_response)),
173           GRPC_SLICE_LENGTH(serialized_response), arena);
174   // Handle serverlist responses.
175   if (ParseServerList(*response, &result->serverlist)) {
176     result->type = result->SERVERLIST;
177     return true;
178   }
179   // Handle initial responses.
180   auto* initial_response =
181       grpc_lb_v1_LoadBalanceResponse_initial_response(response);
182   if (initial_response != nullptr) {
183     result->type = result->INITIAL;
184     const google_protobuf_Duration* client_stats_report_interval =
185         grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval(
186             initial_response);
187     if (client_stats_report_interval != nullptr) {
188       result->client_stats_report_interval =
189           ParseDuration(client_stats_report_interval);
190     }
191     return true;
192   }
193   // Handle fallback.
194   if (grpc_lb_v1_LoadBalanceResponse_has_fallback_response(response)) {
195     result->type = result->FALLBACK;
196     return true;
197   }
198   // Unknown response type.
199   return false;
200 }
201 
202 }  // namespace grpc_core
203