1#!/usr/bin/env python 2# Copyright 2021 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Manage PHP child processes for the main PHP xDS Interop client""" 16 17import argparse 18import fcntl 19import os 20import subprocess 21 22# This script is being launched from src/php/bin/run_xds_client.sh 23# to manage PHP child processes which will send 1 RPC each 24# asynchronously. This script keeps track of all those open 25# processes and reports back to the main PHP interop client each 26# of the child RPCs' status code. 
# Maps the RPC type named in a spec line to the PHP script that issues that
# single RPC. Spec lines naming any other type are ignored.
RPC_SCRIPTS = {
    "UnaryCall": "src/php/tests/interop/xds_unary_call.php",
    "EmptyCall": "src/php/tests/interop/xds_empty_call.php",
}

# Lines in tmp_file1 starting with this key carry the server address rather
# than an RPC spec.
SERVER_ADDRESS_KEY = "server_address"


def build_rpc_command(server_address, spec):
    """Return the argv list for the PHP child that performs one RPC.

    The spec format needs to be in sync with
    src/php/tests/interop/xds_client.php:
    ``<num>|<rpc type>|<metadata>|<timeout_sec>``.

    Args:
        server_address: value passed to the child as ``--server=``.
        spec: one stripped line read from tmp_file1.

    Returns:
        The command as a list of strings, or None when the spec has fewer
        than four ``|``-separated fields or names an RPC type that is not in
        RPC_SCRIPTS.
    """
    items = spec.split("|")
    if len(items) < 4:
        # Blank or truncated line: skip it instead of raising IndexError and
        # taking down the whole manager loop.
        return None
    num, rpc_type, metadata, timeout_sec = items[:4]
    script = RPC_SCRIPTS.get(rpc_type)
    if script is None:
        return None
    return [
        "php",
        "-d",
        "extension=grpc.so",
        "-d",
        "extension=pthreads.so",
        script,
        "--server=" + server_address,
        "--num=" + num,
        "--metadata=" + metadata,
        "--timeout_sec=" + timeout_sec,
    ]


def start_new_rpcs(
    request_path, server_address, rpcs_started, open_processes, client_env
):
    """Launch a PHP child for every not-yet-started RPC listed in a file.

    The file is read and then truncated while an exclusive flock is held on
    it.

    Args:
        request_path: path of tmp_file1, which contains a list of RPCs (and
            their spec) the parent process wants executed.
        server_address: address learned so far ("" if none yet).
        rpcs_started: set of spec lines already launched; updated in place.
        open_processes: dict mapping spec line -> Popen; updated in place.
        client_env: environment passed to each child process.

    Returns:
        The server address, taken from the first ``server_address`` line
        seen if it was not known yet.
    """
    with open(request_path, "r+") as requests:
        fcntl.flock(requests, fcntl.LOCK_EX)
        try:
            for line in iter(requests.readline, ""):
                key = line.strip()
                if key.startswith(SERVER_ADDRESS_KEY):
                    if not server_address:
                        # Skip the key and the single separator character
                        # that follows it.
                        server_address = key[len(SERVER_ADDRESS_KEY) + 1 :]
                    continue
                if key in rpcs_started:
                    continue
                command = build_rpc_command(server_address, key)
                if command is None:
                    continue
                open_processes[key] = subprocess.Popen(command, env=client_env)
                rpcs_started.add(key)
            requests.truncate(0)
        finally:
            fcntl.flock(requests, fcntl.LOCK_UN)
    return server_address


def report_finished_rpcs(result_path, open_processes):
    """Append the exit code of every finished child to the result file.

    Each finished child produces one ``<spec line>,<returncode>`` line; this
    format needs to be in sync with src/php/tests/interop/xds_client.php.
    Reported entries are removed from ``open_processes``.

    Args:
        result_path: path of tmp_file2, which contains the RPC result of
            each key received from tmp_file1.
        open_processes: dict mapping spec line -> Popen; updated in place.
    """
    with open(result_path, "a") as results:
        fcntl.flock(results, fcntl.LOCK_EX)
        try:
            finished = [
                key
                for key, process in open_processes.items()
                if process.poll() is not None
            ]
            for key in finished:
                process = open_processes.pop(key)
                results.write(key + "," + str(process.returncode) + "\n")
            # Push buffered lines to the file while the lock is still held;
            # otherwise they would only be written by close(), after the
            # lock has been released.
            results.flush()
        finally:
            fcntl.flock(results, fcntl.LOCK_UN)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--tmp_file1", nargs="?", default="")
    parser.add_argument("--tmp_file2", nargs="?", default="")
    parser.add_argument("--bootstrap_path", nargs="?", default="")
    args = parser.parse_args()
    server_address = ""
    # A set gives O(1) "already started?" checks as the history grows.
    rpcs_started = set()
    open_processes = {}
    client_env = dict(os.environ)
    client_env["GRPC_XDS_BOOTSTRAP"] = args.bootstrap_path
    while True:
        server_address = start_new_rpcs(
            args.tmp_file1,
            server_address,
            rpcs_started,
            open_processes,
            client_env,
        )
        report_finished_rpcs(args.tmp_file2, open_processes)