// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package trace

import (
	"bufio"
	"bytes"
	"cmp"
	"encoding/binary"
	"fmt"
	"io"
	"slices"
	"strings"

	"internal/trace/event"
	"internal/trace/event/go122"
)

// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
type generation struct {
	gen        uint64                // generation number reported by the runtime for this data.
	batches    map[ThreadID][]batch  // plain event batches, grouped by the M that emitted them, in read order.
	cpuSamples []cpuSample           // CPU profile samples; timestamps fixed up and sorted by readGeneration.
	*evTable                         // string/stack/frequency tables shared by all events in the generation.
}

// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
type spilledBatch struct {
	gen uint64
	*batch
}

// readGeneration buffers and decodes the structural elements of a trace generation
// out of r. spill is the first batch of the new generation (already buffered and
// parsed from reading the last generation). Returns the generation and the first
// batch read of the next generation, if any.
//
// If gen is non-nil, it is valid and must be processed before handling the returned
// error.
func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilledBatch, error) {
	g := &generation{
		evTable: &evTable{
			pcs: make(map[uint64]frame),
		},
		batches: make(map[ThreadID][]batch),
	}
	// Process the spilled batch. It was already read and parsed while handling
	// the previous generation, so it just needs to be folded into g.
	if spill != nil {
		g.gen = spill.gen
		if err := processBatch(g, *spill.batch); err != nil {
			return nil, nil, err
		}
		// Clear spill: it has been consumed. It may be re-set below if we read
		// into the next generation; otherwise we return nil for it.
		spill = nil
	}
	// Read batches one at a time until we either hit EOF or
	// the next generation.
	var spillErr error
	for {
		b, gen, err := readBatch(r)
		if err == io.EOF {
			break
		}
		if err != nil {
			if g.gen != 0 {
				// This is an error reading the first batch of the next generation.
				// This is fine. Let's forge ahead assuming that what we've got so
				// far is fine.
				spillErr = err
				break
			}
			return nil, nil, err
		}
		if gen == 0 {
			// 0 is a sentinel used by the runtime, so we'll never see it.
			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
		}
		if g.gen == 0 {
			// Initialize gen. This only happens when there was no spilled batch
			// from a previous generation to seed g.gen.
			g.gen = gen
		}
		if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
			// First batch of the next generation: save it for the next call
			// rather than processing it into g.
			spill = &spilledBatch{gen: gen, batch: &b}
			break
		}
		if gen != g.gen {
			// N.B. Fail as fast as possible if we see this. At first it
			// may seem prudent to be fault-tolerant and assume we have a
			// complete generation, parsing and returning that first. However,
			// if the batches are mixed across generations then it's likely
			// we won't be able to parse this generation correctly at all.
			// Rather than return a cryptic error in that case, indicate the
			// problem as soon as we see it.
			return nil, nil, fmt.Errorf("generations out of order")
		}
		if err := processBatch(g, b); err != nil {
			return nil, nil, err
		}
	}

	// Check some invariants.
	if g.freq == 0 {
		return nil, nil, fmt.Errorf("no frequency event found")
	}
	// N.B. Trust that the batch order is correct. We can't validate the batch order
	// by timestamp because the timestamps could just be plain wrong. The source of
	// truth is the order things appear in the trace and the partial order sequence
	// numbers on certain events. If it turns out the batch order is actually incorrect
	// we'll very likely fail to advance a partial order from the frontier.

	// Compactify stacks and strings for better lookup performance later.
	g.stacks.compactify()
	g.strings.compactify()

	// Validate stacks.
	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
		return nil, nil, err
	}

	// Fix up the CPU sample timestamps, now that we have freq.
	// Samples were stored with raw timestamp units; convert to trace time.
	for i := range g.cpuSamples {
		s := &g.cpuSamples[i]
		s.time = g.freq.mul(timestamp(s.time))
	}
	// Sort the CPU samples.
	slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
		return cmp.Compare(a.time, b.time)
	})
	// N.B. On spillErr, g is still returned non-nil per the doc comment: the
	// caller must process it before acting on the error.
	return g, spill, spillErr
}

// processBatch adds the batch to the generation, dispatching on the batch's
// type: strings, stacks, CPU samples, and frequency batches feed the
// generation's tables; experimental batches are stored per-experiment; all
// other batches are plain event batches, appended per-M in read order.
func processBatch(g *generation, b batch) error {
	switch {
	case b.isStringsBatch():
		if err := addStrings(&g.strings, b); err != nil {
			return err
		}
	case b.isStacksBatch():
		if err := addStacks(&g.stacks, g.pcs, b); err != nil {
			return err
		}
	case b.isCPUSamplesBatch():
		samples, err := addCPUSamples(g.cpuSamples, b)
		if err != nil {
			return err
		}
		g.cpuSamples = samples
	case b.isFreqBatch():
		freq, err := parseFreq(b)
		if err != nil {
			return err
		}
		if g.freq != 0 {
			return fmt.Errorf("found multiple frequency events")
		}
		g.freq = freq
	case b.exp != event.NoExperiment:
		// Lazily allocate the experiment map; most traces have none.
		if g.expData == nil {
			g.expData = make(map[event.Experiment]*ExperimentalData)
		}
		if err := addExperimentalData(g.expData, b); err != nil {
			return err
		}
	default:
		g.batches[b.m] = append(g.batches[b.m], b)
	}
	return nil
}

// validateStackStrings makes sure all the string references in
// the stack table are present in the string table.
179func validateStackStrings( 180 stacks *dataTable[stackID, stack], 181 strings *dataTable[stringID, string], 182 frames map[uint64]frame, 183) error { 184 var err error 185 stacks.forEach(func(id stackID, stk stack) bool { 186 for _, pc := range stk.pcs { 187 frame, ok := frames[pc] 188 if !ok { 189 err = fmt.Errorf("found unknown pc %x for stack %d", pc, id) 190 return false 191 } 192 _, ok = strings.get(frame.funcID) 193 if !ok { 194 err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id) 195 return false 196 } 197 _, ok = strings.get(frame.fileID) 198 if !ok { 199 err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id) 200 return false 201 } 202 } 203 return true 204 }) 205 return err 206} 207 208// addStrings takes a batch whose first byte is an EvStrings event 209// (indicating that the batch contains only strings) and adds each 210// string contained therein to the provided strings map. 211func addStrings(stringTable *dataTable[stringID, string], b batch) error { 212 if !b.isStringsBatch() { 213 return fmt.Errorf("internal error: addStrings called on non-string batch") 214 } 215 r := bytes.NewReader(b.data) 216 hdr, err := r.ReadByte() // Consume the EvStrings byte. 217 if err != nil || event.Type(hdr) != go122.EvStrings { 218 return fmt.Errorf("missing strings batch header") 219 } 220 221 var sb strings.Builder 222 for r.Len() != 0 { 223 // Read the header. 224 ev, err := r.ReadByte() 225 if err != nil { 226 return err 227 } 228 if event.Type(ev) != go122.EvString { 229 return fmt.Errorf("expected string event, got %d", ev) 230 } 231 232 // Read the string's ID. 233 id, err := binary.ReadUvarint(r) 234 if err != nil { 235 return err 236 } 237 238 // Read the string's length. 
239 len, err := binary.ReadUvarint(r) 240 if err != nil { 241 return err 242 } 243 if len > go122.MaxStringSize { 244 return fmt.Errorf("invalid string size %d, maximum is %d", len, go122.MaxStringSize) 245 } 246 247 // Copy out the string. 248 n, err := io.CopyN(&sb, r, int64(len)) 249 if n != int64(len) { 250 return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len) 251 } 252 if err != nil { 253 return fmt.Errorf("copying string data: %w", err) 254 } 255 256 // Add the string to the map. 257 s := sb.String() 258 sb.Reset() 259 if err := stringTable.insert(stringID(id), s); err != nil { 260 return err 261 } 262 } 263 return nil 264} 265 266// addStacks takes a batch whose first byte is an EvStacks event 267// (indicating that the batch contains only stacks) and adds each 268// string contained therein to the provided stacks map. 269func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error { 270 if !b.isStacksBatch() { 271 return fmt.Errorf("internal error: addStacks called on non-stacks batch") 272 } 273 r := bytes.NewReader(b.data) 274 hdr, err := r.ReadByte() // Consume the EvStacks byte. 275 if err != nil || event.Type(hdr) != go122.EvStacks { 276 return fmt.Errorf("missing stacks batch header") 277 } 278 279 for r.Len() != 0 { 280 // Read the header. 281 ev, err := r.ReadByte() 282 if err != nil { 283 return err 284 } 285 if event.Type(ev) != go122.EvStack { 286 return fmt.Errorf("expected stack event, got %d", ev) 287 } 288 289 // Read the stack's ID. 290 id, err := binary.ReadUvarint(r) 291 if err != nil { 292 return err 293 } 294 295 // Read how many frames are in each stack. 296 nFrames, err := binary.ReadUvarint(r) 297 if err != nil { 298 return err 299 } 300 if nFrames > go122.MaxFramesPerStack { 301 return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, go122.MaxFramesPerStack) 302 } 303 304 // Each frame consists of 4 fields: pc, funcID (string), fileID (string), line. 
305 frames := make([]uint64, 0, nFrames) 306 for i := uint64(0); i < nFrames; i++ { 307 // Read the frame data. 308 pc, err := binary.ReadUvarint(r) 309 if err != nil { 310 return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err) 311 } 312 funcID, err := binary.ReadUvarint(r) 313 if err != nil { 314 return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err) 315 } 316 fileID, err := binary.ReadUvarint(r) 317 if err != nil { 318 return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err) 319 } 320 line, err := binary.ReadUvarint(r) 321 if err != nil { 322 return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err) 323 } 324 frames = append(frames, pc) 325 326 if _, ok := pcs[pc]; !ok { 327 pcs[pc] = frame{ 328 pc: pc, 329 funcID: stringID(funcID), 330 fileID: stringID(fileID), 331 line: line, 332 } 333 } 334 } 335 336 // Add the stack to the map. 337 if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil { 338 return err 339 } 340 } 341 return nil 342} 343 344// addCPUSamples takes a batch whose first byte is an EvCPUSamples event 345// (indicating that the batch contains only CPU samples) and adds each 346// sample contained therein to the provided samples list. 347func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) { 348 if !b.isCPUSamplesBatch() { 349 return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch") 350 } 351 r := bytes.NewReader(b.data) 352 hdr, err := r.ReadByte() // Consume the EvCPUSamples byte. 353 if err != nil || event.Type(hdr) != go122.EvCPUSamples { 354 return nil, fmt.Errorf("missing CPU samples batch header") 355 } 356 357 for r.Len() != 0 { 358 // Read the header. 359 ev, err := r.ReadByte() 360 if err != nil { 361 return nil, err 362 } 363 if event.Type(ev) != go122.EvCPUSample { 364 return nil, fmt.Errorf("expected CPU sample event, got %d", ev) 365 } 366 367 // Read the sample's timestamp. 
368 ts, err := binary.ReadUvarint(r) 369 if err != nil { 370 return nil, err 371 } 372 373 // Read the sample's M. 374 m, err := binary.ReadUvarint(r) 375 if err != nil { 376 return nil, err 377 } 378 mid := ThreadID(m) 379 380 // Read the sample's P. 381 p, err := binary.ReadUvarint(r) 382 if err != nil { 383 return nil, err 384 } 385 pid := ProcID(p) 386 387 // Read the sample's G. 388 g, err := binary.ReadUvarint(r) 389 if err != nil { 390 return nil, err 391 } 392 goid := GoID(g) 393 if g == 0 { 394 goid = NoGoroutine 395 } 396 397 // Read the sample's stack. 398 s, err := binary.ReadUvarint(r) 399 if err != nil { 400 return nil, err 401 } 402 403 // Add the sample to the slice. 404 samples = append(samples, cpuSample{ 405 schedCtx: schedCtx{ 406 M: mid, 407 P: pid, 408 G: goid, 409 }, 410 time: Time(ts), // N.B. this is really a "timestamp," not a Time. 411 stack: stackID(s), 412 }) 413 } 414 return samples, nil 415} 416 417// parseFreq parses out a lone EvFrequency from a batch. 418func parseFreq(b batch) (frequency, error) { 419 if !b.isFreqBatch() { 420 return 0, fmt.Errorf("internal error: parseFreq called on non-frequency batch") 421 } 422 r := bytes.NewReader(b.data) 423 r.ReadByte() // Consume the EvFrequency byte. 424 425 // Read the frequency. It'll come out as timestamp units per second. 426 f, err := binary.ReadUvarint(r) 427 if err != nil { 428 return 0, err 429 } 430 // Convert to nanoseconds per timestamp unit. 431 return frequency(1.0 / (float64(f) / 1e9)), nil 432} 433 434// addExperimentalData takes an experimental batch and adds it to the ExperimentalData 435// for the experiment its a part of. 
436func addExperimentalData(expData map[event.Experiment]*ExperimentalData, b batch) error { 437 if b.exp == event.NoExperiment { 438 return fmt.Errorf("internal error: addExperimentalData called on non-experimental batch") 439 } 440 ed, ok := expData[b.exp] 441 if !ok { 442 ed = new(ExperimentalData) 443 expData[b.exp] = ed 444 } 445 ed.Batches = append(ed.Batches, ExperimentalBatch{ 446 Thread: b.m, 447 Data: b.data, 448 }) 449 return nil 450} 451