# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import gzip
import os
import shutil
import subprocess
import threading

class Archiver(object):
    """
    An instance of this class stores a set of files in a given directory on
    the local filesystem. Stored files are optionally gzipped and are
    organized into tar.xz archives based on their filename prefixes. It is a
    useful tool when one has to deal with many files with similar content
    that are generated continuously. Packing similar files together into
    a tar.xz archive can significantly reduce the amount of required disk
    space (even for gzipped files).
    As a parameter, the constructor takes a set of filename prefixes. These
    prefixes are automatically clustered into archives by their common
    prefixes (yes, prefixes of prefixes). An archive is created
    automatically when all prefixes assigned to it have been finalized with
    finalize_prefix().
    Methods provided by this class are synchronized and can be called from
    different Python threads.

    """

    def _split_names_by_prefixes(
            self, names, max_names_per_prefix, prefix_length=0):
        """
        Recursive function used to split given set of names into groups by
        common prefixes. It tries to find a configuration with a minimum
        number of groups (prefixes) where the number of elements (names) in
        each group is not larger than the given parameter. A group may still
        exceed that limit when one of the names is equal to the common
        prefix of the whole group, because such a group cannot be split
        further.

        @param names: list of names to split into groups (names MUST BE sorted
                and unique).
        @param max_names_per_prefix: maximum number of names assigned to
                a group (single prefix), must be larger than 1.
        @param prefix_length: current length of the prefix (for recursive
                calls); all elements in the list given as the parameter
                'names' MUST HAVE the same prefix with this length.

        @returns dictionary with prefixes (each one represents a single
                group) and sizes (the number of names in the group); an empty
                dictionary is returned for an empty list of names.

        """
        assert max_names_per_prefix > 1
        # No names means no groups; without this guard names[0] below would
        # raise IndexError for an empty set of prefixes.
        if not names:
            return dict()
        first = names[0]
        last = names[-1]
        # Returns the current prefix if the group is small enough
        if len(names) <= max_names_per_prefix:
            return {first[:prefix_length]: len(names)}
        # Increases prefix_length until a difference is found:
        # - elements in 'names' are sorted and unique, so it is enough to
        #   compare the first and the last element
        # - elements in 'names' have a common prefix with a length of
        #   'prefix_length' characters
        while (len(first) > prefix_length and
               first[prefix_length] == last[prefix_length]):
            prefix_length += 1
        # Checks for special case, when the first name == prefix; the group
        # cannot be split by the next character in this case
        if len(first) == prefix_length:
            return {first[:prefix_length]: len(names)}
        # Calculates all prefixes (groups) using recursion:
        # - 'prefix_length' points to the first character that differentiates
        #   elements from the 'names' list
        # - names with the same character at this position are adjacent,
        #   because the list is sorted
        results = dict()
        i_begin = 0
        while i_begin < len(names):
            char = names[i_begin][prefix_length]
            i_end = i_begin + 1
            while i_end < len(names) and names[i_end][prefix_length] == char:
                i_end += 1
            results.update(self._split_names_by_prefixes(
                    names[i_begin:i_end], max_names_per_prefix,
                    prefix_length + 1))
            i_begin = i_end
        return results


    def __init__(self, path_directory, prefixes, max_prefixes_per_archive):
        """
        Constructor.

        @param path_directory: directory where files and archives are stored.
                It is created if it does not exist.
        @param prefixes: a set of allowed filename prefixes; it may be empty.
        @param max_prefixes_per_archive: maximum number of filename prefixes
                assigned to a single group (archive).

        """
        self._lock = threading.Lock()
        self._path_directory = path_directory
        if not os.path.exists(self._path_directory):
            os.makedirs(self._path_directory)

        prefixes = sorted(set(prefixes))
        groups = self._split_names_by_prefixes(prefixes,
                                               max_prefixes_per_archive)
        # archive prefix -> [number of prefixes not finalized yet,
        #                    list of filenames to pack into the archive]
        self._archives_names = dict()
        # filename prefix -> [archive prefix, set of names saved with this
        #                     prefix (replaced by None after finalization)]
        self._filenames_prefixes = dict()
        # Groups are assigned in sorted order; each of them takes the next
        # 'count' elements from the sorted list of prefixes.
        position = 0
        for archive_prefix, count in sorted(groups.items()):
            self._archives_names[archive_prefix] = [count, []]
            for prefix in prefixes[position:position + count]:
                self._filenames_prefixes[prefix] = [archive_prefix, set()]
            position += count


    def _register_file(self, prefix, name):
        """
        Records that a file prefix+name is being added to the archiver.

        @param prefix: prefix of the filename; it must be one of the prefixes
                given to the constructor and it cannot be finalized yet
        @param name: the rest of the filename; it must not have been used
                with this prefix before

        """
        with self._lock:
            assert prefix in self._filenames_prefixes
            assert self._filenames_prefixes[prefix][1] is not None
            assert name not in self._filenames_prefixes[prefix][1]
            self._filenames_prefixes[prefix][1].add(name)


    def save_file(self, prefix, name, content, apply_gzip=False):
        """
        Add a new file with given content to the archive.

        @param prefix: prefix of filename that the new file will be saved with
        @param name: the rest of the filename of the new file; in summary, the
                resultant filename of the new file will be prefix+name
        @param content: a content of the file (bytes)
        @param apply_gzip: if true, the added file will be gzipped, the suffix
                .gz will be added to its resultant filename

        """
        if apply_gzip:
            name += ".gz"
        path_target = os.path.join(self._path_directory, prefix + name)
        self._register_file(prefix, name)

        # The content is written outside the lock; the name is already
        # reserved, so no other thread can write to the same path.
        if apply_gzip:
            # mtime is set to 0, so the output depends on the content only
            file_target = gzip.GzipFile(path_target, 'wb', 9, None, 0)
        else:
            file_target = open(path_target, 'wb')
        with file_target:
            file_target.write(content)


    def copy_file(self, prefix, name, path_file, apply_gzip=False):
        """
        Add a new file to the archive. The file is copied from given location.

        @param prefix: prefix of filename that the new file will be saved with
        @param name: the rest of the filename of the new file; in summary, the
                resultant filename of the new file will be prefix+name
        @param path_file: path to the source file
        @param apply_gzip: if true, the added file will be gzipped, the suffix
                .gz will be added to its resultant filename

        """
        with open(path_file, 'rb') as file_source:
            content = file_source.read()
        self.save_file(prefix, name, content, apply_gzip)


    def move_file(self, prefix, name, path_file, apply_gzip=False):
        """
        Add a new file to the archive. The file is moved, i.e. an original
        file is deleted.

        @param prefix: prefix of filename that the new file will be saved with
        @param name: the rest of the filename of the new file; in summary, the
                resultant filename of the new file will be prefix+name
        @param path_file: path to the source file, it will be deleted
        @param apply_gzip: if true, the added file will be gzipped, the suffix
                .gz will be added to its resultant filename

        """
        if apply_gzip:
            # The content must be rewritten, so the file is copied first and
            # the original is removed afterwards.
            self.copy_file(prefix, name, path_file, apply_gzip)
            os.remove(path_file)
            return
        path_target = os.path.join(self._path_directory, prefix + name)
        self._register_file(prefix, name)
        shutil.move(path_file, path_target)


    def finalize_prefix(self, prefix):
        """
        This method is called to mark that there are no more files to add
        with given prefix. This method creates a tar archive when the last
        prefix assigned to the corresponding group is finalized. This method
        must be called for all prefixes given to the constructor.

        @param prefix: prefix to finalize, no more files with this prefix can
                be added to the archive

        @raises Exception: when the tar process exits with non-zero code

        """
        with self._lock:
            assert prefix in self._filenames_prefixes
            assert self._filenames_prefixes[prefix][1] is not None

            filenames = [prefix + name for name
                         in sorted(self._filenames_prefixes[prefix][1])]
            # None marks the prefix as finalized
            self._filenames_prefixes[prefix][1] = None
            archive_name = self._filenames_prefixes[prefix][0]

            archive = self._archives_names[archive_name]
            archive[0] -= 1
            archive[1] += filenames
            archive_is_complete = (archive[0] == 0)
            if archive_is_complete:
                # All files from the whole group are packed together
                filenames = archive[1]

        # The archive is created outside the lock; nothing is created when
        # no files were added to the group.
        if archive_is_complete and len(filenames) > 0:
            argv = ['tar', 'cJf', 'archive_' + archive_name + '.tar.xz']
            argv += filenames
            if subprocess.call(argv, cwd=self._path_directory) != 0:
                raise Exception("Process 'tar cJf' failed!")
            # Packed files are removed only after tar succeeded
            for filename in filenames:
                os.remove(os.path.join(self._path_directory, filename))