#!/usr/bin/env python
# Copyright (c) Alibaba, Inc. and its affiliates.
import argparse
import datetime
import math
import os
import subprocess
import sys

sys.path.insert(0, '/swift')
import tempfile
import time
import unittest
from fnmatch import fnmatch
from pathlib import Path
from unittest import TextTestResult

import pandas

# NOTICE: TensorFlow 1.15 seems not so compatible with PyTorch.
# A segmentation fault may be raised by the PyTorch cpp library
# if 'import tensorflow' comes before 'import torch'.
# Importing torch here first bypasses this incompatibility.
import torch
import yaml
from model_tag import ModelTag, commit_model_ut_result
from test_utils import get_case_model_info

from swift.utils.logger import get_logger

logger = get_logger()


def test_cases_result_to_df(result_list):
    table_header = [
        'Name', 'Result', 'Info', 'Start time', 'Stop time',
        'Time cost(seconds)'
    ]
    df = pandas.DataFrame(
        result_list, columns=table_header).sort_values(
            by=['Start time'], ascending=True)
    return df


def statistics_test_result(df):
    total_cases = df.shape[0]
    # yapf: disable
    success_cases = df.loc[df['Result'] == 'Success'].shape[0]
    error_cases = df.loc[df['Result'] == 'Error'].shape[0]
    failures_cases = df.loc[df['Result'] == 'Failures'].shape[0]
    expected_failure_cases = df.loc[df['Result'] == 'ExpectedFailures'].shape[0]
    unexpected_success_cases = df.loc[df['Result'] == 'UnexpectedSuccesses'].shape[0]
    skipped_cases = df.loc[df['Result'] == 'Skipped'].shape[0]
    # yapf: enable

    if failures_cases > 0 or \
       error_cases > 0 or \
       unexpected_success_cases > 0:
        final_result = 'FAILED'
    else:
        final_result = 'SUCCESS'
    result_msg = '%s (Runs=%s,success=%s,failures=%s,errors=%s,\
skipped=%s,expected failures=%s,unexpected successes=%s)' % (
        final_result, total_cases, success_cases, failures_cases, error_cases,
        skipped_cases, expected_failure_cases, unexpected_success_cases)

    model_cases = get_case_model_info()
    for model_name, case_info in model_cases.items():
        cases = df.loc[df['Name'].str.contains('|'.join(list(case_info)))]
        results = cases['Result']
        result = None
        if any(results == 'Error') or any(results == 'Failures') or any(
                results == 'UnexpectedSuccesses'):
            result = ModelTag.MODEL_FAIL
        elif any(results == 'Success'):
            result = ModelTag.MODEL_PASS
        elif all(results == 'Skipped'):
            result = ModelTag.MODEL_SKIP
        else:
            print(f'invalid results for {model_name}\n{results}')
        if result is not None:
            commit_model_ut_result(model_name, result)

    print('Testing result summary.')
    print(result_msg)
    if final_result == 'FAILED':
        sys.exit(1)
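
# Illustrative output of statistics_test_result() above (the numbers are
# made up; the format follows the result_msg template):
#
#   Testing result summary.
#   SUCCESS (Runs=12,success=9,failures=0,errors=0,skipped=3,expected failures=0,unexpected successes=0)
#
# The process exits with status 1 whenever a failure, error, or unexpected
# success is counted, so CI can key off the return code.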

def gather_test_suites_in_files(test_dir, case_file_list, list_tests):
    test_suite = unittest.TestSuite()
    for case in case_file_list:
        test_case = unittest.defaultTestLoader.discover(
            start_dir=test_dir, pattern=case)
        test_suite.addTest(test_case)
        if hasattr(test_case, '__iter__'):
            for subcase in test_case:
                if list_tests:
                    print(subcase)
        else:
            if list_tests:
                print(test_case)
    return test_suite


def gather_test_suites_files(test_dir, pattern):
    case_file_list = []
    for dirpath, dirnames, filenames in os.walk(test_dir):
        for file in filenames:
            if fnmatch(file, pattern):
                case_file_list.append(file)
    return case_file_list


def collect_test_results(case_results):
    # each item: (Name, Result, Info, Start time, Stop time, Time cost)
    result_list = []
    for case_result in case_results.successes:
        result_list.append(
            (case_result.test_full_name, 'Success', '',
             case_result.start_time, case_result.stop_time,
             case_result.time_cost))
    for case_result in case_results.errors:
        result_list.append(
            (case_result[0].test_full_name, 'Error', case_result[1],
             case_result[0].start_time, case_result[0].stop_time,
             case_result[0].time_cost))
    for case_result in case_results.skipped:
        result_list.append(
            (case_result[0].test_full_name, 'Skipped', case_result[1],
             case_result[0].start_time, case_result[0].stop_time,
             case_result[0].time_cost))
    for case_result in case_results.expectedFailures:
        result_list.append(
            (case_result[0].test_full_name, 'ExpectedFailures',
             case_result[1], case_result[0].start_time,
             case_result[0].stop_time, case_result[0].time_cost))
    for case_result in case_results.failures:
        result_list.append(
            (case_result[0].test_full_name, 'Failures', case_result[1],
             case_result[0].start_time, case_result[0].stop_time,
             case_result[0].time_cost))
    for case_result in case_results.unexpectedSuccesses:
        result_list.append(
            (case_result.test_full_name, 'UnexpectedSuccesses', '',
             case_result.start_time, case_result.stop_time,
             case_result.time_cost))
    return result_list


def run_command_with_popen(cmd):
    with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=1,
            encoding='utf8') as sub_process:
        for line in iter(sub_process.stdout.readline, ''):
            sys.stdout.write(line)


def async_run_command_with_popen(cmd, device_id):
    logger.info('Worker id: %s args: %s' % (device_id, cmd))
    env = os.environ.copy()
    env['CUDA_VISIBLE_DEVICES'] = '%s' % device_id  # pin worker to one GPU
    sub_process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=1,
        universal_newlines=True,
        env=env,
        encoding='utf8')
    return sub_process


def save_test_result(df, args):
    if args.result_dir is not None:
        file_name = str(int(datetime.datetime.now().timestamp() * 1000))
        os.umask(0)
        Path(args.result_dir).mkdir(mode=0o777, parents=True, exist_ok=True)
        Path(os.path.join(args.result_dir, file_name)).touch(
            mode=0o666, exist_ok=True)
        df.to_pickle(os.path.join(args.result_dir, file_name))


def run_command(cmd):
    logger.info('Running command: %s' % ' '.join(cmd))
    response = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        response.check_returncode()
        logger.info(response.stdout.decode('utf8'))
    except subprocess.CalledProcessError as error:
        logger.error(
            'stdout: %s, stderr: %s' %
            (response.stdout.decode('utf8'), error.stderr.decode('utf8')))


def install_packages(pkgs):
    if pkgs is None:
        return
    cmd = [sys.executable, '-m', 'pip', 'install']
    for pkg in pkgs:
        cmd.append(pkg)
    run_command(cmd)


def install_requirements(requirements):
    for req in requirements:
        cmd = [
            sys.executable, '-m', 'pip', 'install', '-r',
            'requirements/%s' % req, '-f',
            'https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html'
        ]
        run_command(cmd)


def wait_for_free_worker(workers):
    while True:
        for idx, worker in enumerate(workers):
            if worker is None:
                logger.info('return free worker: %s' % (idx))
                return idx
            if worker.poll() is None:  # running, get output
                for line in iter(worker.stdout.readline, ''):
                    if line != '':
                        sys.stdout.write(line)
                    else:
                        break
            else:
                # worker process completed.
                logger.info('Process end: %s' % (idx))
                workers[idx] = None
                return idx
        time.sleep(0.001)


def wait_for_workers(workers):
    while True:
        for idx, worker in enumerate(workers):
            if worker is None:
                continue
            # check whether the worker has completed.
            if worker.poll() is None:
                for line in iter(worker.stdout.readline, ''):
                    if line != '':
                        sys.stdout.write(line)
                    else:
                        break
            else:
                logger.info('Process idx: %s end!' % (idx))
                workers[idx] = None

        is_all_completed = True
        for idx, worker in enumerate(workers):
            if worker is not None:
                is_all_completed = False
                break

        if is_all_completed:
            logger.info('All subprocesses are completed!')
            break
        time.sleep(0.001)
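
# Minimal usage sketch of the worker-pool helpers above, mirroring how
# parallel_run_case_in_env() drives them below (the suite names are
# hypothetical; the worker index doubles as the CUDA device id):
#
#   workers = [None] * 2
#   for suite in ['test_a.py', 'test_b.py', 'test_c.py']:
#       idx = wait_for_free_worker(workers)
#       proc = async_run_command_with_popen(
#           ['python', 'tests/run.py', '--pattern', suite], idx)
#       os.set_blocking(proc.stdout.fileno(), False)  # non-blocking reads
#       workers[idx] = proc
#   wait_for_workers(workers)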

def parallel_run_case_in_env(env_name, env, test_suite_env_map,
                             isolated_cases, result_dir, parallel):
    logger.info('Running case in env: %s' % env_name)
    # install the env's requirements and dependencies
    # (run_config['envs'][env_name]).
    if 'requirements' in env:
        install_requirements(env['requirements'])
    if 'dependencies' in env:
        install_packages(env['dependencies'])

    # case worker processes
    worker_processes = [None] * parallel
    for test_suite_file in isolated_cases:  # run each case in a subprocess
        if test_suite_file in test_suite_env_map and test_suite_env_map[
                test_suite_file] == env_name:
            cmd = [
                'python',
                'tests/run.py',
                '--pattern',
                test_suite_file,
                '--result_dir',
                result_dir,
            ]
            worker_idx = wait_for_free_worker(worker_processes)
            worker_process = async_run_command_with_popen(cmd, worker_idx)
            os.set_blocking(worker_process.stdout.fileno(), False)
            worker_processes[worker_idx] = worker_process
        else:
            pass  # case not in run list.

    # run the remaining cases batched across worker processes.
    remain_suite_files = []
    for k, v in test_suite_env_map.items():
        if k not in isolated_cases and v == env_name:
            remain_suite_files.append(k)
    if len(remain_suite_files) == 0:
        wait_for_workers(worker_processes)
        return
    # roughly split the cases into `parallel` chunks.
    part_count = math.ceil(len(remain_suite_files) / parallel)
    suites_chunks = [
        remain_suite_files[x:x + part_count]
        for x in range(0, len(remain_suite_files), part_count)
    ]
    for suites_chunk in suites_chunks:
        worker_idx = wait_for_free_worker(worker_processes)
        cmd = [
            'python', 'tests/run.py', '--result_dir', result_dir, '--suites'
        ]
        for suite in suites_chunk:
            cmd.append(suite)
        worker_process = async_run_command_with_popen(cmd, worker_idx)
        os.set_blocking(worker_process.stdout.fileno(), False)
        worker_processes[worker_idx] = worker_process

    wait_for_workers(worker_processes)
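
# Chunking example for parallel_run_case_in_env() above: with 10 remaining
# suites and parallel=3, part_count = math.ceil(10 / 3) = 4, so the suites
# are split into chunks of sizes 4, 4 and 2, each handed to a free worker.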

def run_case_in_env(env_name, env, test_suite_env_map, isolated_cases,
                    result_dir):
    # install the env's requirements and dependencies
    # (run_config['envs'][env_name]).
    if 'requirements' in env:
        install_requirements(env['requirements'])
    if 'dependencies' in env:
        install_packages(env['dependencies'])

    for test_suite_file in isolated_cases:  # run each case in a subprocess
        if test_suite_file in test_suite_env_map and test_suite_env_map[
                test_suite_file] == env_name:
            cmd = [
                'python',
                'tests/run.py',
                '--pattern',
                test_suite_file,
                '--result_dir',
                result_dir,
            ]
            run_command_with_popen(cmd)
        else:
            pass  # case not in run list.

    # run the remaining cases in a single process.
    remain_suite_files = []
    for k, v in test_suite_env_map.items():
        if k not in isolated_cases and v == env_name:
            remain_suite_files.append(k)
    if len(remain_suite_files) == 0:
        return
    cmd = ['python', 'tests/run.py', '--result_dir', result_dir, '--suites']
    for suite in remain_suite_files:
        cmd.append(suite)
    run_command_with_popen(cmd)


def run_non_parallelizable_test_suites(suites, result_dir):
    cmd = ['python', 'tests/run.py', '--result_dir', result_dir, '--suites']
    for suite in suites:
        cmd.append(suite)
    run_command_with_popen(cmd)


def get_selected_cases():
    """Ask tests/run_analysis.py which suites to run; it reports them on a
    stdout line prefixed with 'Selected cases:'."""
    cmd = ['python', '-u', 'tests/run_analysis.py']
    selected_cases = []
    with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=1,
            encoding='utf8') as sub_process:
        for line in iter(sub_process.stdout.readline, ''):
            sys.stdout.write(line)
            if line.startswith('Selected cases:'):
                line = line.replace('Selected cases:', '').strip()
                selected_cases = line.split(',')
        sub_process.wait()
        if sub_process.returncode != 0:
            msg = 'Run analysis exception, returncode: %s!' % sub_process.returncode
            logger.error(msg)
            raise Exception(msg)
    return selected_cases
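
# get_selected_cases() expects tests/run_analysis.py to print the chosen
# suites on a single line, e.g. (suite names are illustrative):
#
#   Selected cases: test_foo.py,test_bar.py
#
# A run-config YAML consumed by run_in_subprocess() below looks roughly like
# this sketch (keys inferred from the accesses in this file; file and
# package names are hypothetical):
#
#   isolated:                # suites that must run in their own process
#     - test_heavy.py
#   envs:
#     default:
#       requirements: [framework.txt]   # files under requirements/
#       dependencies: [numpy]           # extra pip packages
#     special:
#       requirements: [special.txt]
#       tests: [test_special.py]        # suites pinned to this env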

def run_in_subprocess(args):
    # Only the cases listed in `isolated_cases` run in a subprocess of their
    # own; all remaining cases are batched into shared subprocesses.
    if not args.no_diff:  # select cases based on git diff
        try:
            test_suite_files = get_selected_cases()
            logger.info('Test suites to run: ')
            for f in test_suite_files:
                logger.info(f)
        except Exception:
            logger.error(
                'Getting test suites based on diff failed, will run all cases.'
            )
            test_suite_files = gather_test_suites_files(
                os.path.abspath(args.test_dir), args.pattern)

        if len(test_suite_files) == 0:
            logger.error('Got no test suite based on diff, run all the cases.')
            test_suite_files = gather_test_suites_files(
                os.path.abspath(args.test_dir), args.pattern)
    else:
        test_suite_files = gather_test_suites_files(
            os.path.abspath(args.test_dir), args.pattern)

    non_parallelizable_suites = []
    test_suite_files = [
        x for x in test_suite_files if x not in non_parallelizable_suites
    ]
    run_config = None
    isolated_cases = []
    test_suite_env_map = {}

    # put every case in the default env first.
    for test_suite_file in test_suite_files:
        test_suite_env_map[test_suite_file] = 'default'

    if args.run_config is not None and Path(args.run_config).exists():
        with open(args.run_config, encoding='utf-8') as f:
            run_config = yaml.load(f, Loader=yaml.FullLoader)
        if 'isolated' in run_config:
            isolated_cases = run_config['isolated']
        if 'envs' in run_config:
            for env in run_config['envs']:
                if env != 'default':
                    for test_suite in run_config['envs'][env]['tests']:
                        if test_suite in test_suite_env_map:
                            test_suite_env_map[test_suite] = env

    if args.subprocess:  # run every case in a subprocess
        isolated_cases = test_suite_files

    with tempfile.TemporaryDirectory() as temp_result_dir:
        # first run the suites that are not parallelizable.
        run_non_parallelizable_test_suites(non_parallelizable_suites,
                                           temp_result_dir)

        # run cases of each env in parallel.
        for env in set(test_suite_env_map.values()):
            # guard against a missing or env-less run_config (e.g. when only
            # --subprocess is given) by falling back to an empty env config.
            env_config = (run_config or {}).get('envs', {}).get(env, {})
            parallel_run_case_in_env(env, env_config, test_suite_env_map,
                                     isolated_cases, temp_result_dir,
                                     args.parallel)

        result_dfs = []
        result_path = Path(temp_result_dir)
        for result in result_path.iterdir():
            if result.is_file():
                df = pandas.read_pickle(result)
                result_dfs.append(df)

        result_pd = pandas.concat(result_dfs)  # merge results of every suite.
        print_table_result(result_pd)
        print_abnormal_case_info(result_pd)
        statistics_test_result(result_pd)


def get_object_full_name(obj):
    klass = obj.__class__
    module = klass.__module__
    if module == 'builtins':
        return klass.__qualname__
    return module + '.' + klass.__qualname__


class TimeCostTextTestResult(TextTestResult):
    """Record the time each test case used."""

    def __init__(self, stream, descriptions, verbosity):
        self.successes = []
        super(TimeCostTextTestResult, self).__init__(stream, descriptions,
                                                     verbosity)

    def startTest(self, test):
        test.start_time = datetime.datetime.now()
        test.test_full_name = get_object_full_name(
            test) + '.' + test._testMethodName
        self.stream.writeln('Test case: %s start at: %s' %
                            (test.test_full_name, test.start_time))
        return super(TimeCostTextTestResult, self).startTest(test)

    def stopTest(self, test):
        test.stop_time = datetime.datetime.now()
        test.time_cost = (test.stop_time - test.start_time).total_seconds()
        self.stream.writeln(
            'Test case: %s stop at: %s, cost time: %s(seconds)' %
            (test.test_full_name, test.stop_time, test.time_cost))
        if torch.cuda.is_available() and test.time_cost > 5.0:
            # print nvidia-smi after slow cases to expose GPU usage.
            cmd = ['nvidia-smi']
            run_command_with_popen(cmd)
        super(TimeCostTextTestResult, self).stopTest(test)

    def addSuccess(self, test):
        self.successes.append(test)
        super(TimeCostTextTestResult, self).addSuccess(test)


class TimeCostTextTestRunner(unittest.runner.TextTestRunner):
    resultclass = TimeCostTextTestResult

    def run(self, test):
        return super(TimeCostTextTestRunner, self).run(test)

    def _makeResult(self):
        result = super(TimeCostTextTestRunner, self)._makeResult()
        return result


def gather_test_cases(test_dir, pattern, list_tests):
    case_list = []
    for dirpath, dirnames, filenames in os.walk(test_dir):
        for file in filenames:
            if fnmatch(file, pattern):
                case_list.append(file)

    test_suite = unittest.TestSuite()
    for case in case_list:
        test_case = unittest.defaultTestLoader.discover(
            start_dir=test_dir, pattern=case)
        test_suite.addTest(test_case)
        if hasattr(test_case, '__iter__'):
            for subcase in test_case:
                if list_tests:
                    print(subcase)
        else:
            if list_tests:
                print(test_case)
    return test_suite


def print_abnormal_case_info(df):
    df = df.loc[(df['Result'] == 'Error') | (df['Result'] == 'Failures')]
    for _, row in df.iterrows():
        print('Case %s run result: %s, msg:\n%s' %
              (row['Name'], row['Result'], row['Info']))


def print_table_result(df):
    df = df.loc[df['Result'] != 'Skipped']
    df = df.drop('Info', axis=1)
    formatters = {
        'Name': '{{:<{}s}}'.format(df['Name'].str.len().max()).format,
        'Result': '{{:<{}s}}'.format(df['Result'].str.len().max()).format,
    }
    with pandas.option_context('display.max_rows', None,
                               'display.max_columns', None, 'display.width',
                               None):
        print(df.to_string(justify='left', formatters=formatters,
                           index=False))


def main(args):
    runner = TimeCostTextTestRunner()
    if args.suites is not None and len(args.suites) > 0:
        logger.info('Running: %s' % ' '.join(args.suites))
        test_suite = gather_test_suites_in_files(args.test_dir, args.suites,
                                                 args.list_tests)
    else:
        test_suite = gather_test_cases(
            os.path.abspath(args.test_dir), args.pattern, args.list_tests)
    if not args.list_tests:
        result = runner.run(test_suite)
        logger.info('Running case completed, pid: %s, suites: %s' %
                    (os.getpid(), args.suites))
        result = collect_test_results(result)
        df = test_cases_result_to_df(result)
        if args.result_dir is not None:
            save_test_result(df, args)
        else:
            print_table_result(df)
            print_abnormal_case_info(df)
            statistics_test_result(df)
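
# Naming example: for a method ``test_add`` on class ``FooTest`` in module
# ``tests.test_foo`` (hypothetical names), startTest() above records
# test_full_name as 'tests.test_foo.FooTest.test_add', which becomes the
# 'Name' column used throughout the result DataFrame.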

if __name__ == '__main__':
    parser = argparse.ArgumentParser('test runner')
    parser.add_argument(
        '--list_tests', action='store_true', help='list all tests')
    parser.add_argument(
        '--pattern', default='test_*.py', help='test file pattern')
    parser.add_argument(
        '--test_dir', default='tests', help='directory to be tested')
    parser.add_argument(
        '--level', default=0, type=int, help='2 -- all, 1 -- p1, 0 -- p0')
    parser.add_argument(
        '--profile', action='store_true', help='enable profiling')
    parser.add_argument(
        '--run_config', default=None, help='case run config file (YAML)')
    parser.add_argument(
        '--subprocess',
        action='store_true',
        help='run all test suites in subprocesses')
    parser.add_argument(
        '--result_dir',
        default=None,
        help='save results to this directory, internal use only')
    parser.add_argument(
        '--parallel',
        default=1,
        type=int,
        help='number of parallel case workers, default a single process; '
        'typically set to the GPU count')
    parser.add_argument(
        '--no-diff',
        action='store_true',
        help='cases are selected based on the git diff against master by '
        'default; pass --no-diff to run all cases')
    parser.add_argument(
        '--suites',
        nargs='*',
        help='run the specified test suites (a space-separated list of test '
        'suite files)')
    args = parser.parse_args()
    print(args)
    if args.run_config is not None or args.subprocess:
        run_in_subprocess(args)
    else:
        main(args)
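
# Example invocations (flags as defined above; the YAML path is
# hypothetical):
#
#   python tests/run.py --list_tests
#   python tests/run.py --pattern 'test_trainer*.py'
#   python tests/run.py --subprocess --parallel 2
#   python tests/run.py --run_config tests/run_config.yaml --no-diff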