1// Copyright 2023 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package trace
6
7import (
8	"cmp"
9	"encoding/binary"
10	"fmt"
11
12	"internal/trace/event"
13	"internal/trace/event/go122"
14)
15
16type batchCursor struct {
17	m       ThreadID
18	lastTs  Time
19	idx     int       // next index into []batch
20	dataOff int       // next index into batch.data
21	ev      baseEvent // last read event
22}
23
24func (b *batchCursor) nextEvent(batches []batch, freq frequency) (ok bool, err error) {
25	// Batches should generally always have at least one event,
26	// but let's be defensive about that and accept empty batches.
27	for b.idx < len(batches) && len(batches[b.idx].data) == b.dataOff {
28		b.idx++
29		b.dataOff = 0
30		b.lastTs = 0
31	}
32	// Have we reached the end of the batches?
33	if b.idx == len(batches) {
34		return false, nil
35	}
36	// Initialize lastTs if it hasn't been yet.
37	if b.lastTs == 0 {
38		b.lastTs = freq.mul(batches[b.idx].time)
39	}
40	// Read an event out.
41	n, tsdiff, err := readTimedBaseEvent(batches[b.idx].data[b.dataOff:], &b.ev)
42	if err != nil {
43		return false, err
44	}
45	// Complete the timestamp from the cursor's last timestamp.
46	b.ev.time = freq.mul(tsdiff) + b.lastTs
47
48	// Move the cursor's timestamp forward.
49	b.lastTs = b.ev.time
50
51	// Move the cursor forward.
52	b.dataOff += n
53	return true, nil
54}
55
56func (b *batchCursor) compare(a *batchCursor) int {
57	return cmp.Compare(b.ev.time, a.ev.time)
58}
59
60// readTimedBaseEvent reads out the raw event data from b
61// into e. It does not try to interpret the arguments
62// but it does validate that the event is a regular
63// event with a timestamp (vs. a structural event).
64//
65// It requires that the event its reading be timed, which must
66// be the case for every event in a plain EventBatch.
67func readTimedBaseEvent(b []byte, e *baseEvent) (int, timestamp, error) {
68	// Get the event type.
69	typ := event.Type(b[0])
70	specs := go122.Specs()
71	if int(typ) >= len(specs) {
72		return 0, 0, fmt.Errorf("found invalid event type: %v", typ)
73	}
74	e.typ = typ
75
76	// Get spec.
77	spec := &specs[typ]
78	if len(spec.Args) == 0 || !spec.IsTimedEvent {
79		return 0, 0, fmt.Errorf("found event without a timestamp: type=%v", typ)
80	}
81	n := 1
82
83	// Read timestamp diff.
84	ts, nb := binary.Uvarint(b[n:])
85	if nb <= 0 {
86		return 0, 0, fmt.Errorf("found invalid uvarint for timestamp")
87	}
88	n += nb
89
90	// Read the rest of the arguments.
91	for i := 0; i < len(spec.Args)-1; i++ {
92		arg, nb := binary.Uvarint(b[n:])
93		if nb <= 0 {
94			return 0, 0, fmt.Errorf("found invalid uvarint")
95		}
96		e.args[i] = arg
97		n += nb
98	}
99	return n, timestamp(ts), nil
100}
101
102func heapInsert(heap []*batchCursor, bc *batchCursor) []*batchCursor {
103	// Add the cursor to the end of the heap.
104	heap = append(heap, bc)
105
106	// Sift the new entry up to the right place.
107	heapSiftUp(heap, len(heap)-1)
108	return heap
109}
110
111func heapUpdate(heap []*batchCursor, i int) {
112	// Try to sift up.
113	if heapSiftUp(heap, i) != i {
114		return
115	}
116	// Try to sift down, if sifting up failed.
117	heapSiftDown(heap, i)
118}
119
120func heapRemove(heap []*batchCursor, i int) []*batchCursor {
121	// Sift index i up to the root, ignoring actual values.
122	for i > 0 {
123		heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
124		i = (i - 1) / 2
125	}
126	// Swap the root with the last element, then remove it.
127	heap[0], heap[len(heap)-1] = heap[len(heap)-1], heap[0]
128	heap = heap[:len(heap)-1]
129	// Sift the root down.
130	heapSiftDown(heap, 0)
131	return heap
132}
133
134func heapSiftUp(heap []*batchCursor, i int) int {
135	for i > 0 && heap[(i-1)/2].ev.time > heap[i].ev.time {
136		heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2]
137		i = (i - 1) / 2
138	}
139	return i
140}
141
142func heapSiftDown(heap []*batchCursor, i int) int {
143	for {
144		m := min3(heap, i, 2*i+1, 2*i+2)
145		if m == i {
146			// Heap invariant already applies.
147			break
148		}
149		heap[i], heap[m] = heap[m], heap[i]
150		i = m
151	}
152	return i
153}
154
155func min3(b []*batchCursor, i0, i1, i2 int) int {
156	minIdx := i0
157	minT := maxTime
158	if i0 < len(b) {
159		minT = b[i0].ev.time
160	}
161	if i1 < len(b) {
162		if t := b[i1].ev.time; t < minT {
163			minT = t
164			minIdx = i1
165		}
166	}
167	if i2 < len(b) {
168		if t := b[i2].ev.time; t < minT {
169			minT = t
170			minIdx = i2
171		}
172	}
173	return minIdx
174}
175