xref: /aosp_15_r20/external/grpc-grpc/src/php/tests/interop/xds_client.php (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1<?php
2/*
3 *
4 * Copyright 2020 gRPC authors.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 */
19
20/**
21 * This is the PHP xDS Interop test client. This script is meant to be run by
 * the main xDS Interop test runner "run_xds_tests.py", not to be run
23 * by itself standalone.
24 */
25$autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php');
26require_once $autoload_path;
27
/**
 * Service that lets a caller reconfigure, at runtime, which RPCs the client
 * thread sends, the metadata attached to each of them, and the RPC timeout.
 */
class XdsUpdateClientConfigureService
        extends \Grpc\Testing\XdsUpdateClientConfigureServiceStub
{
    /**
     * Maps a ClientConfigureRequest RpcType enum value to the RPC name used
     * as a key throughout this script.
     *
     * @param mixed $rpc_type RpcType enum value taken from the request
     * @return string|null 'EmptyCall' or 'UnaryCall'; null for any other value
     */
    private static function rpcNameForType($rpc_type): ?string {
        if ($rpc_type ==
            \Grpc\Testing\ClientConfigureRequest\RpcType::EMPTY_CALL) {
            return 'EmptyCall';
        }
        if ($rpc_type ==
            \Grpc\Testing\ClientConfigureRequest\RpcType::UNARY_CALL) {
            return 'UnaryCall';
        }
        return null;
    }

    /**
     * Applies the requested RPC types, per-RPC metadata and timeout to the
     * global client thread's rpc_config.
     *
     * @param \Grpc\Testing\ClientConfigureRequest $request new configuration
     * @param \Grpc\ServerContext $context server call context (unused)
     * @return \Grpc\Testing\ClientConfigureResponse an empty response
     */
    function configure(
        \Grpc\Testing\ClientConfigureRequest $request,
        \Grpc\ServerContext $context
    ): ?\Grpc\Testing\ClientConfigureResponse {
        $rpcs_to_send = [];
        foreach ($request->getTypes() as $rpc_type) {
            $rpc_name = self::rpcNameForType($rpc_type);
            if ($rpc_name !== null) {
                $rpcs_to_send[] = $rpc_name;
            }
        }

        $metadata_to_send = [];
        foreach ($request->getMetadata() as $metadata) {
            $rpc_type_key = self::rpcNameForType($metadata->getType());
            if ($rpc_type_key === null) {
                // Unrecognized RPC type: skip it. Previously the key was
                // either undefined or left over from the previous entry,
                // which filed the metadata under the wrong RPC.
                continue;
            }
            if (!isset($metadata_to_send[$rpc_type_key])) {
                $metadata_to_send[$rpc_type_key] = [];
            }
            $metadata_to_send[$rpc_type_key][$metadata->getKey()]
                = $metadata->getValue();
        }

        global $client_thread;
        echo "PHP parent: Setting client_thread rpc_config to \n";
        print_r($rpcs_to_send);
        print_r($metadata_to_send);
        echo "PHP parent: timeout_sec = ".$request->getTimeoutSec()."\n";
        $client_thread->rpc_config->update($rpcs_to_send,
                                           $metadata_to_send,
                                           $request->getTimeoutSec());
        return new Grpc\Testing\ClientConfigureResponse();
    }
}
75
/**
 * The main xds interop test runner will ping this service to ask for
 * the stats of the distribution of the backends, for the next X rpcs.
 */
class LoadBalancerStatsService
    extends \Grpc\Testing\LoadBalancerStatsServiceStub
{
    /**
     * Blocks until the client thread has completed the next
     * $request->getNumRpcs() rounds of RPCs, or until
     * $request->getTimeoutSec() seconds have elapsed, then reports how the
     * results of those rounds were distributed across peers.
     *
     * @param \Grpc\Testing\LoadBalancerStatsRequest $request
     * @param \Grpc\ServerContext $context server call context (unused)
     * @return \Grpc\Testing\LoadBalancerStatsResponse per-peer and per-method
     *         counts plus the number of failed RPCs
     */
    function getClientStats(
        \Grpc\Testing\LoadBalancerStatsRequest $request,
        \Grpc\ServerContext $context
    ): ?\Grpc\Testing\LoadBalancerStatsResponse {
        $num_rpcs = $request->getNumRpcs();
        $timeout_sec = $request->getTimeoutSec();
        $rpcs_by_method = [];
        $rpcs_by_peer = [];
        $num_failures = 0;

        // Heavy limitation now: the server is blocking, until all
        // the necessary num_rpcs are finished, or timeout is reached
        global $client_thread;
        $start_id = $client_thread->num_results + 1;
        $end_id = $start_id + $num_rpcs;
        // hrtime(true) returns one number (nanoseconds), not an array;
        // indexing it made the deadline check a no-op.
        $deadline_sec = (hrtime(true) / 1e9) + $timeout_sec;
        while ((hrtime(true) / 1e9) <= $deadline_sec) {
            // Thread variable seems to be read-only
            if ($client_thread->num_results >= $end_id) {
                break;
            }
            usleep(50000);
        }

        // Tally up results
        $end_id = min($end_id, $client_thread->num_results);
        // "$client_thread->results" will be in the form of
        // [
        //   'rpc1' => [
        //     'hostname1', '', 'hostname2', 'hostname1', '', ...
        //   ],
        //   'rpc2' => [
        //     '', 'hostname1', 'hostname2', '', 'hostname2', ...
        //   ],
        // ]
        foreach ((array)$client_thread->rpc_config->rpcs_to_send as $rpc) {
            $results = $client_thread->results[$rpc];
            // initialize, can always start from scratch here
            $rpcs_by_method[$rpc] = [];
            for ($i = $start_id; $i < $end_id; $i++) {
                // A round may have no entry recorded yet for this RPC;
                // treat that like an empty hostname, i.e. a failure.
                $hostname = isset($results[$i]) ? $results[$i] : '';
                if ($hostname) {
                    // initialize in case we haven't seen this hostname
                    // before
                    if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) {
                        $rpcs_by_method[$rpc][$hostname] = 0;
                    }
                    if (!array_key_exists($hostname, $rpcs_by_peer)) {
                        $rpcs_by_peer[$hostname] = 0;
                    }
                    // increment the remote hostname distribution histogram
                    // both by overall, and broken down per RPC
                    $rpcs_by_method[$rpc][$hostname] += 1;
                    $rpcs_by_peer[$hostname] += 1;
                } else {
                    // $num_failures here are counted per individual RPC
                    $num_failures += 1;
                }
            }
        }

        // Convert our hashmaps above into protobuf objects
        $response = new Grpc\Testing\LoadBalancerStatsResponse();
        $rpcs_by_method_map = [];
        foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) {
            $rpcs_by_peer_proto_obj
                = new Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer();
            $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method);
            $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj;
        }
        $response->setRpcsByPeer($rpcs_by_peer);
        $response->setRpcsByMethod($rpcs_by_method_map);
        $response->setNumFailures($num_failures);
        return $response;
    }

    /**
     * Reports the counters the client thread has accumulated since it
     * started: RPCs started / succeeded / failed per method, and the
     * per-method breakdown of results by status code.
     *
     * @param \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request
     * @param \Grpc\ServerContext $context server call context (unused)
     * @return \Grpc\Testing\LoadBalancerAccumulatedStatsResponse
     */
    function GetClientAccumulatedStats(
        \Grpc\Testing\LoadBalancerAccumulatedStatsRequest $request,
        \Grpc\ServerContext $context
    ): ?\Grpc\Testing\LoadBalancerAccumulatedStatsResponse {
        global $client_thread;
        $response = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse();
        $response->setNumRpcsStartedByMethod(
            (array)$client_thread->num_rpcs_started_by_method);
        $response->setNumRpcsSucceededByMethod(
            (array)$client_thread->num_rpcs_succeeded_by_method);
        $response->setNumRpcsFailedByMethod(
            (array)$client_thread->num_rpcs_failed_by_method);
        $accumulated_method_stats
            = (array)$client_thread->accumulated_method_stats;
        $stats_per_method = [];
        foreach ($accumulated_method_stats as $rpc_name => $stats) {
            $methodStats
                = new Grpc\Testing\LoadBalancerAccumulatedStatsResponse\MethodStats();
            $methodStats->setRpcsStarted($stats['rpcs_started']);
            $methodStats->setResult((array)$stats['result']);
            $stats_per_method[$rpc_name] = $methodStats;
        }
        $response->setStatsPerMethod($stats_per_method);
        return $response;
    }
}
190
/**
 * Configuration shared between the stats/configure server and the client
 * thread. The RPC list, metadata and timeout can be replaced at runtime
 * through update(); the remaining fields are set once at construction.
 */
class RpcConfig extends Volatile {
    /** Address of the server the test RPCs are sent to. */
    public $server_address;
    /** Target number of RPC rounds per second. */
    public $qps;
    /** Value of the fail_on_failed_rpcs option. */
    public $fail_on_failed_rpcs;
    /** Names of the RPCs to issue on every round. */
    public $rpcs_to_send;
    /** Metadata to attach, keyed by RPC name. */
    public $metadata_to_send;
    /** File to which RPCs to be run by child processes are written. */
    public $tmp_file1;
    /** File from which results of child-process RPCs are read. */
    public $tmp_file2;
    /** Per-RPC timeout in seconds. */
    public $timeout_sec;

    /**
     * Stores the fixed settings and seeds the mutable ones, with the
     * timeout starting at 30 seconds.
     */
    public function __construct(
        $server_address,
        $qps,
        $fail_on_failed_rpcs,
        $rpcs_to_send,
        $metadata_to_send,
        $tmp_file1,
        $tmp_file2
    ) {
        $this->server_address = $server_address;
        $this->qps = $qps;
        $this->fail_on_failed_rpcs = $fail_on_failed_rpcs;
        $this->tmp_file1 = $tmp_file1;
        $this->tmp_file2 = $tmp_file2;
        // The mutable part of the config goes through the same path that
        // runtime reconfiguration uses.
        $this->update($rpcs_to_send, $metadata_to_send, 30);
    }

    /**
     * Replaces the set of RPCs to send, their metadata and the timeout.
     */
    public function update($rpcs_to_send, $metadata_to_send, $timeout_sec) {
        $this->timeout_sec = $timeout_sec;
        $this->metadata_to_send = (array)$metadata_to_send;
        $this->rpcs_to_send = (array)$rpcs_to_send;
    }
}
222
/**
 * This client thread blindly sends the configured RPCs to the server once
 * every 1 / qps seconds, recording per-round results and running counters
 * that the stats service reads.
 */
class ClientThread extends Thread {
    private $target_seconds_between_rpcs_;
    private $autoload_path_;
    private $TIMEOUT_US = 30 * 1e6; // 30 seconds
    public $rpc_config;
    // Number of completed rounds; a round is one pass over rpcs_to_send.
    public $num_results = 0;
    // Per RPC name, the hostname ('' on failure) recorded for each round.
    public $results;

    // RPC name => key used in the *_by_method statistics.
    public $RPC_MAP = [
        'UnaryCall' => 'UNARY_CALL',
        'EmptyCall' => 'EMPTY_CALL',
    ];

    public $num_rpcs_started_by_method = [];
    public $num_rpcs_succeeded_by_method = [];
    public $num_rpcs_failed_by_method = [];
    public $accumulated_method_stats = [];

    /**
     * @param RpcConfig $rpc_config shared configuration; its qps value
     *        determines the interval between rounds
     * @param string $autoload_path autoloader to load inside the thread
     */
    public function __construct($rpc_config,
                                $autoload_path) {
        $this->rpc_config = $rpc_config;
        $this->target_seconds_between_rpcs_ = 1.0 / $rpc_config->qps;
        $this->autoload_path_ = $autoload_path;
        $this->simple_request = new Grpc\Testing\SimpleRequest();
        $this->empty_request = new Grpc\Testing\EmptyMessage();
        $this->results = [];
        foreach (['UnaryCall', 'EmptyCall'] as $rpc) {
            $this->results[$rpc] = [];
        }
        $this->outstanding_rpcs = [];
        foreach (['UNARY_CALL', 'EMPTY_CALL'] as $rpc_stats_key) {
            $this->num_rpcs_started_by_method[$rpc_stats_key] = 0;
            $this->num_rpcs_succeeded_by_method[$rpc_stats_key] = 0;
            $this->num_rpcs_failed_by_method[$rpc_stats_key] = 0;
            $this->accumulated_method_stats[$rpc_stats_key] = [
                'rpcs_started' => 0,
                'result' => [],
            ];
        }
    }

    /**
     * Returns the per-RPC timeout in microseconds: the configured
     * timeout_sec when it is set (truthy), otherwise the 30 second default.
     */
    private function rpcTimeoutUs() {
        return $this->rpc_config->timeout_sec ?
                 $this->rpc_config->timeout_sec * 1e6 :
                 $this->TIMEOUT_US;
    }

    /**
     * Starts a UnaryCall with the given metadata and returns the call.
     */
    public function sendUnaryCall($stub, $metadata) {
        return $stub->UnaryCall($this->simple_request,
                                $metadata,
                                ['timeout' => $this->rpcTimeoutUs()]);
    }

    /**
     * Starts an EmptyCall with the given metadata and returns the call.
     */
    public function sendEmptyCall($stub, $metadata) {
        return $stub->EmptyCall($this->empty_request,
                                $metadata,
                                ['timeout' => $this->rpcTimeoutUs()]);
    }

    /**
     * Increments the accumulated per-status-code counter for an RPC.
     *
     * @param string $rpc 'UnaryCall' or 'EmptyCall'
     * @param mixed $status_code status code to count
     */
    public function add_rpc_result($rpc, $status_code) {
        // $rpc here needs to be in the format of 'UnaryCall', 'EmptyCall'
        if (!isset($this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                   ['result'][$status_code])) {
            $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                ['result'][$status_code] = 0;
        }
        $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
            ['result'][$status_code] += 1;
    }

    /**
     * Reads back the results of RPCs that were handed to child processes,
     * updates the counters for every outstanding RPC that is reported, and
     * then empties the result file.
     */
    public function check_child_process_result() {
        if (sizeof($this->outstanding_rpcs) == 0 ||
            !$this->rpc_config->tmp_file2) {
            return;
        }
        // tmp_file2 contains the RPC result of each RPC we
        // originally wrote to tmp_file1
        $f2 = fopen($this->rpc_config->tmp_file2, 'r+');
        if ($f2 === false) {
            // Nothing can be read this round; try again on the next one.
            return;
        }
        $keys_to_delete = [];
        flock($f2, LOCK_EX);
        while (true) {
            $f2_line = fgets($f2);
            if (!$f2_line) {
                break;
            }
            // format here needs to be in sync with
            // src/php/bin/xds_manager.py
            $parts = explode(',', trim($f2_line));
            if (count($parts) < 2) {
                // No return code on this line; ignore it.
                continue;
            }
            $key = $parts[0];
            $returncode = $parts[1];
            if (isset($this->outstanding_rpcs[$key])) {
                $parts2 = explode('|', $key);
                $result_num = $parts2[0];
                $rpc_name = $parts2[1];
                // Child processes can only communicate back the
                // status code for now.
                // Current interop test specs only call for
                // reporting back the status code in these scenarios.
                // If we ever need the hostname reported back from
                // child processes, we need to enhance this
                // communication framework through tmp files.
                $this->results[$rpc_name][$result_num] = "";
                if ($returncode) {
                    $this->num_rpcs_failed_by_method
                        [$this->RPC_MAP[$rpc_name]] += 1;
                } else {
                    $this->num_rpcs_succeeded_by_method
                        [$this->RPC_MAP[$rpc_name]] += 1;
                }
                $this->add_rpc_result($rpc_name, $returncode);
                $keys_to_delete[] = $key;
            }
        }
        foreach ($keys_to_delete as $key) {
            unset($this->outstanding_rpcs[$key]);
        }
        ftruncate($f2, 0);
        flock($f2, LOCK_UN);
        fclose($f2);
    }

    /**
     * Queues an RPC for execution outside this script by appending a line
     * describing it to tmp_file1, and counts it as started.
     *
     * @param string $rpc 'UnaryCall' or 'EmptyCall'
     * @param string $metadata_serialized serialize()d metadata array
     */
    public function execute_rpc_in_child_process($rpc, $metadata_serialized) {
        // tmp_file1 contains the list of RPCs (and their
        // specs) we want executed. This will be picked up
        // by src/php/bin/xds_manager.py
        $f1 = fopen($this->rpc_config->tmp_file1, 'a');
        $key = implode('|', [$this->num_results,
                             $rpc,
                             $metadata_serialized,
                             $this->rpc_config->timeout_sec]);
        flock($f1, LOCK_EX);
        fwrite($f1, $key."\n");
        fflush($f1);
        flock($f1, LOCK_UN);
        fclose($f1);
        $this->outstanding_rpcs[$key] = 1;
        $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
        $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
            ['rpcs_started'] += 1;
    }

    /**
     * Thread body: loops forever, pacing rounds to the configured qps and
     * issuing every RPC in rpc_config->rpcs_to_send once per round.
     *
     * @throws Exception on an unknown RPC name, or on a failed RPC when
     *         fail_on_failed_rpcs is enabled
     */
    public function run() {
        // Autoloaded classes do not get inherited in threads.
        // Hence we need to do this.
        require_once($this->autoload_path_);

        $stub = new Grpc\Testing\TestServiceClient(
            $this->rpc_config->server_address,
            ['credentials' => Grpc\ChannelCredentials::createInsecure()
        ]);
        $interval_us = $this->target_seconds_between_rpcs_ * 1e6;
        // hrtime returns nanoseconds
        $target_next_start_us = hrtime(true) / 1000;
        while (true) {
            $now_us = hrtime(true) / 1000;
            $sleep_us = $target_next_start_us - $now_us;
            if ($sleep_us < 0) {
                // Behind schedule: restart the schedule from now.
                $target_next_start_us = $now_us + $interval_us;
            } else {
                $target_next_start_us += $interval_us;
                // usleep() takes an integer number of microseconds
                usleep((int)$sleep_us);
            }
            $this->check_child_process_result();
            foreach ($this->rpc_config->rpcs_to_send as $rpc) {
                $metadata_to_send_arr
                    = (array)$this->rpc_config->metadata_to_send;
                $metadata = array_key_exists($rpc, $metadata_to_send_arr) ?
                          $metadata_to_send_arr[$rpc] : [];
                // This copy is somehow necessary because
                // $this->metadata_to_send[$rpc] somehow becomes a
                // Volatile object, instead of an associative array.
                $metadata_array = [];
                $execute_in_child_process = false;
                foreach ($metadata as $key => $value) {
                    $metadata_array[$key] = [$value];
                    if ($key == 'rpc-behavior' || $key == 'fi_testcase') {
                        $execute_in_child_process = true;
                    }
                }
                if ($execute_in_child_process && $this->rpc_config->tmp_file1) {
                    // if 'rpc-behavior' or 'fi_testcase' is set, we need to
                    // pawn off the execution to some other child PHP
                    // processes
                    $this->execute_rpc_in_child_process(
                        $rpc, serialize($metadata_array));
                    continue;
                }
                // Execute RPC within this script
                $call = null;
                if ($rpc == 'UnaryCall') {
                    $call = $this->sendUnaryCall($stub, $metadata_array);
                } else if ($rpc == 'EmptyCall') {
                    $call = $this->sendEmptyCall($stub, $metadata_array);
                } else {
                    throw new Exception("Unhandled rpc $rpc");
                }
                $this->num_rpcs_started_by_method[$this->RPC_MAP[$rpc]] += 1;
                $this->accumulated_method_stats[$this->RPC_MAP[$rpc]]
                    ['rpcs_started'] += 1;
                // the remote peer is being returned as part of the
                // initial metadata, according to the test spec
                $initial_metadata = $call->getMetadata();
                list($response, $status) = $call->wait();
                if ($status->code == Grpc\STATUS_OK &&
                    array_key_exists('hostname', $initial_metadata)) {
                    $this->results[$rpc][$this->num_results]
                        = $initial_metadata['hostname'][0];
                    $this->num_rpcs_succeeded_by_method
                        [$this->RPC_MAP[$rpc]] += 1;
                    $this->add_rpc_result($rpc, 0);
                } else {
                    // The property is 'fail_on_failed_rpcs' (no trailing
                    // underscore). It is parsed as a boolean because a
                    // non-empty string such as 'false' is truthy in PHP.
                    if (filter_var($this->rpc_config->fail_on_failed_rpcs,
                                   FILTER_VALIDATE_BOOLEAN)) {
                        throw new Exception("$rpc failed with status "
                                            . $status->code);
                    }
                    $this->results[$rpc][$this->num_results] = "";
                    $this->num_rpcs_failed_by_method
                        [$this->RPC_MAP[$rpc]] += 1;
                    $this->add_rpc_result($rpc, $status->code);
                }
            }
            // $num_results here is only incremented when the group of
            // all $rpcs_to_send are done.
            $this->num_results++;
        }
    }

    // This is needed for loading autoload_path in the child thread
    public function start(int $options = PTHREADS_INHERIT_ALL) {
        return parent::start(PTHREADS_INHERIT_NONE);
    }
}
456
457
// Entry point: parse the command line, start the client thread that sends
// the RPCs, then serve the stats / configure services on stats_port.

// Note: num_channels is currently ignored
$args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
                    'rpc:', 'metadata:', 'tmp_file1:', 'tmp_file2:',
                    'server:', 'stats_port:', 'qps:']);

// Convert input in the form of
//   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
// into
//   [
//     'rpc1' => [
//       'k1' => 'v1',
//       'k3' => 'v3',
//     ],
//     'rpc2' => [
//       'k2' => 'v2'
//     ],
//   ]
$metadata_to_send = [];
$metadata_arg = $args['metadata'] ?? '';
// explode() on an empty string yields [''], so an absent or empty flag
// must be skipped explicitly rather than relying on the result's truthiness.
if (is_string($metadata_arg) && $metadata_arg !== '') {
    foreach (explode(',', $metadata_arg) as $one_metadata_pair) {
        // Limit of 3 keeps any ':' inside the value intact.
        $fields = explode(':', $one_metadata_pair, 3);
        if (count($fields) < 3) {
            // Not of the form rpc:key:value; ignore it.
            continue;
        }
        list($rpc, $metadata_key, $metadata_value) = $fields;
        // initialize in case we haven't seen this rpc before
        if (!array_key_exists($rpc, $metadata_to_send)) {
            $metadata_to_send[$rpc] = [];
        }
        $metadata_to_send[$rpc][$metadata_key] = $metadata_value;
    }
}
$rpcs_to_send = (empty($args['rpc']) ? 'UnaryCall' : $args['rpc']);

// These flags are optional; default them instead of reading unset keys.
$tmp_file1 = $args['tmp_file1'] ?? null;
$tmp_file2 = $args['tmp_file2'] ?? null;
$fail_on_failed_rpcs = $args['fail_on_failed_rpcs'] ?? null;

// Need to communicate the xds server name to the async runner manager
if ($tmp_file1) {
    $f1 = fopen($tmp_file1, 'w');
    if ($f1 !== false) {
        fwrite($f1, 'server_address,'.$args['server']);
        fclose($f1);
    }
}

$rpc_config = new RpcConfig($args['server'],
                            $args['qps'],
                            $fail_on_failed_rpcs,
                            explode(',', $rpcs_to_send),
                            $metadata_to_send,
                            $tmp_file1,
                            $tmp_file2);


$client_thread = new ClientThread($rpc_config,
                                  $autoload_path);
$client_thread->start();

$server = new Grpc\RpcServer();
$server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
$server->handle(new LoadBalancerStatsService());
$server->handle(new XdsUpdateClientConfigureService());
$server->run();
515