1// Copyright 2023 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package trace 6 7import ( 8 "cmp" 9 "encoding/binary" 10 "fmt" 11 12 "internal/trace/event" 13 "internal/trace/event/go122" 14) 15 16type batchCursor struct { 17 m ThreadID 18 lastTs Time 19 idx int // next index into []batch 20 dataOff int // next index into batch.data 21 ev baseEvent // last read event 22} 23 24func (b *batchCursor) nextEvent(batches []batch, freq frequency) (ok bool, err error) { 25 // Batches should generally always have at least one event, 26 // but let's be defensive about that and accept empty batches. 27 for b.idx < len(batches) && len(batches[b.idx].data) == b.dataOff { 28 b.idx++ 29 b.dataOff = 0 30 b.lastTs = 0 31 } 32 // Have we reached the end of the batches? 33 if b.idx == len(batches) { 34 return false, nil 35 } 36 // Initialize lastTs if it hasn't been yet. 37 if b.lastTs == 0 { 38 b.lastTs = freq.mul(batches[b.idx].time) 39 } 40 // Read an event out. 41 n, tsdiff, err := readTimedBaseEvent(batches[b.idx].data[b.dataOff:], &b.ev) 42 if err != nil { 43 return false, err 44 } 45 // Complete the timestamp from the cursor's last timestamp. 46 b.ev.time = freq.mul(tsdiff) + b.lastTs 47 48 // Move the cursor's timestamp forward. 49 b.lastTs = b.ev.time 50 51 // Move the cursor forward. 52 b.dataOff += n 53 return true, nil 54} 55 56func (b *batchCursor) compare(a *batchCursor) int { 57 return cmp.Compare(b.ev.time, a.ev.time) 58} 59 60// readTimedBaseEvent reads out the raw event data from b 61// into e. It does not try to interpret the arguments 62// but it does validate that the event is a regular 63// event with a timestamp (vs. a structural event). 64// 65// It requires that the event its reading be timed, which must 66// be the case for every event in a plain EventBatch. 67func readTimedBaseEvent(b []byte, e *baseEvent) (int, timestamp, error) { 68 // Get the event type. 69 typ := event.Type(b[0]) 70 specs := go122.Specs() 71 if int(typ) >= len(specs) { 72 return 0, 0, fmt.Errorf("found invalid event type: %v", typ) 73 } 74 e.typ = typ 75 76 // Get spec. 77 spec := &specs[typ] 78 if len(spec.Args) == 0 || !spec.IsTimedEvent { 79 return 0, 0, fmt.Errorf("found event without a timestamp: type=%v", typ) 80 } 81 n := 1 82 83 // Read timestamp diff. 84 ts, nb := binary.Uvarint(b[n:]) 85 if nb <= 0 { 86 return 0, 0, fmt.Errorf("found invalid uvarint for timestamp") 87 } 88 n += nb 89 90 // Read the rest of the arguments. 91 for i := 0; i < len(spec.Args)-1; i++ { 92 arg, nb := binary.Uvarint(b[n:]) 93 if nb <= 0 { 94 return 0, 0, fmt.Errorf("found invalid uvarint") 95 } 96 e.args[i] = arg 97 n += nb 98 } 99 return n, timestamp(ts), nil 100} 101 102func heapInsert(heap []*batchCursor, bc *batchCursor) []*batchCursor { 103 // Add the cursor to the end of the heap. 104 heap = append(heap, bc) 105 106 // Sift the new entry up to the right place. 107 heapSiftUp(heap, len(heap)-1) 108 return heap 109} 110 111func heapUpdate(heap []*batchCursor, i int) { 112 // Try to sift up. 113 if heapSiftUp(heap, i) != i { 114 return 115 } 116 // Try to sift down, if sifting up failed. 117 heapSiftDown(heap, i) 118} 119 120func heapRemove(heap []*batchCursor, i int) []*batchCursor { 121 // Sift index i up to the root, ignoring actual values. 122 for i > 0 { 123 heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2] 124 i = (i - 1) / 2 125 } 126 // Swap the root with the last element, then remove it. 127 heap[0], heap[len(heap)-1] = heap[len(heap)-1], heap[0] 128 heap = heap[:len(heap)-1] 129 // Sift the root down. 130 heapSiftDown(heap, 0) 131 return heap 132} 133 134func heapSiftUp(heap []*batchCursor, i int) int { 135 for i > 0 && heap[(i-1)/2].ev.time > heap[i].ev.time { 136 heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2] 137 i = (i - 1) / 2 138 } 139 return i 140} 141 142func heapSiftDown(heap []*batchCursor, i int) int { 143 for { 144 m := min3(heap, i, 2*i+1, 2*i+2) 145 if m == i { 146 // Heap invariant already applies. 147 break 148 } 149 heap[i], heap[m] = heap[m], heap[i] 150 i = m 151 } 152 return i 153} 154 155func min3(b []*batchCursor, i0, i1, i2 int) int { 156 minIdx := i0 157 minT := maxTime 158 if i0 < len(b) { 159 minT = b[i0].ev.time 160 } 161 if i1 < len(b) { 162 if t := b[i1].ev.time; t < minT { 163 minT = t 164 minIdx = i1 165 } 166 } 167 if i2 < len(b) { 168 if t := b[i2].ev.time; t < minT { 169 minT = t 170 minIdx = i2 171 } 172 } 173 return minIdx 174} 175