from fontTools.misc.roundTools import noRound, otRound
from fontTools.misc.intTools import bit_count
from fontTools.ttLib.tables import otTables as ot
from fontTools.varLib.models import supportScalar
from fontTools.varLib.builder import (
    buildVarRegionList,
    buildVarStore,
    buildVarRegion,
    buildVarData,
)
from functools import partial
from collections import defaultdict
from heapq import heappush, heappop


NO_VARIATION_INDEX = ot.NO_VARIATION_INDEX
ot.VarStore.NO_VARIATION_INDEX = NO_VARIATION_INDEX


def _getLocationKey(loc):
    return tuple(sorted(loc.items(), key=lambda kv: kv[0]))


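# Illustrative sketch (not executed): _getLocationKey turns a support/region
# dict into a hashable, order-independent key, e.g.
#
#   _getLocationKey({"wght": (0.0, 1.0, 1.0), "wdth": (0.0, 0.5, 1.0)})
#   # -> (("wdth", (0.0, 0.5, 1.0)), ("wght", (0.0, 1.0, 1.0)))

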
class OnlineVarStoreBuilder(object):
    def __init__(self, axisTags):
        self._axisTags = axisTags
        self._regionMap = {}
        self._regionList = buildVarRegionList([], axisTags)
        self._store = buildVarStore(self._regionList, [])
        self._data = None
        self._model = None
        self._supports = None
        self._varDataIndices = {}
        self._varDataCaches = {}
        self._cache = {}

    def setModel(self, model):
        self.setSupports(model.supports)
        self._model = model

    def setSupports(self, supports):
        self._model = None
        self._supports = list(supports)
        if not self._supports[0]:
            del self._supports[0]  # Drop base master support
        self._cache = {}
        self._data = None

    def finish(self, optimize=True):
        self._regionList.RegionCount = len(self._regionList.Region)
        self._store.VarDataCount = len(self._store.VarData)
        for data in self._store.VarData:
            data.ItemCount = len(data.Item)
            data.calculateNumShorts(optimize=optimize)
        return self._store

    def _add_VarData(self):
        regionMap = self._regionMap
        regionList = self._regionList

        regions = self._supports
        regionIndices = []
        for region in regions:
            key = _getLocationKey(region)
            idx = regionMap.get(key)
            if idx is None:
                varRegion = buildVarRegion(region, self._axisTags)
                idx = regionMap[key] = len(regionList.Region)
                regionList.Region.append(varRegion)
            regionIndices.append(idx)

        # Check if we have one already...
        key = tuple(regionIndices)
        varDataIdx = self._varDataIndices.get(key)
        if varDataIdx is not None:
            self._outer = varDataIdx
            self._data = self._store.VarData[varDataIdx]
            self._cache = self._varDataCaches[key]
            if len(self._data.Item) == 0xFFFF:
                # This is full.  Need new one.
                varDataIdx = None

        if varDataIdx is None:
            self._data = buildVarData(regionIndices, [], optimize=False)
            self._outer = len(self._store.VarData)
            self._store.VarData.append(self._data)
            self._varDataIndices[key] = self._outer
            if key not in self._varDataCaches:
                self._varDataCaches[key] = {}
            self._cache = self._varDataCaches[key]

    def storeMasters(self, master_values, *, round=round):
        deltas = self._model.getDeltas(master_values, round=round)
        base = deltas.pop(0)
        return base, self.storeDeltas(deltas, round=noRound)

    def storeDeltas(self, deltas, *, round=round):
        deltas = [round(d) for d in deltas]
        if len(deltas) == len(self._supports) + 1:
            # The first delta is the base master (default) value; the
            # VarStore stores deltas only for the non-base supports.
            deltas = tuple(deltas[1:])
        else:
            assert len(deltas) == len(self._supports)
            deltas = tuple(deltas)

        varIdx = self._cache.get(deltas)
        if varIdx is not None:
            return varIdx

        if not self._data:
            self._add_VarData()
        inner = len(self._data.Item)
        if inner == 0xFFFF:
            # Full array. Start a new one; deltas are already rounded
            # at this point, so don't round again on the recursive call.
            self._add_VarData()
            return self.storeDeltas(deltas, round=noRound)
        self._data.addItem(deltas, round=noRound)

        varIdx = (self._outer << 16) + inner
        self._cache[deltas] = varIdx
        return varIdx


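# Illustrative usage sketch (not executed); the axis, model, and values below
# are only an example, built with fontTools.varLib.models.VariationModel:
#
#   from fontTools.varLib.models import VariationModel
#
#   builder = OnlineVarStoreBuilder(["wght"])
#   builder.setModel(VariationModel([{}, {"wght": 1.0}]))
#   base, varIdx = builder.storeMasters([100, 150])  # base=100, delta row (50,)
#   store = builder.finish()  # ot.VarStore; varIdx == (outer << 16) + inner

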
def VarData_addItem(self, deltas, *, round=round):
    deltas = [round(d) for d in deltas]

    countUs = self.VarRegionCount
    countThem = len(deltas)
    if countUs + 1 == countThem:
        deltas = list(deltas[1:])
    else:
        assert countUs == countThem, (countUs, countThem)
        deltas = list(deltas)
    self.Item.append(deltas)
    self.ItemCount = len(self.Item)


ot.VarData.addItem = VarData_addItem


def VarRegion_get_support(self, fvar_axes):
    return {
        fvar_axes[i].axisTag: (reg.StartCoord, reg.PeakCoord, reg.EndCoord)
        for i, reg in enumerate(self.VarRegionAxis)
        if reg.PeakCoord != 0
    }


ot.VarRegion.get_support = VarRegion_get_support


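# Illustrative sketch (not executed): with fvar axes ["wght", "wdth"] and a
# region peaking at wght=1.0 while leaving wdth at its default,
#
#   region.get_support(fvar_axes)  # -> {"wght": (0.0, 1.0, 1.0)}
#
# Axes whose PeakCoord is 0 do not constrain the region and are omitted.

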
def VarStore___bool__(self):
    return bool(self.VarData)


ot.VarStore.__bool__ = VarStore___bool__


class VarStoreInstancer(object):
    def __init__(self, varstore, fvar_axes, location={}):
        self.fvar_axes = fvar_axes
        assert varstore is None or varstore.Format == 1
        self._varData = varstore.VarData if varstore else []
        self._regions = varstore.VarRegionList.Region if varstore else []
        self.setLocation(location)

    def setLocation(self, location):
        self.location = dict(location)
        self._clearCaches()

    def _clearCaches(self):
        self._scalars = {}

    def _getScalar(self, regionIdx):
        scalar = self._scalars.get(regionIdx)
        if scalar is None:
            support = self._regions[regionIdx].get_support(self.fvar_axes)
            scalar = supportScalar(self.location, support)
            self._scalars[regionIdx] = scalar
        return scalar

    @staticmethod
    def interpolateFromDeltasAndScalars(deltas, scalars):
        delta = 0.0
        for d, s in zip(deltas, scalars):
            if not s:
                continue
            delta += d * s
        return delta

    def __getitem__(self, varidx):
        major, minor = varidx >> 16, varidx & 0xFFFF
        if varidx == NO_VARIATION_INDEX:
            return 0.0
        varData = self._varData
        scalars = [self._getScalar(ri) for ri in varData[major].VarRegionIndex]
        deltas = varData[major].Item[minor]
        return self.interpolateFromDeltasAndScalars(deltas, scalars)

    def interpolateFromDeltas(self, varDataIndex, deltas):
        varData = self._varData
        scalars = [self._getScalar(ri) for ri in varData[varDataIndex].VarRegionIndex]
        return self.interpolateFromDeltasAndScalars(deltas, scalars)


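# Illustrative usage sketch (not executed); assumes a font with an fvar table
# and a GDEF ItemVariationStore:
#
#   instancer = VarStoreInstancer(
#       font["GDEF"].table.VarStore, font["fvar"].axes, {"wght": 0.5}
#   )
#   delta = instancer[varIdx]  # scalar-weighted sum of the row's deltas

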
#
# Optimizations
#
# retainFirstMap - If true, major 0 mappings are retained. Deltas for unused indices are zeroed
# advIdxes - Set of major 0 indices for advance deltas to be listed first. Other major 0 indices follow.


def VarStore_subset_varidxes(
    self, varIdxes, optimize=True, retainFirstMap=False, advIdxes=set()
):
    # Sort out used varIdxes by major/minor.
    used = {}
    for varIdx in varIdxes:
        if varIdx == NO_VARIATION_INDEX:
            continue
        major = varIdx >> 16
        minor = varIdx & 0xFFFF
        d = used.get(major)
        if d is None:
            d = used[major] = set()
        d.add(minor)
    del varIdxes

    #
    # Subset VarData
    #

    varData = self.VarData
    newVarData = []
    varDataMap = {NO_VARIATION_INDEX: NO_VARIATION_INDEX}
    for major, data in enumerate(varData):
        usedMinors = used.get(major)
        if usedMinors is None:
            continue
        newMajor = len(newVarData)
        newVarData.append(data)

        items = data.Item
        newItems = []
        if major == 0 and retainFirstMap:
            for minor in range(len(items)):
                newItems.append(
                    items[minor] if minor in usedMinors else [0] * len(items[minor])
                )
                varDataMap[minor] = minor
        else:
            if major == 0:
                minors = sorted(advIdxes) + sorted(usedMinors - advIdxes)
            else:
                minors = sorted(usedMinors)
            for minor in minors:
                newMinor = len(newItems)
                newItems.append(items[minor])
                varDataMap[(major << 16) + minor] = (newMajor << 16) + newMinor

        data.Item = newItems
        data.ItemCount = len(data.Item)

        data.calculateNumShorts(optimize=optimize)

    self.VarData = newVarData
    self.VarDataCount = len(self.VarData)

    self.prune_regions()

    return varDataMap


ot.VarStore.subset_varidxes = VarStore_subset_varidxes


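# Illustrative usage sketch (not executed); varIdxes is a hypothetical set of
# still-referenced variation indices collected by the caller:
#
#   varidx_map = store.subset_varidxes(varIdxes)
#   font["GDEF"].table.remap_device_varidxes(varidx_map)

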
def VarStore_prune_regions(self):
    """Remove unused VarRegions."""
    #
    # Subset VarRegionList
    #

    # Collect.
    usedRegions = set()
    for data in self.VarData:
        usedRegions.update(data.VarRegionIndex)
    # Subset.
    regionList = self.VarRegionList
    regions = regionList.Region
    newRegions = []
    regionMap = {}
    for i in sorted(usedRegions):
        regionMap[i] = len(newRegions)
        newRegions.append(regions[i])
    regionList.Region = newRegions
    regionList.RegionCount = len(regionList.Region)
    # Map.
    for data in self.VarData:
        data.VarRegionIndex = [regionMap[i] for i in data.VarRegionIndex]


ot.VarStore.prune_regions = VarStore_prune_regions


def _visit(self, func):
    """Recurse down from self; if an object's type is ot.Device,
    call func() on it.  Works on otData-style classes."""

    if type(self) == ot.Device:
        func(self)

    elif isinstance(self, list):
        for that in self:
            _visit(that, func)

    elif hasattr(self, "getConverters") and not hasattr(self, "postRead"):
        for conv in self.getConverters():
            that = getattr(self, conv.name, None)
            if that is not None:
                _visit(that, func)

    elif isinstance(self, ot.ValueRecord):
        for that in self.__dict__.values():
            _visit(that, func)


def _Device_recordVarIdx(self, s):
    """Add VarIdx in this Device table (if any) to the set s."""
    if self.DeltaFormat == 0x8000:
        s.add((self.StartSize << 16) + self.EndSize)


def Object_collect_device_varidxes(self, varidxes):
    adder = partial(_Device_recordVarIdx, s=varidxes)
    _visit(self, adder)


ot.GDEF.collect_device_varidxes = Object_collect_device_varidxes
ot.GPOS.collect_device_varidxes = Object_collect_device_varidxes


def _Device_mapVarIdx(self, mapping, done):
    """Map VarIdx in this Device table (if any) through mapping."""
    if id(self) in done:
        return
    done.add(id(self))
    if self.DeltaFormat == 0x8000:
        varIdx = mapping[(self.StartSize << 16) + self.EndSize]
        self.StartSize = varIdx >> 16
        self.EndSize = varIdx & 0xFFFF


def Object_remap_device_varidxes(self, varidxes_map):
    mapper = partial(_Device_mapVarIdx, mapping=varidxes_map, done=set())
    _visit(self, mapper)


ot.GDEF.remap_device_varidxes = Object_remap_device_varidxes
ot.GPOS.remap_device_varidxes = Object_remap_device_varidxes


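# Sketch of the packing used above: a Device table with DeltaFormat 0x8000 is
# really a VariationIndex table, holding the outer index in StartSize and the
# inner index in EndSize, so a 32-bit VarIdx round-trips as
#
#   varIdx = (dev.StartSize << 16) + dev.EndSize
#   dev.StartSize, dev.EndSize = varIdx >> 16, varIdx & 0xFFFF

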
class _Encoding(object):
    def __init__(self, chars):
        self.chars = chars
        self.width = bit_count(chars)
        self.columns = self._columns(chars)
        self.overhead = self._characteristic_overhead(self.columns)
        self.items = set()

    def append(self, row):
        self.items.add(row)

    def extend(self, lst):
        self.items.update(lst)

    def get_room(self):
        """Maximum number of bytes that can be added to the characteristic
        while merging it into another encoding remains beneficial."""
        count = len(self.items)
        return max(0, (self.overhead - 1) // count - self.width)

    room = property(get_room)

    def get_gain(self):
        """Maximum possible byte gain from merging this into another
        characteristic."""
        count = len(self.items)
        return max(0, self.overhead - count)

    gain = property(get_gain)

    def gain_sort_key(self):
        return self.gain, self.chars

    def width_sort_key(self):
        return self.width, self.chars

    @staticmethod
    def _characteristic_overhead(columns):
        """Returns overhead in bytes of encoding this characteristic
        as a VarData."""
        c = 4 + 6  # 4 bytes for LOffset, 6 bytes for VarData header
        c += bit_count(columns) * 2
        return c

    @staticmethod
    def _columns(chars):
        cols = 0
        i = 1
        while chars:
            if chars & 0b1111:
                cols |= i
            chars >>= 4
            i <<= 1
        return cols

    def gain_from_merging(self, other_encoding):
        combined_chars = other_encoding.chars | self.chars
        combined_width = bit_count(combined_chars)
        combined_columns = self.columns | other_encoding.columns
        combined_overhead = _Encoding._characteristic_overhead(combined_columns)
        combined_gain = (
            +self.overhead
            + other_encoding.overhead
            - combined_overhead
            - (combined_width - self.width) * len(self.items)
            - (combined_width - other_encoding.width) * len(other_encoding.items)
        )
        return combined_gain


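# Worked example (traced by hand, not executed): two encodings over the same
# single column, one byte-wide (chars=0b0001, 3 rows) and one word-wide
# (chars=0b0011, 5 rows), each carrying overhead 10 + 2 * 1 = 12 bytes.
# Merging yields chars=0b0011 (width 2), so each byte-wide row grows by one
# byte:
#
#   gain = 12 + 12 - 12 - (2 - 1) * 3 - (2 - 2) * 5  # = 9 bytes saved

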
class _EncodingDict(dict):
    def __missing__(self, chars):
        r = self[chars] = _Encoding(chars)
        return r

    def add_row(self, row):
        chars = self._row_characteristics(row)
        self[chars].append(row)

    @staticmethod
    def _row_characteristics(row):
        """Returns encoding characteristics for a row."""
        longWords = False

        chars = 0
        i = 1
        for v in row:
            if v:
                chars += i
            if not (-128 <= v <= 127):
                chars += i * 0b0010
            if not (-32768 <= v <= 32767):
                longWords = True
                break
            i <<= 4

        if longWords:
            # Redo; only allow 2-byte/4-byte encoding
            chars = 0
            i = 1
            for v in row:
                if v:
                    chars += i * 0b0011
                if not (-32768 <= v <= 32767):
                    chars += i * 0b1100
                i <<= 4

        return chars


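# Worked example (traced by hand, not executed): for row = (1, 200, 0),
# column 0 fits in one byte (nibble 0b0001), column 1 needs two bytes
# (nibble 0b0011), and column 2 is all zeros (nibble 0b0000):
#
#   _EncodingDict._row_characteristics((1, 200, 0))  # -> 0b0011_0001

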
def VarStore_optimize(self, use_NO_VARIATION_INDEX=True, quantization=1):
    """Optimize storage. Returns mapping from old VarIdxes to new ones."""

    # Overview:
    #
    # For each VarData row, we first extend it with zeroes to have
    # one column per region in VarRegionList. We then group the
    # rows into _Encoding objects, by their "characteristic" bitmap.
    # The characteristic bitmap is a binary number representing how
    # many bytes each column of the data takes up to encode. Each
    # column is encoded in four bits. For example, if a column has
    # only values in the range -128..127, it would only have a single
    # bit set in the characteristic bitmap for that column. If it has
    # values in the range -32768..32767, it would have two bits set.
    # The number of ones in the characteristic bitmap is the "width"
    # of the encoding.
    #
    # Each encoding as such has a number of "active" (ie. non-zero)
    # columns. The overhead of encoding the characteristic bitmap
    # is 10 bytes, plus 2 bytes per active column.
    #
    # When an encoding is merged into another one, if the characteristic
    # of the old encoding is a subset of the new one, then the overhead
    # of the old encoding is completely eliminated. However, each row
    # now would require more bytes to encode, to the tune of one byte
    # per characteristic bit that is active in the new encoding but not
    # in the old one. The number of bits that can be added to an encoding
    # while still beneficial to merge it into another encoding is called
    # the "room" for that encoding.
    #
    # The "gain" of an encoding is the maximum number of bytes we can
    # save by merging it into another encoding. The "gain" of merging
    # two encodings is how many bytes we save by doing so.
    #
    # High-level algorithm:
    #
    # - Each encoding has a minimal way to encode it. However, because
    #   of the overhead of encoding the characteristic bitmap, it may
    #   be beneficial to merge two encodings together, if there is
    #   gain in doing so. As such, we need to search for the best
    #   such successive merges.
    #
    # Algorithm:
    #
    # - Put all encodings into a "todo" list.
    #
    # - Sort todo list by decreasing gain (for stability).
    #
    # - Make a priority-queue of the gain from combining each two
    #   encodings in the todo list. The priority queue is sorted by
    #   decreasing gain. Only positive gains are included.
    #
    # - While priority queue is not empty:
    #   - Pop the first item from the priority queue,
    #   - Merge the two encodings it represents,
    #   - Remove the two encodings from the todo list,
    #   - Insert positive gains from combining the new encoding with
    #     all existing todo list items into the priority queue,
    #   - If a todo list item with the same characteristic bitmap as
    #     the new encoding exists, remove it from the todo list and
    #     merge it into the new encoding.
    #   - Insert the new encoding into the todo list,
    #
    # - Encode all remaining items in the todo list.
    #
    # The output is then sorted for stability, in the following way:
    # - The VarRegionList of the input is kept intact.
    # - All encodings are sorted before the main algorithm, by
    #   gain_sort_key(), which is a tuple of the following items:
    #   * The gain of the encoding.
    #   * The characteristic bitmap of the encoding, with higher-numbered
    #     columns compared first.
    # - The VarData is sorted by width_sort_key(), which is a tuple
    #   of the following items:
    #   * The "width" of the encoding.
    #   * The characteristic bitmap of the encoding, with higher-numbered
    #     columns compared first.
    # - Within each VarData, the items are sorted as vectors of numbers.
    #
    # Finally, each VarData is optimized to remove the empty columns and
    # reorder columns as needed.

    # TODO
    # Check that no two VarRegions are the same; if they are, fold them.

    n = len(self.VarRegionList.Region)  # Number of columns
    zeroes = [0] * n

    front_mapping = {}  # Map from old VarIdxes to full row tuples

    encodings = _EncodingDict()

    # Collect all items into a set of full rows (with lots of zeroes.)
    for major, data in enumerate(self.VarData):
        regionIndices = data.VarRegionIndex

        for minor, item in enumerate(data.Item):
            row = list(zeroes)

            if quantization == 1:
                for regionIdx, v in zip(regionIndices, item):
                    row[regionIdx] += v
            else:
                for regionIdx, v in zip(regionIndices, item):
                    row[regionIdx] += (
                        round(v / quantization) * quantization
                    )  # TODO https://github.com/fonttools/fonttools/pull/3126#discussion_r1205439785
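            # E.g., with quantization=4, a delta of 7 is stored as
            # round(7 / 4) * 4 == 8, trading precision for rows that
            # are more likely to be shareable.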

            row = tuple(row)

            if use_NO_VARIATION_INDEX and not any(row):
                front_mapping[(major << 16) + minor] = None
                continue

            encodings.add_row(row)
            front_mapping[(major << 16) + minor] = row

    # Prepare for the main algorithm.
    todo = sorted(encodings.values(), key=_Encoding.gain_sort_key)
    del encodings

    # Repeatedly pick two best encodings to combine, and combine them.

    heap = []
    for i, encoding in enumerate(todo):
        for j in range(i + 1, len(todo)):
            other_encoding = todo[j]
            combining_gain = encoding.gain_from_merging(other_encoding)
            if combining_gain > 0:
                heappush(heap, (-combining_gain, i, j))

    while heap:
        _, i, j = heappop(heap)
        if todo[i] is None or todo[j] is None:
            continue

        encoding, other_encoding = todo[i], todo[j]
        todo[i], todo[j] = None, None

        # Combine the two encodings
        combined_chars = other_encoding.chars | encoding.chars
        combined_encoding = _Encoding(combined_chars)
        combined_encoding.extend(encoding.items)
        combined_encoding.extend(other_encoding.items)

        for k, enc in enumerate(todo):
            if enc is None:
                continue

            # In the unlikely event that the same encoding exists already,
            # combine it.
            if enc.chars == combined_chars:
                combined_encoding.extend(enc.items)
                todo[k] = None
                continue

            combining_gain = combined_encoding.gain_from_merging(enc)
            if combining_gain > 0:
                heappush(heap, (-combining_gain, k, len(todo)))

        todo.append(combined_encoding)

    encodings = [encoding for encoding in todo if encoding is not None]

    # Assemble final store.
    back_mapping = {}  # Mapping from full rows to new VarIdxes
    encodings.sort(key=_Encoding.width_sort_key)
    self.VarData = []
    for encoding in encodings:
        items = sorted(encoding.items)

        while items:
            major = len(self.VarData)
            data = ot.VarData()
            self.VarData.append(data)
            data.VarRegionIndex = range(n)
            data.VarRegionCount = len(data.VarRegionIndex)

            # Each major can only encode up to 0xFFFF entries.
            data.Item, items = items[:0xFFFF], items[0xFFFF:]

            for minor, item in enumerate(data.Item):
                back_mapping[item] = (major << 16) + minor

    # Compile final mapping.
    varidx_map = {NO_VARIATION_INDEX: NO_VARIATION_INDEX}
    for k, v in front_mapping.items():
        varidx_map[k] = back_mapping[v] if v is not None else NO_VARIATION_INDEX

    # Recalculate things and go home.
    self.VarRegionList.RegionCount = len(self.VarRegionList.Region)
    self.VarDataCount = len(self.VarData)
    for data in self.VarData:
        data.ItemCount = len(data.Item)
        data.optimize()

    # Remove unused regions.
    self.prune_regions()

    return varidx_map


ot.VarStore.optimize = VarStore_optimize


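# Illustrative usage sketch (not executed), mirroring main() below:
#
#   varidx_map = font["GDEF"].table.VarStore.optimize(quantization=2)
#   font["GDEF"].table.remap_device_varidxes(varidx_map)

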
def main(args=None):
    """Optimize a font's GDEF variation store"""
    from argparse import ArgumentParser
    from fontTools import configLogger
    from fontTools.ttLib import TTFont
    from fontTools.ttLib.tables.otBase import OTTableWriter

    parser = ArgumentParser(prog="varLib.varStore", description=main.__doc__)
    parser.add_argument("--quantization", type=int, default=1)
    parser.add_argument("fontfile")
    parser.add_argument("outfile", nargs="?")
    options = parser.parse_args(args)

    # TODO: allow user to configure logging via command-line options
    configLogger(level="INFO")

    quantization = options.quantization
    fontfile = options.fontfile
    outfile = options.outfile

    font = TTFont(fontfile)
    gdef = font["GDEF"]
    store = gdef.table.VarStore

    writer = OTTableWriter()
    store.compile(writer, font)
    size = len(writer.getAllData())
    print("Before: %7d bytes" % size)

    varidx_map = store.optimize(quantization=quantization)

    writer = OTTableWriter()
    store.compile(writer, font)
    size = len(writer.getAllData())
    print("After:  %7d bytes" % size)

    if outfile is not None:
        gdef.table.remap_device_varidxes(varidx_map)
        if "GPOS" in font:
            font["GPOS"].table.remap_device_varidxes(varidx_map)

        font.save(outfile)


if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        sys.exit(main())
    import doctest

    sys.exit(doctest.testmod().failed)