|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | import ast | 
					
						
						|  | import datetime | 
					
						
						|  | import json | 
					
						
						|  | import os | 
					
						
						|  | import sys | 
					
						
						|  | import time | 
					
						
						|  | from typing import Dict | 
					
						
						|  |  | 
					
						
						|  | from get_ci_error_statistics import get_jobs | 
					
						
						|  | from huggingface_hub import HfApi | 
					
						
						|  | from notification_service import ( | 
					
						
						|  | Message, | 
					
						
						|  | handle_stacktraces, | 
					
						
						|  | handle_test_results, | 
					
						
						|  | prepare_reports, | 
					
						
						|  | retrieve_artifact, | 
					
						
						|  | retrieve_available_artifacts, | 
					
						
						|  | ) | 
					
						
						|  | from slack_sdk import WebClient | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | api = HfApi() | 
					
						
						|  | client = WebClient(token=os.environ["CI_SLACK_BOT_TOKEN"]) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class QuantizationMessage(Message): | 
					
						
						|  | def __init__( | 
					
						
						|  | self, | 
					
						
						|  | title: str, | 
					
						
						|  | results: Dict, | 
					
						
						|  | ): | 
					
						
						|  | self.title = title | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | self.n_success = sum(r["success"] for r in results.values()) | 
					
						
						|  | self.single_gpu_failures = sum(r["failed"]["single"] for r in results.values()) | 
					
						
						|  | self.multi_gpu_failures = sum(r["failed"]["multi"] for r in results.values()) | 
					
						
						|  | self.n_failures = self.single_gpu_failures + self.multi_gpu_failures | 
					
						
						|  |  | 
					
						
						|  | self.n_tests = self.n_failures + self.n_success | 
					
						
						|  | self.results = results | 
					
						
						|  | self.thread_ts = None | 
					
						
						|  |  | 
					
						
						|  | @property | 
					
						
						|  | def payload(self) -> str: | 
					
						
						|  | blocks = [self.header] | 
					
						
						|  |  | 
					
						
						|  | if self.n_failures > 0: | 
					
						
						|  | blocks.append(self.failures_overwiew) | 
					
						
						|  | blocks.append(self.failures_detailed) | 
					
						
						|  |  | 
					
						
						|  | if self.n_failures == 0: | 
					
						
						|  | blocks.append(self.no_failures) | 
					
						
						|  |  | 
					
						
						|  | return json.dumps(blocks) | 
					
						
						|  |  | 
					
						
						|  | @property | 
					
						
						|  | def time(self) -> str: | 
					
						
						|  | all_results = self.results.values() | 
					
						
						|  | time_spent = [] | 
					
						
						|  | for r in all_results: | 
					
						
						|  | if len(r["time_spent"]): | 
					
						
						|  | time_spent.extend([x for x in r["time_spent"].split(", ") if len(x.strip())]) | 
					
						
						|  | total_secs = 0 | 
					
						
						|  |  | 
					
						
						|  | for time in time_spent: | 
					
						
						|  | time_parts = time.split(":") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if len(time_parts) == 1: | 
					
						
						|  | time_parts = [0, 0, time_parts[0]] | 
					
						
						|  |  | 
					
						
						|  | hours, minutes, seconds = int(time_parts[0]), int(time_parts[1]), float(time_parts[2]) | 
					
						
						|  | total_secs += hours * 3600 + minutes * 60 + seconds | 
					
						
						|  |  | 
					
						
						|  | hours, minutes, seconds = total_secs // 3600, (total_secs % 3600) // 60, total_secs % 60 | 
					
						
						|  | return f"{int(hours)}h{int(minutes)}m{int(seconds)}s" | 
					
						
						|  |  | 
					
						
						|  | @property | 
					
						
						|  | def failures_overwiew(self) -> Dict: | 
					
						
						|  | return { | 
					
						
						|  | "type": "section", | 
					
						
						|  | "text": { | 
					
						
						|  | "type": "plain_text", | 
					
						
						|  | "text": ( | 
					
						
						|  | f"There were {self.n_failures} failures, out of {self.n_tests} tests.\n" | 
					
						
						|  | f"The suite ran in {self.time}." | 
					
						
						|  | ), | 
					
						
						|  | "emoji": True, | 
					
						
						|  | }, | 
					
						
						|  | "accessory": { | 
					
						
						|  | "type": "button", | 
					
						
						|  | "text": {"type": "plain_text", "text": "Check Action results", "emoji": True}, | 
					
						
						|  | "url": f"https://github.com/huggingface/transformers/actions/runs/{os.environ['GITHUB_RUN_ID']}", | 
					
						
						|  | }, | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  | @property | 
					
						
						|  | def failures_detailed(self) -> Dict: | 
					
						
						|  | failures = {k: v["failed"] for k, v in self.results.items()} | 
					
						
						|  |  | 
					
						
						|  | individual_reports = [] | 
					
						
						|  | for key, value in failures.items(): | 
					
						
						|  | device_report = self.get_device_report(value) | 
					
						
						|  | if sum(value.values()): | 
					
						
						|  | report = f"{device_report}{key}" | 
					
						
						|  | individual_reports.append(report) | 
					
						
						|  |  | 
					
						
						|  | header = "Single |  Multi | Category\n" | 
					
						
						|  | failures_report = prepare_reports( | 
					
						
						|  | title="The following quantization tests had failures", header=header, reports=individual_reports | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | return {"type": "section", "text": {"type": "mrkdwn", "text": failures_report}} | 
					
						
						|  |  | 
					
						
						|  | def post(self): | 
					
						
						|  | payload = self.payload | 
					
						
						|  | print("Sending the following payload") | 
					
						
						|  | print(json.dumps({"blocks": json.loads(payload)})) | 
					
						
						|  |  | 
					
						
						|  | text = f"{self.n_failures} failures out of {self.n_tests} tests," if self.n_failures else "All tests passed." | 
					
						
						|  |  | 
					
						
						|  | self.thread_ts = client.chat_postMessage( | 
					
						
						|  | channel=SLACK_REPORT_CHANNEL_ID, | 
					
						
						|  | blocks=payload, | 
					
						
						|  | text=text, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | def post_reply(self): | 
					
						
						|  | if self.thread_ts is None: | 
					
						
						|  | raise ValueError("Can only post reply if a post has been made.") | 
					
						
						|  |  | 
					
						
						|  | for job, job_result in self.results.items(): | 
					
						
						|  | if len(job_result["failures"]): | 
					
						
						|  | for device, failures in job_result["failures"].items(): | 
					
						
						|  | blocks = self.get_reply_blocks( | 
					
						
						|  | job, | 
					
						
						|  | job_result, | 
					
						
						|  | failures, | 
					
						
						|  | device, | 
					
						
						|  | text=f"Number of failures: {job_result['failed'][device]}", | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | print("Sending the following reply") | 
					
						
						|  | print(json.dumps({"blocks": blocks})) | 
					
						
						|  |  | 
					
						
						|  | client.chat_postMessage( | 
					
						
						|  | channel="#transformers-ci-daily-quantization", | 
					
						
						|  | text=f"Results for {job}", | 
					
						
						|  | blocks=blocks, | 
					
						
						|  | thread_ts=self.thread_ts["ts"], | 
					
						
						|  | ) | 
					
						
						|  | time.sleep(1) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if __name__ == "__main__": | 
					
						
						|  | setup_status = os.environ.get("SETUP_STATUS") | 
					
						
						|  | SLACK_REPORT_CHANNEL_ID = os.environ["SLACK_REPORT_CHANNEL"] | 
					
						
						|  | setup_failed = True if setup_status is not None and setup_status != "success" else False | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | ci_event = os.environ["CI_EVENT"] | 
					
						
						|  |  | 
					
						
						|  | title = f"🤗 Results of the {ci_event} - {os.getenv('CI_TEST_JOB')}." | 
					
						
						|  |  | 
					
						
						|  | if setup_failed: | 
					
						
						|  | Message.error_out( | 
					
						
						|  | title, ci_title="", runner_not_available=False, runner_failed=False, setup_failed=setup_failed | 
					
						
						|  | ) | 
					
						
						|  | exit(0) | 
					
						
						|  |  | 
					
						
						|  | arguments = sys.argv[1:][0] | 
					
						
						|  | try: | 
					
						
						|  | quantization_matrix = ast.literal_eval(arguments) | 
					
						
						|  |  | 
					
						
						|  | quantization_matrix = [x.replace("quantization/", "quantization_") for x in quantization_matrix] | 
					
						
						|  | except SyntaxError: | 
					
						
						|  | Message.error_out(title, ci_title="") | 
					
						
						|  | raise ValueError("Errored out.") | 
					
						
						|  |  | 
					
						
						|  | available_artifacts = retrieve_available_artifacts() | 
					
						
						|  |  | 
					
						
						|  | quantization_results = { | 
					
						
						|  | quant: { | 
					
						
						|  | "failed": {"single": 0, "multi": 0}, | 
					
						
						|  | "success": 0, | 
					
						
						|  | "time_spent": "", | 
					
						
						|  | "failures": {}, | 
					
						
						|  | "job_link": {}, | 
					
						
						|  | } | 
					
						
						|  | for quant in quantization_matrix | 
					
						
						|  | if f"run_quantization_torch_gpu_{quant}_test_reports" in available_artifacts | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  | github_actions_jobs = get_jobs( | 
					
						
						|  | workflow_run_id=os.environ["GITHUB_RUN_ID"], token=os.environ["ACCESS_REPO_INFO_TOKEN"] | 
					
						
						|  | ) | 
					
						
						|  | github_actions_job_links = {job["name"]: job["html_url"] for job in github_actions_jobs} | 
					
						
						|  |  | 
					
						
						|  | artifact_name_to_job_map = {} | 
					
						
						|  | for job in github_actions_jobs: | 
					
						
						|  | for step in job["steps"]: | 
					
						
						|  | if step["name"].startswith("Test suite reports artifacts: "): | 
					
						
						|  | artifact_name = step["name"][len("Test suite reports artifacts: ") :] | 
					
						
						|  | artifact_name_to_job_map[artifact_name] = job | 
					
						
						|  | break | 
					
						
						|  |  | 
					
						
						|  | for quant in quantization_results.keys(): | 
					
						
						|  | for artifact_path in available_artifacts[f"run_quantization_torch_gpu_{quant}_test_reports"].paths: | 
					
						
						|  | artifact = retrieve_artifact(artifact_path["path"], artifact_path["gpu"]) | 
					
						
						|  | if "stats" in artifact: | 
					
						
						|  |  | 
					
						
						|  | job = artifact_name_to_job_map[artifact_path["path"]] | 
					
						
						|  | quantization_results[quant]["job_link"][artifact_path["gpu"]] = job["html_url"] | 
					
						
						|  | failed, success, time_spent = handle_test_results(artifact["stats"]) | 
					
						
						|  | quantization_results[quant]["failed"][artifact_path["gpu"]] += failed | 
					
						
						|  | quantization_results[quant]["success"] += success | 
					
						
						|  | quantization_results[quant]["time_spent"] += time_spent[1:-1] + ", " | 
					
						
						|  |  | 
					
						
						|  | stacktraces = handle_stacktraces(artifact["failures_line"]) | 
					
						
						|  |  | 
					
						
						|  | for line in artifact["summary_short"].split("\n"): | 
					
						
						|  | if line.startswith("FAILED "): | 
					
						
						|  | line = line[len("FAILED ") :] | 
					
						
						|  | line = line.split()[0].replace("\n", "") | 
					
						
						|  |  | 
					
						
						|  | if artifact_path["gpu"] not in quantization_results[quant]["failures"]: | 
					
						
						|  | quantization_results[quant]["failures"][artifact_path["gpu"]] = [] | 
					
						
						|  |  | 
					
						
						|  | quantization_results[quant]["failures"][artifact_path["gpu"]].append( | 
					
						
						|  | {"line": line, "trace": stacktraces.pop(0)} | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | job_name = os.getenv("CI_TEST_JOB") | 
					
						
						|  | if not os.path.isdir(os.path.join(os.getcwd(), f"ci_results_{job_name}")): | 
					
						
						|  | os.makedirs(os.path.join(os.getcwd(), f"ci_results_{job_name}")) | 
					
						
						|  |  | 
					
						
						|  | with open(f"ci_results_{job_name}/quantization_results.json", "w", encoding="UTF-8") as fp: | 
					
						
						|  | json.dump(quantization_results, fp, indent=4, ensure_ascii=False) | 
					
						
						|  |  | 
					
						
						|  | target_workflow = "huggingface/transformers/.github/workflows/self-scheduled-caller.yml@refs/heads/main" | 
					
						
						|  | is_scheduled_ci_run = os.environ.get("CI_WORKFLOW_REF") == target_workflow | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if is_scheduled_ci_run: | 
					
						
						|  | api.upload_file( | 
					
						
						|  | path_or_fileobj=f"ci_results_{job_name}/quantization_results.json", | 
					
						
						|  | path_in_repo=f"{datetime.datetime.today().strftime('%Y-%m-%d')}/ci_results_{job_name}/quantization_results.json", | 
					
						
						|  | repo_id="hf-internal-testing/transformers_daily_ci", | 
					
						
						|  | repo_type="dataset", | 
					
						
						|  | token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None), | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | message = QuantizationMessage( | 
					
						
						|  | title, | 
					
						
						|  | results=quantization_results, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | message.post() | 
					
						
						|  | message.post_reply() | 
					
						
						|  |  |