xref: /aosp_15_r20/external/OpenCL-CTS/test_conformance/run_conformance.py (revision 6467f958c7de8070b317fc65bcb0f6472e388d82)
1#! /usr/bin/python
2
3#/******************************************************************
4#//
5#//  OpenCL Conformance Tests
6#//
7#//  Copyright:  (c) 2008-2009 by Apple Inc. All Rights Reserved.
8#//
9#******************************************************************/
10
11from __future__ import print_function
12
13import os
14import re
15import sys
16import subprocess
17import time
18import tempfile
19
20DEBUG = 0
21
22log_file_name = "opencl_conformance_results_" + time.strftime("%Y-%m-%d_%H-%M", time.localtime()) + ".log"
23process_pid = 0
24
25# The amount of time between printing a "." (if no output from test) or ":" (if output)
26#  to the screen while the tests are running.
27seconds_between_status_updates = 60 * 60 * 24 * 7  # effectively never
28
29# Help info
30def write_help_info():
31    print("run_conformance.py test_list [CL_DEVICE_TYPE(s) to test] [partial-test-names, ...] [log=path/to/log/file/]")
32    print(" test_list - the .csv file containing the test names and commands to run the tests.")
33    print(" [partial-test-names, ...] - optional partial strings to select a subset of the tests to run.")
34    print(" [CL_DEVICE_TYPE(s) to test] - list of CL device types to test, default is CL_DEVICE_TYPE_DEFAULT.")
35    print(" [log=path/to/log/file/] - provide a path for the test log file, default is in the current directory.")
36    print("   (Note: spaces are not allowed in the log file path.")
37
38
39# Get the time formatted nicely
40def get_time():
41    return time.strftime("%d-%b %H:%M:%S", time.localtime())
42
43
44# Write text to the screen and the log file
45def write_screen_log(text):
46    global log_file
47    print(text)
48    log_file.write(text + "\n")
49
50
51# Load the tests from a csv formated file of the form name,command
52def get_tests(filename, devices_to_test):
53    tests = []
54    if os.path.exists(filename) == False:
55        print("FAILED: test_list \"" + filename + "\" does not exist.")
56        print("")
57        write_help_info()
58        sys.exit(-1)
59    file = open(filename, 'r')
60    for line in file.readlines():
61        comment = re.search("^#.*", line)
62        if comment:
63            continue
64        device_specific_match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*,\s*(.+?)\s*$", line)
65        if device_specific_match:
66            if device_specific_match.group(1) in devices_to_test:
67                test_path = str.replace(device_specific_match.group(3), '/', os.sep)
68                test_name = str.replace(device_specific_match.group(2), '/', os.sep)
69                tests.append((test_name, test_path))
70            else:
71                print("Skipping " + device_specific_match.group(2) + " because " + device_specific_match.group(1) + " is not in the list of devices to test.")
72            continue
73        match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*$", line)
74        if match:
75            test_path = str.replace(match.group(2), '/', os.sep)
76            test_name = str.replace(match.group(1), '/', os.sep)
77            tests.append((test_name, test_path))
78    return tests
79
80
81def run_test_checking_output(current_directory, test_dir, log_file):
82    global process_pid, seconds_between_status_updates
83    failures_this_run = 0
84    start_time = time.time()
85    # Create a temporary file for capturing the output from the test
86    (output_fd, output_name) = tempfile.mkstemp()
87    if not os.path.exists(output_name):
88        write_screen_log("\n           ==> ERROR: could not create temporary file %s ." % output_name)
89        os.close(output_fd)
90        return -1
91    # Execute the test
92    program_to_run = test_dir_without_args = test_dir.split(None, 1)[0]
93    if os.sep == '\\':
94        program_to_run += ".exe"
95    if os.path.exists(current_directory + os.sep + program_to_run):
96        os.chdir(os.path.dirname(current_directory + os.sep + test_dir_without_args))
97        try:
98            if DEBUG: p = subprocess.Popen("", stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
99            else: p = subprocess.Popen(current_directory + os.sep + test_dir, stderr=output_fd, stdout=output_fd, shell=True)
100        except OSError:
101            write_screen_log("\n           ==> ERROR: failed to execute test. Failing test. : " + str(OSError))
102            os.close(output_fd)
103            return -1
104    else:
105        write_screen_log("\n           ==> ERROR: test file (" + current_directory + os.sep + program_to_run + ") does not exist.  Failing test.")
106        os.close(output_fd)
107        return -1
108    # Set the global pid so we can kill it if this is aborted
109    process_pid = p.pid
110    # Read one character at a time from the temporary output file while the process is running.
111    # When we get an end-of-line, look for errors and write the results to the log file.
112    # This allows us to process the file as it is being produced.
113    # Keep track of the state for reading
114    # Whether we are done, if we have more to read, and where in the file we last read
115    done = False
116    more_to_read = True
117    pointer = 0
118    pointer_at_last_user_update = 0
119    output_this_run = False
120    try:
121        read_output = open(output_name, 'r')
122    except IOError:
123        write_screen_log("\n           ==> ERROR: could not open output file from test.")
124        os.close(output_fd)
125        return -1
126    line = ""
127    while not done or more_to_read:
128        os.fsync(output_fd)
129        # Determine if we should display some output
130        elapsed_time = (time.time() - start_time)
131        if elapsed_time > seconds_between_status_updates:
132            start_time = time.time()
133            # If we've received output from the test since the last update, display a #
134            if pointer != pointer_at_last_user_update:
135                sys.stdout.write(":")
136            else:
137                sys.stdout.write(".")
138            pointer_at_last_user_update = pointer
139            sys.stdout.flush()
140        # Check if we're done
141        p.poll()
142        if not done and p.returncode != None:
143            if p.returncode < 0:
144                if not output_this_run:
145                    print("")
146                    output_this_run = True
147                write_screen_log("           ==> ERROR: test killed/crashed: " + str(p.returncode) + ".")
148            done = True
149        # Try reading
150        try:
151            read_output.seek(pointer)
152            char_read = read_output.read(1)
153        except IOError:
154            time.sleep(1)
155            continue
156        # If we got a full line then process it
157        if char_read == "\n":
158            # Look for failures and report them as such
159            match = re.search(".*(FAILED|ERROR).*", line)
160            if match:
161                if not output_this_run:
162                    print("")
163                    output_this_run = True
164                print("           ==> " + line.replace('\n', ''))
165            match = re.search(".*FAILED.*", line)
166            if match:
167                failures_this_run = failures_this_run + 1
168            match = re.search(".*(PASSED).*", line)
169            if match:
170                if not output_this_run:
171                    print("")
172                    output_this_run = True
173                print("               " + line.replace('\n', ''))
174            # Write it to the log
175            log_file.write("     " + line + "\n")
176            log_file.flush()
177            line = ""
178            pointer = pointer + 1
179        # If we are at the end of the file, then re-open it to get new data
180        elif char_read == "":
181            more_to_read = False
182            read_output.close()
183            time.sleep(1)
184            try:
185                os.fsync(output_fd)
186                read_output = open(output_name, 'r')
187                # See if there is more to read. This happens if the process ends and we have data left.
188                read_output.seek(pointer)
189                if read_output.read(1) != "":
190                    more_to_read = True
191            except IOError:
192                write_screen_log("\n           ==> ERROR: could not reopen output file from test.")
193                return -1
194        else:
195            line = line + char_read
196            pointer = pointer + 1
197    # Now we are done, so write out any remaining data in the file:
198    # This should only happen if the process exited with an error.
199    os.fsync(output_fd)
200    while read_output.read(1) != "":
201        log_file.write(read_output.read(1))
202    # Return the total number of failures
203    if (p.returncode == 0 and failures_this_run > 0):
204        write_screen_log("\n           ==> ERROR: Test returned 0, but number of FAILED lines reported is " + str(failures_this_run) + ".")
205        return failures_this_run
206    return p.returncode
207
208
209def run_tests(tests):
210    global curent_directory
211    global process_pid
212    # Run the tests
213    failures = 0
214    previous_test = None
215    test_number = 1
216    for test in tests:
217        # Print the name of the test we're running and the time
218        (test_name, test_dir) = test
219        if test_dir != previous_test:
220            print("==========   " + test_dir)
221            log_file.write("========================================================================================\n")
222            log_file.write("========================================================================================\n")
223            log_file.write("(" + get_time() + ")     Running Tests: " + test_dir + "\n")
224            log_file.write("========================================================================================\n")
225            log_file.write("========================================================================================\n")
226            previous_test = test_dir
227        print("(" + get_time() + ")     BEGIN  " + test_name.ljust(40) + ": ", end='')
228        log_file.write("     ----------------------------------------------------------------------------------------\n")
229        log_file.write("     (" + get_time() + ")     Running Sub Test: " + test_name + "\n")
230        log_file.write("     ----------------------------------------------------------------------------------------\n")
231        log_file.flush()
232        sys.stdout.flush()
233
234        # Run the test
235        result = 0
236        start_time = time.time()
237        try:
238            process_pid = 0
239            result = run_test_checking_output(current_directory, test_dir, log_file)
240        except KeyboardInterrupt:
241            # Catch an interrupt from the user
242            write_screen_log("\nFAILED: Execution interrupted.  Killing test process, but not aborting full test run.")
243            os.kill(process_pid, 9)
244            if sys.version_info[0] < 3:
245                answer = raw_input("Abort all tests? (y/n)")
246            else:
247                answer = input("Abort all tests? (y/n)")
248            if answer.find("y") != -1:
249                write_screen_log("\nUser chose to abort all tests.")
250                log_file.close()
251                sys.exit(-1)
252            else:
253                write_screen_log("\nUser chose to continue with other tests. Reporting this test as failed.")
254                result = 1
255        run_time = (time.time() - start_time)
256
257        # Move print the finish status
258        if result == 0:
259            print("(" + get_time() + ")     PASSED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")", end='')
260        else:
261            print("(" + get_time() + ")     FAILED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")", end='')
262
263        test_number = test_number + 1
264        log_file.write("     ----------------------------------------------------------------------------------------\n")
265        log_file.flush()
266
267        print("")
268        if result != 0:
269            log_file.write("  *******************************************************************************************\n")
270            log_file.write("  *  (" + get_time() + ")     Test " + test_name + " ==> FAILED: " + str(result) + "\n")
271            log_file.write("  *******************************************************************************************\n")
272            failures = failures + 1
273        else:
274            log_file.write("     (" + get_time() + ")     Test " + test_name + " passed in " + str(run_time) + "s\n")
275
276        log_file.write("     ----------------------------------------------------------------------------------------\n")
277        log_file.write("\n")
278    return failures
279
280
281# ########################
282# Begin OpenCL conformance run script
283# ########################
284
285if len(sys.argv) < 2:
286    write_help_info()
287    sys.exit(-1)
288
289current_directory = os.getcwd()
290# Open the log file
291for arg in sys.argv:
292    match = re.search("log=(\S+)", arg)
293    if match:
294        log_file_name = match.group(1).rstrip('/') + os.sep + log_file_name
295try:
296    log_file = open(log_file_name, "w")
297except IOError:
298    print("Could not open log file " + log_file_name)
299    sys.exit(-1)
300
301# Determine which devices to test
302device_types = ["CL_DEVICE_TYPE_DEFAULT", "CL_DEVICE_TYPE_CPU", "CL_DEVICE_TYPE_GPU", "CL_DEVICE_TYPE_ACCELERATOR", "CL_DEVICE_TYPE_ALL"]
303devices_to_test = []
304for device in device_types:
305    if device in sys.argv[2:]:
306        devices_to_test.append(device)
307if len(devices_to_test) == 0:
308    devices_to_test = ["CL_DEVICE_TYPE_DEFAULT"]
309write_screen_log("Testing on: " + str(devices_to_test))
310
311# Get the tests
312tests = get_tests(sys.argv[1], devices_to_test)
313
314# If tests are specified on the command line then run just those ones
315tests_to_use = []
316num_of_patterns_to_match = 0
317for arg in sys.argv[2:]:
318    if arg in device_types:
319        continue
320    if re.search("log=(\S+)", arg):
321        continue
322    num_of_patterns_to_match = num_of_patterns_to_match + 1
323    found_it = False
324    for test in tests:
325        (test_name, test_dir) = test
326        if (test_name.find(arg) != -1 or test_dir.find(arg) != -1):
327            found_it = True
328            if test not in tests_to_use:
329                tests_to_use.append(test)
330    if found_it == False:
331        print("Failed to find a test matching " + arg)
332if len(tests_to_use) == 0:
333    if num_of_patterns_to_match > 0:
334        print("FAILED: Failed to find any tests matching the given command-line options.")
335        print("")
336        write_help_info()
337        sys.exit(-1)
338else:
339    tests = tests_to_use[:]
340
341write_screen_log("Test execution arguments: " + str(sys.argv))
342write_screen_log("Logging to file " + log_file_name + ".")
343write_screen_log("Loaded tests from " + sys.argv[1] + ", total of " + str(len(tests)) + " tests selected to run:")
344for (test_name, test_command) in tests:
345    write_screen_log(test_name.ljust(50) + " (" + test_command + ")")
346
347# Run the tests
348total_failures = 0
349for device_to_test in devices_to_test:
350    os.environ['CL_DEVICE_TYPE'] = device_to_test
351    write_screen_log("========================================================================================")
352    write_screen_log("========================================================================================")
353    write_screen_log(("Setting CL_DEVICE_TYPE to " + device_to_test).center(90))
354    write_screen_log("========================================================================================")
355    write_screen_log("========================================================================================")
356    failures = run_tests(tests)
357    write_screen_log("========================================================================================")
358    if failures == 0:
359        write_screen_log(">> TEST on " + device_to_test + " PASSED")
360    else:
361        write_screen_log(">> TEST on " + device_to_test + " FAILED (" + str(failures) + " FAILURES)")
362    write_screen_log("========================================================================================")
363    total_failures = total_failures + failures
364
365write_screen_log("(" + get_time() + ") Testing complete.  " + str(total_failures) + " failures for " + str(len(tests)) + " tests.")
366log_file.close()
367