xref: /aosp_15_r20/tools/treble/cuttlefish/build_chd_utils.py (revision 105f628577ac4ba0e277a494fbb614ed8c12a994)
1#!/usr/bin/python3
2#
3# Copyright (C) 2023 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import fnmatch
18import glob
19import os
20import shutil
21import subprocess
22import tempfile
23from typing import List, Tuple
24import zipfile
25
26
27def unzip_otatools(
28    otatools_zip_path: str, output_dir: str, patterns: List[str] = None
29) -> None:
30  """Unzip otatools to a directory and set the permissions for execution.
31
32  Args:
33    otatools_zip_path: The path to otatools zip archive.
34    output_dir: The root directory of the unzip output.
35    patterns: If provided, only extract files matching any of these patterns
36              from the otatools zip archive; otherwise, extract all files.
37  """
38  with zipfile.ZipFile(otatools_zip_path, 'r') as zf:
39    if patterns is None:
40      zf.extractall(path=output_dir)
41    else:
42      for file in zf.namelist():
43        if any(fnmatch.fnmatch(file, p) for p in patterns):
44          zf.extract(file, output_dir)
45
46  for f in glob.glob(os.path.join(output_dir, 'bin', '*')):
47    os.chmod(f, 0o777)
48
49
50def _parse_copy_file_pair(copy_file_pair: str) -> Tuple[str, str]:
51  """Convert a string to a source path and a destination path.
52
53  Args:
54    copy_file_pair: A string in the format of <src glob pattern>:<dst path>.
55
56  Returns:
57    The source path and the destination path.
58
59  Raises:
60    ValueError if the input string is in a wrong format.
61  """
62  split_pair = copy_file_pair.split(':', 1)
63  if len(split_pair) != 2:
64    raise ValueError(f'{copy_file_pair} is not a <src>:<dst> pair.')
65  src_list = glob.glob(split_pair[0])
66  if len(src_list) != 1:
67    raise ValueError(f'{copy_file_pair} has more than one matched src files: '
68                     f'{" ".join(src_list)}.')
69  return src_list[0], split_pair[1]
70
71
72def copy_files(copy_files_list: List[str], output_dir: str) -> None:
73  """Copy files to the output directory.
74
75  Args:
76    copy_files_list: A list of copy file pairs, where a pair defines the src
77                     glob pattern and the dst path.
78    output_dir: The root directory of the copy dst.
79
80  Raises:
81    FileExistsError if the dst file already exists.
82  """
83  for pair in copy_files_list:
84    src, dst = _parse_copy_file_pair(pair)
85    # this line does not change dst if dst is absolute.
86    dst = os.path.join(output_dir, dst)
87    os.makedirs(os.path.dirname(dst), exist_ok=True)
88    print(f'Copying {src} to {dst}')
89    if os.path.exists(dst):
90      raise FileExistsError(dst)
91    shutil.copyfile(src, dst)
92
93
94def _extract_cil_files(target_files_zip: str, output_dir: str) -> None:
95  """Extract sepolicy cil files from a target files zip archive.
96
97  Args:
98    target_files_zip: A path to the target files zip archive.
99    output_dir: The directory of extracted cil files.
100  """
101  with zipfile.ZipFile(target_files_zip, 'r') as zf:
102    cil_files = [name for name in zf.namelist() if name.endswith('.cil')]
103    for f in cil_files:
104      zf.extract(f, output_dir)
105
106
107def _get_sepolicy_plat_version(target_files_zip: str) -> str:
108  """Get the platform sepolicy version from a vendor target files zip archive.
109
110  Args:
111    target_files_zip: A path to the target files zip archive.
112
113  Returns:
114    A string that represents the platform sepolicy version.
115  """
116  with zipfile.ZipFile(target_files_zip, 'r') as zf:
117    try:
118      with zf.open('VENDOR/etc/selinux/plat_sepolicy_vers.txt') as ver_file:
119        return ver_file.readline().decode('utf-8').strip('\n')
120    except Exception as error:
121      print(f'cannot get platform sepolicy version from {target_files_zip}')
122      raise
123
124
125def merge_chd_sepolicy(
126    framework_target_files_zip: str, vendor_target_files_zip: str,
127    otatools_dir: str, output_dir: str
128) -> str:
129  """Merge the sepolicy files for CHD.
130
131  This function takes both the system and vendor sepolicy files from
132  framework_target_files_zip, and merges them with the vendor sepolicy from
133  vendor_target_files_zip to generate `chd_merged_sepolicy`.
134
135  In certain instances, a device may possess components that do not put their
136  sepolicy rules within the same partition as the components themselves. This
137  results in a problem that CHD is missing necessary vendor sepolicy rules
138  after the replacement of the device's vendor image with Cuttlefish. As a
139  short term solution to resolve this issue, the vendor sepolicy files from
140  framework_target_files_zip are additionally merged.
141
142  Args:
143    framework_target_files_zip: A path to the framework target files zip
144                                archive.
145    vendor_target_files_zip: A path to the vendor target files zip archive.
146    otatools_dir: The otatools directory.
147    output_dir: The output directory for generating a merged sepolicy file.
148
149  Returns:
150    The path to the CHD merged sepolicy file.
151
152  Raises:
153    FileNotFoundError if any mandatory sepolicy file is missing.
154  """
155  with tempfile.TemporaryDirectory(prefix='framework_',
156                                   dir=output_dir) as framework_dir, \
157       tempfile.TemporaryDirectory(prefix='vendor_',
158                                   dir=output_dir) as vendor_dir:
159    merged_policy = os.path.join(output_dir, 'chd_merged_sepolicy')
160    _extract_cil_files(framework_target_files_zip, framework_dir)
161    _extract_cil_files(vendor_target_files_zip, vendor_dir)
162    plat_ver = _get_sepolicy_plat_version(vendor_target_files_zip)
163    print(f'Merging sepolicy files from {framework_target_files_zip} and '
164          f'{vendor_target_files_zip}: platform version {plat_ver}.')
165
166    # (partition, path, required)
167    system_policy_files = (
168        ('system', 'etc/selinux/plat_sepolicy.cil', True),
169        ('system', f'etc/selinux/mapping/{plat_ver}.cil', True),
170        ('system', f'etc/selinux/mapping/{plat_ver}.compat.cil', False),
171        ('system_ext', 'etc/selinux/system_ext_sepolicy.cil', False),
172        ('system_ext', f'etc/selinux/mapping/{plat_ver}.cil', False),
173        ('system_ext', f'etc/selinux/mapping/{plat_ver}.compat.cil', False),
174        ('product', 'etc/selinux/product_sepolicy.cil', False),
175        ('product', f'etc/selinux/mapping/{plat_ver}.cil', False),
176    )
177    vendor_policy_files = (
178        ('vendor', 'etc/selinux/vendor_sepolicy.cil', True),
179        ('vendor', 'etc/selinux/plat_pub_versioned.cil', True),
180        ('odm', 'etc/selinux/odm_sepolicy.cil', False),
181    )
182
183    # merge system and vendor policy files from framework_dir with vendor
184    # policy files from vendor_dir.
185    merge_cmd = [
186        os.path.join(otatools_dir, 'bin', 'secilc'),
187        '-m', '-M', 'true', '-G', '-N',
188        '-o', merged_policy,
189        '-f', '/dev/null'
190    ]
191    policy_dirs_and_files = (
192        # For the normal case, we should merge the system policies from
193        # framework_dir with the vendor policies from vendor_dir.
194        (framework_dir, system_policy_files),
195        (vendor_dir, vendor_policy_files),
196
197        # Additionally merging the vendor policies from framework_dir in order
198        # to fix the policy misplaced issue.
199        # TODO (b/315474132): remove this when all the policies from
200        #                     framework_dir are moved to the right partition.
201        (framework_dir, vendor_policy_files),
202    )
203    for policy_dir, policy_files in policy_dirs_and_files:
204      for partition, path, required in policy_files:
205        policy_file = os.path.join(policy_dir, partition.upper(), path)
206        if os.path.exists(policy_file):
207          merge_cmd.append(policy_file)
208        elif required:
209          raise FileNotFoundError(f'{policy_file} does not exist')
210
211    subprocess.run(merge_cmd, check=True)
212    return merged_policy
213