1// Copyright 2023 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package trace
6
7import (
8	"bufio"
9	"bytes"
10	"cmp"
11	"encoding/binary"
12	"fmt"
13	"io"
14	"slices"
15	"strings"
16
17	"internal/trace/event"
18	"internal/trace/event/go122"
19)
20
// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
type generation struct {
	// gen is the generation number. While a generation is being read, 0
	// means "not yet known"; the runtime never uses 0 as a real generation.
	gen        uint64
	// batches holds every batch that processBatch does not decode into one
	// of the tables below, grouped by the batch's thread ID (b.m) and kept
	// in the order the batches were read.
	batches    map[ThreadID][]batch
	// cpuSamples holds the samples from all CPU-sample batches. By the time
	// readGeneration returns, their timestamps have been converted using the
	// generation's frequency and the slice is sorted by time.
	cpuSamples []cpuSample
	// The embedded evTable holds the generation's decoded tables (frequency,
	// strings, stacks, PC-to-frame map, and experimental data), populated by
	// processBatch.
	*evTable
}
31
// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
type spilledBatch struct {
	// gen is the generation number the spilled batch was tagged with; it
	// becomes the number of the next generation read.
	gen uint64
	// batch is the already-read batch. readGeneration processes it before
	// reading anything further from its reader.
	*batch
}
39
40// readGeneration buffers and decodes the structural elements of a trace generation
41// out of r. spill is the first batch of the new generation (already buffered and
42// parsed from reading the last generation). Returns the generation and the first
43// batch read of the next generation, if any.
44//
45// If gen is non-nil, it is valid and must be processed before handling the returned
46// error.
47func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilledBatch, error) {
48	g := &generation{
49		evTable: &evTable{
50			pcs: make(map[uint64]frame),
51		},
52		batches: make(map[ThreadID][]batch),
53	}
54	// Process the spilled batch.
55	if spill != nil {
56		g.gen = spill.gen
57		if err := processBatch(g, *spill.batch); err != nil {
58			return nil, nil, err
59		}
60		spill = nil
61	}
62	// Read batches one at a time until we either hit EOF or
63	// the next generation.
64	var spillErr error
65	for {
66		b, gen, err := readBatch(r)
67		if err == io.EOF {
68			break
69		}
70		if err != nil {
71			if g.gen != 0 {
72				// This is an error reading the first batch of the next generation.
73				// This is fine. Let's forge ahead assuming that what we've got so
74				// far is fine.
75				spillErr = err
76				break
77			}
78			return nil, nil, err
79		}
80		if gen == 0 {
81			// 0 is a sentinel used by the runtime, so we'll never see it.
82			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
83		}
84		if g.gen == 0 {
85			// Initialize gen.
86			g.gen = gen
87		}
88		if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
89			spill = &spilledBatch{gen: gen, batch: &b}
90			break
91		}
92		if gen != g.gen {
93			// N.B. Fail as fast as possible if we see this. At first it
94			// may seem prudent to be fault-tolerant and assume we have a
95			// complete generation, parsing and returning that first. However,
96			// if the batches are mixed across generations then it's likely
97			// we won't be able to parse this generation correctly at all.
98			// Rather than return a cryptic error in that case, indicate the
99			// problem as soon as we see it.
100			return nil, nil, fmt.Errorf("generations out of order")
101		}
102		if err := processBatch(g, b); err != nil {
103			return nil, nil, err
104		}
105	}
106
107	// Check some invariants.
108	if g.freq == 0 {
109		return nil, nil, fmt.Errorf("no frequency event found")
110	}
111	// N.B. Trust that the batch order is correct. We can't validate the batch order
112	// by timestamp because the timestamps could just be plain wrong. The source of
113	// truth is the order things appear in the trace and the partial order sequence
114	// numbers on certain events. If it turns out the batch order is actually incorrect
115	// we'll very likely fail to advance a partial order from the frontier.
116
117	// Compactify stacks and strings for better lookup performance later.
118	g.stacks.compactify()
119	g.strings.compactify()
120
121	// Validate stacks.
122	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
123		return nil, nil, err
124	}
125
126	// Fix up the CPU sample timestamps, now that we have freq.
127	for i := range g.cpuSamples {
128		s := &g.cpuSamples[i]
129		s.time = g.freq.mul(timestamp(s.time))
130	}
131	// Sort the CPU samples.
132	slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
133		return cmp.Compare(a.time, b.time)
134	})
135	return g, spill, spillErr
136}
137
138// processBatch adds the batch to the generation.
139func processBatch(g *generation, b batch) error {
140	switch {
141	case b.isStringsBatch():
142		if err := addStrings(&g.strings, b); err != nil {
143			return err
144		}
145	case b.isStacksBatch():
146		if err := addStacks(&g.stacks, g.pcs, b); err != nil {
147			return err
148		}
149	case b.isCPUSamplesBatch():
150		samples, err := addCPUSamples(g.cpuSamples, b)
151		if err != nil {
152			return err
153		}
154		g.cpuSamples = samples
155	case b.isFreqBatch():
156		freq, err := parseFreq(b)
157		if err != nil {
158			return err
159		}
160		if g.freq != 0 {
161			return fmt.Errorf("found multiple frequency events")
162		}
163		g.freq = freq
164	case b.exp != event.NoExperiment:
165		if g.expData == nil {
166			g.expData = make(map[event.Experiment]*ExperimentalData)
167		}
168		if err := addExperimentalData(g.expData, b); err != nil {
169			return err
170		}
171	default:
172		g.batches[b.m] = append(g.batches[b.m], b)
173	}
174	return nil
175}
176
177// validateStackStrings makes sure all the string references in
178// the stack table are present in the string table.
179func validateStackStrings(
180	stacks *dataTable[stackID, stack],
181	strings *dataTable[stringID, string],
182	frames map[uint64]frame,
183) error {
184	var err error
185	stacks.forEach(func(id stackID, stk stack) bool {
186		for _, pc := range stk.pcs {
187			frame, ok := frames[pc]
188			if !ok {
189				err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
190				return false
191			}
192			_, ok = strings.get(frame.funcID)
193			if !ok {
194				err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
195				return false
196			}
197			_, ok = strings.get(frame.fileID)
198			if !ok {
199				err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
200				return false
201			}
202		}
203		return true
204	})
205	return err
206}
207
208// addStrings takes a batch whose first byte is an EvStrings event
209// (indicating that the batch contains only strings) and adds each
210// string contained therein to the provided strings map.
211func addStrings(stringTable *dataTable[stringID, string], b batch) error {
212	if !b.isStringsBatch() {
213		return fmt.Errorf("internal error: addStrings called on non-string batch")
214	}
215	r := bytes.NewReader(b.data)
216	hdr, err := r.ReadByte() // Consume the EvStrings byte.
217	if err != nil || event.Type(hdr) != go122.EvStrings {
218		return fmt.Errorf("missing strings batch header")
219	}
220
221	var sb strings.Builder
222	for r.Len() != 0 {
223		// Read the header.
224		ev, err := r.ReadByte()
225		if err != nil {
226			return err
227		}
228		if event.Type(ev) != go122.EvString {
229			return fmt.Errorf("expected string event, got %d", ev)
230		}
231
232		// Read the string's ID.
233		id, err := binary.ReadUvarint(r)
234		if err != nil {
235			return err
236		}
237
238		// Read the string's length.
239		len, err := binary.ReadUvarint(r)
240		if err != nil {
241			return err
242		}
243		if len > go122.MaxStringSize {
244			return fmt.Errorf("invalid string size %d, maximum is %d", len, go122.MaxStringSize)
245		}
246
247		// Copy out the string.
248		n, err := io.CopyN(&sb, r, int64(len))
249		if n != int64(len) {
250			return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
251		}
252		if err != nil {
253			return fmt.Errorf("copying string data: %w", err)
254		}
255
256		// Add the string to the map.
257		s := sb.String()
258		sb.Reset()
259		if err := stringTable.insert(stringID(id), s); err != nil {
260			return err
261		}
262	}
263	return nil
264}
265
266// addStacks takes a batch whose first byte is an EvStacks event
267// (indicating that the batch contains only stacks) and adds each
268// string contained therein to the provided stacks map.
269func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
270	if !b.isStacksBatch() {
271		return fmt.Errorf("internal error: addStacks called on non-stacks batch")
272	}
273	r := bytes.NewReader(b.data)
274	hdr, err := r.ReadByte() // Consume the EvStacks byte.
275	if err != nil || event.Type(hdr) != go122.EvStacks {
276		return fmt.Errorf("missing stacks batch header")
277	}
278
279	for r.Len() != 0 {
280		// Read the header.
281		ev, err := r.ReadByte()
282		if err != nil {
283			return err
284		}
285		if event.Type(ev) != go122.EvStack {
286			return fmt.Errorf("expected stack event, got %d", ev)
287		}
288
289		// Read the stack's ID.
290		id, err := binary.ReadUvarint(r)
291		if err != nil {
292			return err
293		}
294
295		// Read how many frames are in each stack.
296		nFrames, err := binary.ReadUvarint(r)
297		if err != nil {
298			return err
299		}
300		if nFrames > go122.MaxFramesPerStack {
301			return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, go122.MaxFramesPerStack)
302		}
303
304		// Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
305		frames := make([]uint64, 0, nFrames)
306		for i := uint64(0); i < nFrames; i++ {
307			// Read the frame data.
308			pc, err := binary.ReadUvarint(r)
309			if err != nil {
310				return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
311			}
312			funcID, err := binary.ReadUvarint(r)
313			if err != nil {
314				return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
315			}
316			fileID, err := binary.ReadUvarint(r)
317			if err != nil {
318				return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
319			}
320			line, err := binary.ReadUvarint(r)
321			if err != nil {
322				return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
323			}
324			frames = append(frames, pc)
325
326			if _, ok := pcs[pc]; !ok {
327				pcs[pc] = frame{
328					pc:     pc,
329					funcID: stringID(funcID),
330					fileID: stringID(fileID),
331					line:   line,
332				}
333			}
334		}
335
336		// Add the stack to the map.
337		if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
338			return err
339		}
340	}
341	return nil
342}
343
344// addCPUSamples takes a batch whose first byte is an EvCPUSamples event
345// (indicating that the batch contains only CPU samples) and adds each
346// sample contained therein to the provided samples list.
347func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
348	if !b.isCPUSamplesBatch() {
349		return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
350	}
351	r := bytes.NewReader(b.data)
352	hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
353	if err != nil || event.Type(hdr) != go122.EvCPUSamples {
354		return nil, fmt.Errorf("missing CPU samples batch header")
355	}
356
357	for r.Len() != 0 {
358		// Read the header.
359		ev, err := r.ReadByte()
360		if err != nil {
361			return nil, err
362		}
363		if event.Type(ev) != go122.EvCPUSample {
364			return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
365		}
366
367		// Read the sample's timestamp.
368		ts, err := binary.ReadUvarint(r)
369		if err != nil {
370			return nil, err
371		}
372
373		// Read the sample's M.
374		m, err := binary.ReadUvarint(r)
375		if err != nil {
376			return nil, err
377		}
378		mid := ThreadID(m)
379
380		// Read the sample's P.
381		p, err := binary.ReadUvarint(r)
382		if err != nil {
383			return nil, err
384		}
385		pid := ProcID(p)
386
387		// Read the sample's G.
388		g, err := binary.ReadUvarint(r)
389		if err != nil {
390			return nil, err
391		}
392		goid := GoID(g)
393		if g == 0 {
394			goid = NoGoroutine
395		}
396
397		// Read the sample's stack.
398		s, err := binary.ReadUvarint(r)
399		if err != nil {
400			return nil, err
401		}
402
403		// Add the sample to the slice.
404		samples = append(samples, cpuSample{
405			schedCtx: schedCtx{
406				M: mid,
407				P: pid,
408				G: goid,
409			},
410			time:  Time(ts), // N.B. this is really a "timestamp," not a Time.
411			stack: stackID(s),
412		})
413	}
414	return samples, nil
415}
416
417// parseFreq parses out a lone EvFrequency from a batch.
418func parseFreq(b batch) (frequency, error) {
419	if !b.isFreqBatch() {
420		return 0, fmt.Errorf("internal error: parseFreq called on non-frequency batch")
421	}
422	r := bytes.NewReader(b.data)
423	r.ReadByte() // Consume the EvFrequency byte.
424
425	// Read the frequency. It'll come out as timestamp units per second.
426	f, err := binary.ReadUvarint(r)
427	if err != nil {
428		return 0, err
429	}
430	// Convert to nanoseconds per timestamp unit.
431	return frequency(1.0 / (float64(f) / 1e9)), nil
432}
433
434// addExperimentalData takes an experimental batch and adds it to the ExperimentalData
435// for the experiment its a part of.
436func addExperimentalData(expData map[event.Experiment]*ExperimentalData, b batch) error {
437	if b.exp == event.NoExperiment {
438		return fmt.Errorf("internal error: addExperimentalData called on non-experimental batch")
439	}
440	ed, ok := expData[b.exp]
441	if !ok {
442		ed = new(ExperimentalData)
443		expData[b.exp] = ed
444	}
445	ed.Batches = append(ed.Batches, ExperimentalBatch{
446		Thread: b.m,
447		Data:   b.data,
448	})
449	return nil
450}
451