1"""Generate QEMU options for Trusty test framework"""
2
3import logging
4import os
5import pathlib
6import shutil
7import subprocess
8import tempfile
9
10from qemu_error import RunnerGenericError
11
12logger = logging.getLogger(__name__)
13
14def _find_dtc():
15    for search_dir in ["out/host/linux-x86/bin", "prebuilts/misc/linux-x86/dtc", "bin", "linux-build/scripts/dtc"]:
16        path = os.path.join(search_dir, "dtc")
17        if os.path.exists(path):
18            return path
19    return None
20
21
22class QemuDrive():
23    def __init__(self, name: str, index: int, read_only: bool = True):
24        self.name = name
25        self.index = index
26        self.read_only = read_only
27
28    def index_letter(self):
29        return chr(ord('a') + self.index)
30
31    def path(self, directory: os.PathLike):
32        return f"{directory}/{self.name}.img"
33
34    def ensure_image_exists(self, image_dir: os.PathLike,
35                            instance_dir: os.PathLike):
36        if self.read_only:
37            return
38
39        path = self.path(instance_dir)
40        if not os.path.exists(path):
41            snapshot_path = self.path(image_dir)
42            shutil.copy(snapshot_path, path)
43
44    def delete_image_changes(self, instance_dir: os.PathLike):
45        if self.read_only:
46            return
47
48        try:
49            os.remove(self.path(instance_dir))
50        except FileNotFoundError:
51            pass
52
53    def args(self, image_dir: os.PathLike, instance_dir: os.PathLike):
54        path = self.path(image_dir if self.read_only else instance_dir)
55        snapshot = "on" if self.read_only else "off"
56        return [
57            "-drive",
58            (f"file={path},index={self.index},if=none,"
59             + f"id=hd{self.index_letter()},format=raw,snapshot={snapshot}"),
60            "-device",
61            f"virtio-blk-device,drive=hd{self.index_letter()}"
62        ]
63
64class QemuArm64Options(object):
65
66    MACHINE = "virt,secure=on,virtualization=on"
67
68    BASIC_ARGS = [
69        "-nographic", "-cpu", "max,sve=off,pauth=off", "-smp", "4", "-m", "1024", "-d",
70        "unimp", "-semihosting-config", "enable,target=native", "-no-acpi",
71    ]
72
73    LINUX_ARGS = (
74        "earlyprintk console=ttyAMA0,38400 keep_bootcon "
75        "root=/dev/ram0 init=/init androidboot.hardware=qemu_trusty "
76        "trusty-log.log_ratelimit_interval=0 trusty-log.log_to_dmesg=always")
77
78    def __init__(self, config, instance_dir):
79        self.args = []
80        self.config = config
81        self.instance_dir = instance_dir
82
83    def create_rpmb_data(self):
84        """If no rpmb data image exists, copy the snapshot to create new one."""
85        os.makedirs(self.instance_dir, exist_ok=True)
86        path = self.rpmb_data_path()
87        if not os.path.exists(path):
88            shutil.copy(self.rpmb_data_snapshot_path(), path)
89
90    def delete_rpmb_data(self):
91        try:
92            os.remove(self.rpmb_data_path())
93        except FileNotFoundError:
94            pass
95
96    def rpmb_data_snapshot_path(self):
97        return f"{self.config.atf}/RPMB_DATA"
98
99    def rpmb_data_path(self):
100        return f"{self.instance_dir}/RPMB_DATA"
101
102    def rpmb_options(self, sock):
103        return [
104            "-device", "virtio-serial",
105            "-device", "virtserialport,chardev=rpmb0,name=rpmb0",
106            "-chardev", f"socket,id=rpmb0,path={sock}"]
107
108    def get_initrd_filename(self):
109        return self.config.android_image_dir + "/ramdisk.img"
110
111    def initrd_dts(self):
112        file_stats = os.stat(self.get_initrd_filename())
113        start_addr = 0x48000000
114        end_addr = start_addr + file_stats.st_size
115
116        return f"""/ {{
117        chosen {{
118            linux,initrd-start = <0x0 0x{start_addr:08x}>;
119            linux,initrd-end = <0x0 0x{end_addr:08x}>;
120        }};
121    }};
122        """
123
124    def gen_dtb(self, args, dtb_tmp_file):
125        """Computes a trusty device tree, returning a file for it"""
126        dtc = _find_dtc()
127        if dtc is None:
128            raise RunnerGenericError("Could not find dtc tool")
129        with tempfile.NamedTemporaryFile() as dtb_gen:
130            dump_dtb_cmd = [
131                self.config.qemu, "-machine",
132                f"{self.MACHINE},dumpdtb={dtb_gen.name}"
133            ] + [arg for arg in args if arg != "-S"]
134            logger.info("dump dtb command: %s", " ".join(dump_dtb_cmd))
135            returncode = subprocess.call(dump_dtb_cmd)
136            if returncode != 0:
137                raise RunnerGenericError(
138                    f"dumping dtb failed with {returncode}")
139            dtb_to_dts_cmd = [dtc, "-q", "-O", "dts", dtb_gen.name]
140            logger.info("dtb to dts command: %s", " ".join(dtb_to_dts_cmd))
141            # pylint: disable=consider-using-with
142            with subprocess.Popen(dtb_to_dts_cmd,
143                                  stdout=subprocess.PIPE,
144                                  universal_newlines=True) as dtb_to_dts:
145                dts = dtb_to_dts.communicate()[0]
146                if dtb_to_dts.returncode != 0:
147                    raise RunnerGenericError(
148                        f"dtb_to_dts failed with {dtb_to_dts.returncode}")
149
150        dts += self.initrd_dts()
151
152        # Subprocess closes dtb, so we can't allow it to autodelete
153        dtb = dtb_tmp_file
154        dts_to_dtb_cmd = [dtc, "-q", "-O", "dtb"]
155        with subprocess.Popen(dts_to_dtb_cmd,
156                              stdin=subprocess.PIPE,
157                              stdout=dtb,
158                              universal_newlines=True) as dts_to_dtb:
159            dts_to_dtb.communicate(dts)
160            dts_to_dtb_ret = dts_to_dtb.wait()
161
162        if dts_to_dtb_ret:
163            raise RunnerGenericError(f"dts_to_dtb failed with {dts_to_dtb_ret}")
164        return ["-dtb", dtb.name]
165
166    def drives(self) -> list[QemuDrive]:
167        return [
168            QemuDrive("userdata", 2, read_only=False),
169            QemuDrive("vendor", 1),
170            QemuDrive("system", 0)
171        ]
172
173    def create_drives_data(self):
174        """If drives images don't exist, create some from their snapshots."""
175        os.makedirs(self.instance_dir, exist_ok=True)
176        for drive in self.drives():
177            drive.ensure_image_exists(self.config.android_image_dir, self.instance_dir)
178
179    def delete_drives_data(self):
180        for drive in self.drives():
181            drive.delete_image_changes(self.instance_dir)
182
183    def android_drives_args(self):
184        """Generates arguments for mapping all default drives"""
185        args = []
186        # This is order sensitive due to using e.g. root=/dev/vda
187        for drive in self.drives():
188            args += drive.args(self.config.android_image_dir, self.instance_dir)
189        return args
190
191    def machine_options(self):
192        return ["-machine", self.MACHINE]
193
194    def basic_options(self):
195        return list(self.BASIC_ARGS)
196
197    def bios_options(self):
198        return ["-bios", f"{self.config.atf}/bl1.bin"]
199
200    def find_kernel_image(self):
201        if pathlib.Path(self.config.linux).is_file():
202            return self.config.linux
203        image = f"{self.config.linux}/arch/{self.config.linux_arch}/boot/Image"
204        if os.path.exists(image):
205            return image
206        return None
207
208    def linux_options(self):
209        kernel = self.find_kernel_image()
210        if kernel is None:
211            raise RunnerGenericError("Could not find kernel image")
212        return [
213            "-kernel",
214            kernel,
215            "-initrd",
216            self.get_initrd_filename(),
217            "-append", self.LINUX_ARGS
218        ]
219
220    def android_trusty_user_data(self):
221        return os.path.join(self.config.android_image_dir, "data")
222