xref: /aosp_15_r20/external/executorch/backends/cadence/runtime/executor.py (revision 523fa7a60841cd1ecfb9cc4201f1ca8b03ed023a)
1# Copyright (c) Meta Platforms, Inc. and affiliates.
2# All rights reserved.
3#
4# This source code is licensed under the BSD-style license found in the
5# LICENSE file in the root directory of this source tree.
6
7# pyre-strict
8
9
10import logging
11import os
12import selectors
13import subprocess
14import sys
15
16from dataclasses import dataclass
17from typing import Dict, List, Optional, Sequence, Tuple, Union
18
19import torch
20
21from executorch.devtools.bundled_program.config import MethodTestCase, MethodTestSuite
22from executorch.devtools.bundled_program.core import BundledProgram
23
24from executorch.devtools.bundled_program.serialize import (
25    serialize_from_bundled_program_to_flatbuffer,
26)
27from executorch.exir import ExecutorchProgram, ExecutorchProgramManager
28
29# If quiet is true, suppress the printing of stdout and stderr output.
30quiet = False
31
32
33def _execute_subprocess(cmd: List[str], cwd: Optional[str] = None) -> Tuple[str, str]:
34    """
35    `subprocess.run(cmd, capture_output=True)` captures stdout/stderr and only
36    returns it at the end. This functions not only does that, but also prints out
37    stdout/stderr non-blockingly when running the command.
38    """
39    logging.debug(f"cmd = \33[33m{cmd}\33[0m, cwd = {cwd}")
40    stdout = ""
41    stderr = ""
42
43    PIPE = subprocess.PIPE
44    with subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, cwd=cwd) as p:
45        sel = selectors.DefaultSelector()
46        # pyre-fixme[6]: For 1st argument expected `Union[HasFileno, int]` but got
47        #  `Optional[IO[bytes]]`.
48        sel.register(p.stdout, selectors.EVENT_READ)
49        # pyre-fixme[6]: For 1st argument expected `Union[HasFileno, int]` but got
50        #  `Optional[IO[bytes]]`.
51        sel.register(p.stderr, selectors.EVENT_READ)
52
53        done = False
54        while not done:
55            for key, _ in sel.select():
56                # pyre-fixme[16]: Item `HasFileno` of `Union[HasFileno, int]` has no
57                #  attribute `read1`.
58                data = key.fileobj.read1().decode()
59                if not data:
60                    done = True
61                    break
62
63                if key.fileobj is p.stdout:
64                    if not quiet:
65                        print(data, end="")
66                    stdout += data
67                else:
68                    if not quiet:
69                        print(data, end="", file=sys.stderr)
70                    stderr += data
71
72    # flush stdout and stderr in case there's no newline character at the end
73    # from the subprocess
74    sys.stdout.flush()
75    sys.stderr.flush()
76
77    if p.returncode != 0:
78        raise subprocess.CalledProcessError(p.returncode, p.args, stdout, stderr)
79
80    return stdout, stderr
81
82
83def execute(args: List[str]) -> Tuple[str, str]:
84    """
85    Either a local execution (through subprocess.run) or a remote execution (in Hargow).
86    Run the command described by args (the same way subprocess.run does). Ex: if you want to
87    run "ls -al", you need to pass args = ["ls", "-al"]
88    """
89    # `import torch` will mess up PYTHONPATH. delete the messed up PYTHONPATH
90    if "PYTHONPATH" in os.environ:
91        del os.environ["PYTHONPATH"]
92
93    try:
94        return _execute_subprocess(args)
95    except subprocess.CalledProcessError as e:
96        fdb_cmd = f"fdb {' '.join(e.cmd)}"
97        raise RuntimeError(
98            f"Failed to execute. Use the following to debug:\n{fdb_cmd}"
99        ) from e
100
101
102class Executor:
103    # pyre-fixme[3]: Return type must be annotated.
104    def __init__(
105        self,
106        working_dir: str = "",
107    ):
108        self.working_dir = working_dir
109        self.executor_builder = "./backends/cadence/build_cadence_runner.sh"
110        self.execute_runner = "./cmake-out/backends/cadence/cadence_runner"
111        self.bundled_program_path: str = "CadenceDemoModel.bpte"
112
113    def __call__(self) -> None:
114        # build executor
115        args = self.get_bash_command(self.executor_builder)
116        logging.info(f"\33[33m{' '.join(args)}\33[0m")
117        execute(args)
118
119        # run executor
120        cmd_args = {
121            "bundled_program_path": os.path.join(
122                self.working_dir, self.bundled_program_path
123            ),
124            "etdump_path": os.path.join(self.working_dir, "etdump.etdp"),
125            "debug_output_path": os.path.join(self.working_dir, "debug_output.bin"),
126            "dump_outputs": "true",
127        }
128        args = self.get_bash_command(self.execute_runner, cmd_args)
129        logging.info(f"\33[33m{' '.join(args)}\33[0m")
130        execute(args)
131
132    @staticmethod
133    def get_bash_command(
134        executable: str,
135        cmd_args: Optional[Dict[str, str]] = None,
136    ) -> List[str]:
137        # go through buck config and turn the dict into a list of "{key}=={value}"
138        if cmd_args is None:
139            cmd_args = {}
140
141        cmd_args_strs = []
142        for key, value in cmd_args.items():
143            cmd_args_strs.extend([f"--{key}={value}"])
144
145        return [executable] + cmd_args_strs
146
147
148@dataclass
149class BundledProgramTestData:
150    method: str
151    inputs: Sequence[Union[bool, float, int, torch.Tensor]]
152    expected_outputs: Sequence[torch.Tensor]
153    testset_idx: int = 0  # There is only one testset in the bundled program
154
155
156class BundledProgramManager:
157    """
158    Stateful bundled program object
159    Takes a BundledProgramTestData and generates a bundled program
160    """
161
162    def __init__(self, bundled_program_test_data: List[BundledProgramTestData]) -> None:
163        self.bundled_program_test_data: List[BundledProgramTestData] = (
164            bundled_program_test_data
165        )
166
167    @staticmethod
168    # pyre-fixme[2]: Parameter `**args` has no type specified.
169    def bundled_program_test_data_gen(**args) -> BundledProgramTestData:
170        return BundledProgramTestData(**args)
171
172    def get_method_test_suites(self) -> List[MethodTestSuite]:
173        return [
174            self._gen_method_test_suite(bptd) for bptd in self.bundled_program_test_data
175        ]
176
177    def _gen_method_test_suite(self, bptd: BundledProgramTestData) -> MethodTestSuite:
178        method_test_case = MethodTestCase(
179            inputs=bptd.inputs,
180            expected_outputs=bptd.expected_outputs,
181        )
182        return MethodTestSuite(
183            method_name=bptd.method,
184            test_cases=[method_test_case],
185        )
186
187    def _serialize(
188        self,
189        executorch_program: Union[
190            ExecutorchProgram,
191            ExecutorchProgramManager,
192        ],
193        method_test_suites: Sequence[MethodTestSuite],
194        bptd: BundledProgramTestData,
195    ) -> bytes:
196        bundled_program = BundledProgram(
197            executorch_program=executorch_program, method_test_suites=method_test_suites
198        )
199        bundled_program_buffer = serialize_from_bundled_program_to_flatbuffer(
200            bundled_program
201        )
202        return bundled_program_buffer
203