1// Copyright 2011 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// Package sql provides a generic interface around SQL (or SQL-like) 6// databases. 7// 8// The sql package must be used in conjunction with a database driver. 9// See https://golang.org/s/sqldrivers for a list of drivers. 10// 11// Drivers that do not support context cancellation will not return until 12// after the query is completed. 13// 14// For usage examples, see the wiki page at 15// https://golang.org/s/sqlwiki. 16package sql 17 18import ( 19 "context" 20 "database/sql/driver" 21 "errors" 22 "fmt" 23 "io" 24 "math/rand/v2" 25 "reflect" 26 "runtime" 27 "slices" 28 "strconv" 29 "sync" 30 "sync/atomic" 31 "time" 32 _ "unsafe" 33) 34 35var driversMu sync.RWMutex 36 37// drivers should be an internal detail, 38// but widely used packages access it using linkname. 39// (It is extra wrong that they linkname drivers but not driversMu.) 40// Notable members of the hall of shame include: 41// - github.com/instana/go-sensor 42// 43// Do not remove or change the type signature. 44// See go.dev/issue/67401. 45// 46//go:linkname drivers 47var drivers = make(map[string]driver.Driver) 48 49// nowFunc returns the current time; it's overridden in tests. 50var nowFunc = time.Now 51 52// Register makes a database driver available by the provided name. 53// If Register is called twice with the same name or if driver is nil, 54// it panics. 55func Register(name string, driver driver.Driver) { 56 driversMu.Lock() 57 defer driversMu.Unlock() 58 if driver == nil { 59 panic("sql: Register driver is nil") 60 } 61 if _, dup := drivers[name]; dup { 62 panic("sql: Register called twice for driver " + name) 63 } 64 drivers[name] = driver 65} 66 67func unregisterAllDrivers() { 68 driversMu.Lock() 69 defer driversMu.Unlock() 70 // For tests. 71 drivers = make(map[string]driver.Driver) 72} 73 74// Drivers returns a sorted list of the names of the registered drivers. 75func Drivers() []string { 76 driversMu.RLock() 77 defer driversMu.RUnlock() 78 list := make([]string, 0, len(drivers)) 79 for name := range drivers { 80 list = append(list, name) 81 } 82 slices.Sort(list) 83 return list 84} 85 86// A NamedArg is a named argument. NamedArg values may be used as 87// arguments to [DB.Query] or [DB.Exec] and bind to the corresponding named 88// parameter in the SQL statement. 89// 90// For a more concise way to create NamedArg values, see 91// the [Named] function. 92type NamedArg struct { 93 _NamedFieldsRequired struct{} 94 95 // Name is the name of the parameter placeholder. 96 // 97 // If empty, the ordinal position in the argument list will be 98 // used. 99 // 100 // Name must omit any symbol prefix. 101 Name string 102 103 // Value is the value of the parameter. 104 // It may be assigned the same value types as the query 105 // arguments. 106 Value any 107} 108 109// Named provides a more concise way to create [NamedArg] values. 110// 111// Example usage: 112// 113// db.ExecContext(ctx, ` 114// delete from Invoice 115// where 116// TimeCreated < @end 117// and TimeCreated >= @start;`, 118// sql.Named("start", startTime), 119// sql.Named("end", endTime), 120// ) 121func Named(name string, value any) NamedArg { 122 // This method exists because the go1compat promise 123 // doesn't guarantee that structs don't grow more fields, 124 // so unkeyed struct literals are a vet error. Thus, we don't 125 // want to allow sql.NamedArg{name, value}. 126 return NamedArg{Name: name, Value: value} 127} 128 129// IsolationLevel is the transaction isolation level used in [TxOptions]. 130type IsolationLevel int 131 132// Various isolation levels that drivers may support in [DB.BeginTx]. 133// If a driver does not support a given isolation level an error may be returned. 134// 135// See https://en.wikipedia.org/wiki/Isolation_(database_systems)#Isolation_levels. 136const ( 137 LevelDefault IsolationLevel = iota 138 LevelReadUncommitted 139 LevelReadCommitted 140 LevelWriteCommitted 141 LevelRepeatableRead 142 LevelSnapshot 143 LevelSerializable 144 LevelLinearizable 145) 146 147// String returns the name of the transaction isolation level. 148func (i IsolationLevel) String() string { 149 switch i { 150 case LevelDefault: 151 return "Default" 152 case LevelReadUncommitted: 153 return "Read Uncommitted" 154 case LevelReadCommitted: 155 return "Read Committed" 156 case LevelWriteCommitted: 157 return "Write Committed" 158 case LevelRepeatableRead: 159 return "Repeatable Read" 160 case LevelSnapshot: 161 return "Snapshot" 162 case LevelSerializable: 163 return "Serializable" 164 case LevelLinearizable: 165 return "Linearizable" 166 default: 167 return "IsolationLevel(" + strconv.Itoa(int(i)) + ")" 168 } 169} 170 171var _ fmt.Stringer = LevelDefault 172 173// TxOptions holds the transaction options to be used in [DB.BeginTx]. 174type TxOptions struct { 175 // Isolation is the transaction isolation level. 176 // If zero, the driver or database's default level is used. 177 Isolation IsolationLevel 178 ReadOnly bool 179} 180 181// RawBytes is a byte slice that holds a reference to memory owned by 182// the database itself. After a [Rows.Scan] into a RawBytes, the slice is only 183// valid until the next call to [Rows.Next], [Rows.Scan], or [Rows.Close]. 184type RawBytes []byte 185 186// NullString represents a string that may be null. 187// NullString implements the [Scanner] interface so 188// it can be used as a scan destination: 189// 190// var s NullString 191// err := db.QueryRow("SELECT name FROM foo WHERE id=?", id).Scan(&s) 192// ... 193// if s.Valid { 194// // use s.String 195// } else { 196// // NULL value 197// } 198type NullString struct { 199 String string 200 Valid bool // Valid is true if String is not NULL 201} 202 203// Scan implements the [Scanner] interface. 204func (ns *NullString) Scan(value any) error { 205 if value == nil { 206 ns.String, ns.Valid = "", false 207 return nil 208 } 209 ns.Valid = true 210 return convertAssign(&ns.String, value) 211} 212 213// Value implements the [driver.Valuer] interface. 214func (ns NullString) Value() (driver.Value, error) { 215 if !ns.Valid { 216 return nil, nil 217 } 218 return ns.String, nil 219} 220 221// NullInt64 represents an int64 that may be null. 222// NullInt64 implements the [Scanner] interface so 223// it can be used as a scan destination, similar to [NullString]. 224type NullInt64 struct { 225 Int64 int64 226 Valid bool // Valid is true if Int64 is not NULL 227} 228 229// Scan implements the [Scanner] interface. 230func (n *NullInt64) Scan(value any) error { 231 if value == nil { 232 n.Int64, n.Valid = 0, false 233 return nil 234 } 235 n.Valid = true 236 return convertAssign(&n.Int64, value) 237} 238 239// Value implements the [driver.Valuer] interface. 240func (n NullInt64) Value() (driver.Value, error) { 241 if !n.Valid { 242 return nil, nil 243 } 244 return n.Int64, nil 245} 246 247// NullInt32 represents an int32 that may be null. 248// NullInt32 implements the [Scanner] interface so 249// it can be used as a scan destination, similar to [NullString]. 250type NullInt32 struct { 251 Int32 int32 252 Valid bool // Valid is true if Int32 is not NULL 253} 254 255// Scan implements the [Scanner] interface. 256func (n *NullInt32) Scan(value any) error { 257 if value == nil { 258 n.Int32, n.Valid = 0, false 259 return nil 260 } 261 n.Valid = true 262 return convertAssign(&n.Int32, value) 263} 264 265// Value implements the [driver.Valuer] interface. 266func (n NullInt32) Value() (driver.Value, error) { 267 if !n.Valid { 268 return nil, nil 269 } 270 return int64(n.Int32), nil 271} 272 273// NullInt16 represents an int16 that may be null. 274// NullInt16 implements the [Scanner] interface so 275// it can be used as a scan destination, similar to [NullString]. 276type NullInt16 struct { 277 Int16 int16 278 Valid bool // Valid is true if Int16 is not NULL 279} 280 281// Scan implements the [Scanner] interface. 282func (n *NullInt16) Scan(value any) error { 283 if value == nil { 284 n.Int16, n.Valid = 0, false 285 return nil 286 } 287 err := convertAssign(&n.Int16, value) 288 n.Valid = err == nil 289 return err 290} 291 292// Value implements the [driver.Valuer] interface. 293func (n NullInt16) Value() (driver.Value, error) { 294 if !n.Valid { 295 return nil, nil 296 } 297 return int64(n.Int16), nil 298} 299 300// NullByte represents a byte that may be null. 301// NullByte implements the [Scanner] interface so 302// it can be used as a scan destination, similar to [NullString]. 303type NullByte struct { 304 Byte byte 305 Valid bool // Valid is true if Byte is not NULL 306} 307 308// Scan implements the [Scanner] interface. 309func (n *NullByte) Scan(value any) error { 310 if value == nil { 311 n.Byte, n.Valid = 0, false 312 return nil 313 } 314 err := convertAssign(&n.Byte, value) 315 n.Valid = err == nil 316 return err 317} 318 319// Value implements the [driver.Valuer] interface. 320func (n NullByte) Value() (driver.Value, error) { 321 if !n.Valid { 322 return nil, nil 323 } 324 return int64(n.Byte), nil 325} 326 327// NullFloat64 represents a float64 that may be null. 328// NullFloat64 implements the [Scanner] interface so 329// it can be used as a scan destination, similar to [NullString]. 330type NullFloat64 struct { 331 Float64 float64 332 Valid bool // Valid is true if Float64 is not NULL 333} 334 335// Scan implements the [Scanner] interface. 336func (n *NullFloat64) Scan(value any) error { 337 if value == nil { 338 n.Float64, n.Valid = 0, false 339 return nil 340 } 341 n.Valid = true 342 return convertAssign(&n.Float64, value) 343} 344 345// Value implements the [driver.Valuer] interface. 346func (n NullFloat64) Value() (driver.Value, error) { 347 if !n.Valid { 348 return nil, nil 349 } 350 return n.Float64, nil 351} 352 353// NullBool represents a bool that may be null. 354// NullBool implements the [Scanner] interface so 355// it can be used as a scan destination, similar to [NullString]. 356type NullBool struct { 357 Bool bool 358 Valid bool // Valid is true if Bool is not NULL 359} 360 361// Scan implements the [Scanner] interface. 362func (n *NullBool) Scan(value any) error { 363 if value == nil { 364 n.Bool, n.Valid = false, false 365 return nil 366 } 367 n.Valid = true 368 return convertAssign(&n.Bool, value) 369} 370 371// Value implements the [driver.Valuer] interface. 372func (n NullBool) Value() (driver.Value, error) { 373 if !n.Valid { 374 return nil, nil 375 } 376 return n.Bool, nil 377} 378 379// NullTime represents a [time.Time] that may be null. 380// NullTime implements the [Scanner] interface so 381// it can be used as a scan destination, similar to [NullString]. 382type NullTime struct { 383 Time time.Time 384 Valid bool // Valid is true if Time is not NULL 385} 386 387// Scan implements the [Scanner] interface. 388func (n *NullTime) Scan(value any) error { 389 if value == nil { 390 n.Time, n.Valid = time.Time{}, false 391 return nil 392 } 393 n.Valid = true 394 return convertAssign(&n.Time, value) 395} 396 397// Value implements the [driver.Valuer] interface. 398func (n NullTime) Value() (driver.Value, error) { 399 if !n.Valid { 400 return nil, nil 401 } 402 return n.Time, nil 403} 404 405// Null represents a value that may be null. 406// Null implements the [Scanner] interface so 407// it can be used as a scan destination: 408// 409// var s Null[string] 410// err := db.QueryRow("SELECT name FROM foo WHERE id=?", id).Scan(&s) 411// ... 412// if s.Valid { 413// // use s.V 414// } else { 415// // NULL value 416// } 417type Null[T any] struct { 418 V T 419 Valid bool 420} 421 422func (n *Null[T]) Scan(value any) error { 423 if value == nil { 424 n.V, n.Valid = *new(T), false 425 return nil 426 } 427 n.Valid = true 428 return convertAssign(&n.V, value) 429} 430 431func (n Null[T]) Value() (driver.Value, error) { 432 if !n.Valid { 433 return nil, nil 434 } 435 return n.V, nil 436} 437 438// Scanner is an interface used by [Rows.Scan]. 439type Scanner interface { 440 // Scan assigns a value from a database driver. 441 // 442 // The src value will be of one of the following types: 443 // 444 // int64 445 // float64 446 // bool 447 // []byte 448 // string 449 // time.Time 450 // nil - for NULL values 451 // 452 // An error should be returned if the value cannot be stored 453 // without loss of information. 454 // 455 // Reference types such as []byte are only valid until the next call to Scan 456 // and should not be retained. Their underlying memory is owned by the driver. 457 // If retention is necessary, copy their values before the next call to Scan. 458 Scan(src any) error 459} 460 461// Out may be used to retrieve OUTPUT value parameters from stored procedures. 462// 463// Not all drivers and databases support OUTPUT value parameters. 464// 465// Example usage: 466// 467// var outArg string 468// _, err := db.ExecContext(ctx, "ProcName", sql.Named("Arg1", sql.Out{Dest: &outArg})) 469type Out struct { 470 _NamedFieldsRequired struct{} 471 472 // Dest is a pointer to the value that will be set to the result of the 473 // stored procedure's OUTPUT parameter. 474 Dest any 475 476 // In is whether the parameter is an INOUT parameter. If so, the input value to the stored 477 // procedure is the dereferenced value of Dest's pointer, which is then replaced with 478 // the output value. 479 In bool 480} 481 482// ErrNoRows is returned by [Row.Scan] when [DB.QueryRow] doesn't return a 483// row. In such a case, QueryRow returns a placeholder [*Row] value that 484// defers this error until a Scan. 485var ErrNoRows = errors.New("sql: no rows in result set") 486 487// DB is a database handle representing a pool of zero or more 488// underlying connections. It's safe for concurrent use by multiple 489// goroutines. 490// 491// The sql package creates and frees connections automatically; it 492// also maintains a free pool of idle connections. If the database has 493// a concept of per-connection state, such state can be reliably observed 494// within a transaction ([Tx]) or connection ([Conn]). Once [DB.Begin] is called, the 495// returned [Tx] is bound to a single connection. Once [Tx.Commit] or 496// [Tx.Rollback] is called on the transaction, that transaction's 497// connection is returned to [DB]'s idle connection pool. The pool size 498// can be controlled with [DB.SetMaxIdleConns]. 499type DB struct { 500 // Total time waited for new connections. 501 waitDuration atomic.Int64 502 503 connector driver.Connector 504 // numClosed is an atomic counter which represents a total number of 505 // closed connections. Stmt.openStmt checks it before cleaning closed 506 // connections in Stmt.css. 507 numClosed atomic.Uint64 508 509 mu sync.Mutex // protects following fields 510 freeConn []*driverConn // free connections ordered by returnedAt oldest to newest 511 connRequests connRequestSet 512 numOpen int // number of opened and pending open connections 513 // Used to signal the need for new connections 514 // a goroutine running connectionOpener() reads on this chan and 515 // maybeOpenNewConnections sends on the chan (one send per needed connection) 516 // It is closed during db.Close(). The close tells the connectionOpener 517 // goroutine to exit. 518 openerCh chan struct{} 519 closed bool 520 dep map[finalCloser]depSet 521 lastPut map[*driverConn]string // stacktrace of last conn's put; debug only 522 maxIdleCount int // zero means defaultMaxIdleConns; negative means 0 523 maxOpen int // <= 0 means unlimited 524 maxLifetime time.Duration // maximum amount of time a connection may be reused 525 maxIdleTime time.Duration // maximum amount of time a connection may be idle before being closed 526 cleanerCh chan struct{} 527 waitCount int64 // Total number of connections waited for. 528 maxIdleClosed int64 // Total number of connections closed due to idle count. 529 maxIdleTimeClosed int64 // Total number of connections closed due to idle time. 530 maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit. 531 532 stop func() // stop cancels the connection opener. 533} 534 535// connReuseStrategy determines how (*DB).conn returns database connections. 536type connReuseStrategy uint8 537 538const ( 539 // alwaysNewConn forces a new connection to the database. 540 alwaysNewConn connReuseStrategy = iota 541 // cachedOrNewConn returns a cached connection, if available, else waits 542 // for one to become available (if MaxOpenConns has been reached) or 543 // creates a new database connection. 544 cachedOrNewConn 545) 546 547// driverConn wraps a driver.Conn with a mutex, to 548// be held during all calls into the Conn. (including any calls onto 549// interfaces returned via that Conn, such as calls on Tx, Stmt, 550// Result, Rows) 551type driverConn struct { 552 db *DB 553 createdAt time.Time 554 555 sync.Mutex // guards following 556 ci driver.Conn 557 needReset bool // The connection session should be reset before use if true. 558 closed bool 559 finalClosed bool // ci.Close has been called 560 openStmt map[*driverStmt]bool 561 562 // guarded by db.mu 563 inUse bool 564 dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked 565 returnedAt time.Time // Time the connection was created or returned. 566 onPut []func() // code (with db.mu held) run when conn is next returned 567} 568 569func (dc *driverConn) releaseConn(err error) { 570 dc.db.putConn(dc, err, true) 571} 572 573func (dc *driverConn) removeOpenStmt(ds *driverStmt) { 574 dc.Lock() 575 defer dc.Unlock() 576 delete(dc.openStmt, ds) 577} 578 579func (dc *driverConn) expired(timeout time.Duration) bool { 580 if timeout <= 0 { 581 return false 582 } 583 return dc.createdAt.Add(timeout).Before(nowFunc()) 584} 585 586// resetSession checks if the driver connection needs the 587// session to be reset and if required, resets it. 588func (dc *driverConn) resetSession(ctx context.Context) error { 589 dc.Lock() 590 defer dc.Unlock() 591 592 if !dc.needReset { 593 return nil 594 } 595 if cr, ok := dc.ci.(driver.SessionResetter); ok { 596 return cr.ResetSession(ctx) 597 } 598 return nil 599} 600 601// validateConnection checks if the connection is valid and can 602// still be used. It also marks the session for reset if required. 603func (dc *driverConn) validateConnection(needsReset bool) bool { 604 dc.Lock() 605 defer dc.Unlock() 606 607 if needsReset { 608 dc.needReset = true 609 } 610 if cv, ok := dc.ci.(driver.Validator); ok { 611 return cv.IsValid() 612 } 613 return true 614} 615 616// prepareLocked prepares the query on dc. When cg == nil the dc must keep track of 617// the prepared statements in a pool. 618func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) { 619 si, err := ctxDriverPrepare(ctx, dc.ci, query) 620 if err != nil { 621 return nil, err 622 } 623 ds := &driverStmt{Locker: dc, si: si} 624 625 // No need to manage open statements if there is a single connection grabber. 626 if cg != nil { 627 return ds, nil 628 } 629 630 // Track each driverConn's open statements, so we can close them 631 // before closing the conn. 632 // 633 // Wrap all driver.Stmt is *driverStmt to ensure they are only closed once. 634 if dc.openStmt == nil { 635 dc.openStmt = make(map[*driverStmt]bool) 636 } 637 dc.openStmt[ds] = true 638 return ds, nil 639} 640 641// the dc.db's Mutex is held. 642func (dc *driverConn) closeDBLocked() func() error { 643 dc.Lock() 644 defer dc.Unlock() 645 if dc.closed { 646 return func() error { return errors.New("sql: duplicate driverConn close") } 647 } 648 dc.closed = true 649 return dc.db.removeDepLocked(dc, dc) 650} 651 652func (dc *driverConn) Close() error { 653 dc.Lock() 654 if dc.closed { 655 dc.Unlock() 656 return errors.New("sql: duplicate driverConn close") 657 } 658 dc.closed = true 659 dc.Unlock() // not defer; removeDep finalClose calls may need to lock 660 661 // And now updates that require holding dc.mu.Lock. 662 dc.db.mu.Lock() 663 dc.dbmuClosed = true 664 fn := dc.db.removeDepLocked(dc, dc) 665 dc.db.mu.Unlock() 666 return fn() 667} 668 669func (dc *driverConn) finalClose() error { 670 var err error 671 672 // Each *driverStmt has a lock to the dc. Copy the list out of the dc 673 // before calling close on each stmt. 674 var openStmt []*driverStmt 675 withLock(dc, func() { 676 openStmt = make([]*driverStmt, 0, len(dc.openStmt)) 677 for ds := range dc.openStmt { 678 openStmt = append(openStmt, ds) 679 } 680 dc.openStmt = nil 681 }) 682 for _, ds := range openStmt { 683 ds.Close() 684 } 685 withLock(dc, func() { 686 dc.finalClosed = true 687 err = dc.ci.Close() 688 dc.ci = nil 689 }) 690 691 dc.db.mu.Lock() 692 dc.db.numOpen-- 693 dc.db.maybeOpenNewConnections() 694 dc.db.mu.Unlock() 695 696 dc.db.numClosed.Add(1) 697 return err 698} 699 700// driverStmt associates a driver.Stmt with the 701// *driverConn from which it came, so the driverConn's lock can be 702// held during calls. 703type driverStmt struct { 704 sync.Locker // the *driverConn 705 si driver.Stmt 706 closed bool 707 closeErr error // return value of previous Close call 708} 709 710// Close ensures driver.Stmt is only closed once and always returns the same 711// result. 712func (ds *driverStmt) Close() error { 713 ds.Lock() 714 defer ds.Unlock() 715 if ds.closed { 716 return ds.closeErr 717 } 718 ds.closed = true 719 ds.closeErr = ds.si.Close() 720 return ds.closeErr 721} 722 723// depSet is a finalCloser's outstanding dependencies 724type depSet map[any]bool // set of true bools 725 726// The finalCloser interface is used by (*DB).addDep and related 727// dependency reference counting. 728type finalCloser interface { 729 // finalClose is called when the reference count of an object 730 // goes to zero. (*DB).mu is not held while calling it. 731 finalClose() error 732} 733 734// addDep notes that x now depends on dep, and x's finalClose won't be 735// called until all of x's dependencies are removed with removeDep. 736func (db *DB) addDep(x finalCloser, dep any) { 737 db.mu.Lock() 738 defer db.mu.Unlock() 739 db.addDepLocked(x, dep) 740} 741 742func (db *DB) addDepLocked(x finalCloser, dep any) { 743 if db.dep == nil { 744 db.dep = make(map[finalCloser]depSet) 745 } 746 xdep := db.dep[x] 747 if xdep == nil { 748 xdep = make(depSet) 749 db.dep[x] = xdep 750 } 751 xdep[dep] = true 752} 753 754// removeDep notes that x no longer depends on dep. 755// If x still has dependencies, nil is returned. 756// If x no longer has any dependencies, its finalClose method will be 757// called and its error value will be returned. 758func (db *DB) removeDep(x finalCloser, dep any) error { 759 db.mu.Lock() 760 fn := db.removeDepLocked(x, dep) 761 db.mu.Unlock() 762 return fn() 763} 764 765func (db *DB) removeDepLocked(x finalCloser, dep any) func() error { 766 xdep, ok := db.dep[x] 767 if !ok { 768 panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x)) 769 } 770 771 l0 := len(xdep) 772 delete(xdep, dep) 773 774 switch len(xdep) { 775 case l0: 776 // Nothing removed. Shouldn't happen. 777 panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x)) 778 case 0: 779 // No more dependencies. 780 delete(db.dep, x) 781 return x.finalClose 782 default: 783 // Dependencies remain. 784 return func() error { return nil } 785 } 786} 787 788// This is the size of the connectionOpener request chan (DB.openerCh). 789// This value should be larger than the maximum typical value 790// used for DB.maxOpen. If maxOpen is significantly larger than 791// connectionRequestQueueSize then it is possible for ALL calls into the *DB 792// to block until the connectionOpener can satisfy the backlog of requests. 793var connectionRequestQueueSize = 1000000 794 795type dsnConnector struct { 796 dsn string 797 driver driver.Driver 798} 799 800func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) { 801 return t.driver.Open(t.dsn) 802} 803 804func (t dsnConnector) Driver() driver.Driver { 805 return t.driver 806} 807 808// OpenDB opens a database using a [driver.Connector], allowing drivers to 809// bypass a string based data source name. 810// 811// Most users will open a database via a driver-specific connection 812// helper function that returns a [*DB]. No database drivers are included 813// in the Go standard library. See https://golang.org/s/sqldrivers for 814// a list of third-party drivers. 815// 816// OpenDB may just validate its arguments without creating a connection 817// to the database. To verify that the data source name is valid, call 818// [DB.Ping]. 819// 820// The returned [DB] is safe for concurrent use by multiple goroutines 821// and maintains its own pool of idle connections. Thus, the OpenDB 822// function should be called just once. It is rarely necessary to 823// close a [DB]. 824func OpenDB(c driver.Connector) *DB { 825 ctx, cancel := context.WithCancel(context.Background()) 826 db := &DB{ 827 connector: c, 828 openerCh: make(chan struct{}, connectionRequestQueueSize), 829 lastPut: make(map[*driverConn]string), 830 stop: cancel, 831 } 832 833 go db.connectionOpener(ctx) 834 835 return db 836} 837 838// Open opens a database specified by its database driver name and a 839// driver-specific data source name, usually consisting of at least a 840// database name and connection information. 841// 842// Most users will open a database via a driver-specific connection 843// helper function that returns a [*DB]. No database drivers are included 844// in the Go standard library. See https://golang.org/s/sqldrivers for 845// a list of third-party drivers. 846// 847// Open may just validate its arguments without creating a connection 848// to the database. To verify that the data source name is valid, call 849// [DB.Ping]. 850// 851// The returned [DB] is safe for concurrent use by multiple goroutines 852// and maintains its own pool of idle connections. Thus, the Open 853// function should be called just once. It is rarely necessary to 854// close a [DB]. 855func Open(driverName, dataSourceName string) (*DB, error) { 856 driversMu.RLock() 857 driveri, ok := drivers[driverName] 858 driversMu.RUnlock() 859 if !ok { 860 return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName) 861 } 862 863 if driverCtx, ok := driveri.(driver.DriverContext); ok { 864 connector, err := driverCtx.OpenConnector(dataSourceName) 865 if err != nil { 866 return nil, err 867 } 868 return OpenDB(connector), nil 869 } 870 871 return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil 872} 873 874func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error { 875 var err error 876 if pinger, ok := dc.ci.(driver.Pinger); ok { 877 withLock(dc, func() { 878 err = pinger.Ping(ctx) 879 }) 880 } 881 release(err) 882 return err 883} 884 885// PingContext verifies a connection to the database is still alive, 886// establishing a connection if necessary. 887func (db *DB) PingContext(ctx context.Context) error { 888 var dc *driverConn 889 var err error 890 891 err = db.retry(func(strategy connReuseStrategy) error { 892 dc, err = db.conn(ctx, strategy) 893 return err 894 }) 895 896 if err != nil { 897 return err 898 } 899 900 return db.pingDC(ctx, dc, dc.releaseConn) 901} 902 903// Ping verifies a connection to the database is still alive, 904// establishing a connection if necessary. 905// 906// Ping uses [context.Background] internally; to specify the context, use 907// [DB.PingContext]. 908func (db *DB) Ping() error { 909 return db.PingContext(context.Background()) 910} 911 912// Close closes the database and prevents new queries from starting. 913// Close then waits for all queries that have started processing on the server 914// to finish. 915// 916// It is rare to Close a [DB], as the [DB] handle is meant to be 917// long-lived and shared between many goroutines. 918func (db *DB) Close() error { 919 db.mu.Lock() 920 if db.closed { // Make DB.Close idempotent 921 db.mu.Unlock() 922 return nil 923 } 924 if db.cleanerCh != nil { 925 close(db.cleanerCh) 926 } 927 var err error 928 fns := make([]func() error, 0, len(db.freeConn)) 929 for _, dc := range db.freeConn { 930 fns = append(fns, dc.closeDBLocked()) 931 } 932 db.freeConn = nil 933 db.closed = true 934 db.connRequests.CloseAndRemoveAll() 935 db.mu.Unlock() 936 for _, fn := range fns { 937 err1 := fn() 938 if err1 != nil { 939 err = err1 940 } 941 } 942 db.stop() 943 if c, ok := db.connector.(io.Closer); ok { 944 err1 := c.Close() 945 if err1 != nil { 946 err = err1 947 } 948 } 949 return err 950} 951 952const defaultMaxIdleConns = 2 953 954func (db *DB) maxIdleConnsLocked() int { 955 n := db.maxIdleCount 956 switch { 957 case n == 0: 958 // TODO(bradfitz): ask driver, if supported, for its default preference 959 return defaultMaxIdleConns 960 case n < 0: 961 return 0 962 default: 963 return n 964 } 965} 966 967func (db *DB) shortestIdleTimeLocked() time.Duration { 968 if db.maxIdleTime <= 0 { 969 return db.maxLifetime 970 } 971 if db.maxLifetime <= 0 { 972 return db.maxIdleTime 973 } 974 return min(db.maxIdleTime, db.maxLifetime) 975} 976 977// SetMaxIdleConns sets the maximum number of connections in the idle 978// connection pool. 979// 980// If MaxOpenConns is greater than 0 but less than the new MaxIdleConns, 981// then the new MaxIdleConns will be reduced to match the MaxOpenConns limit. 982// 983// If n <= 0, no idle connections are retained. 984// 985// The default max idle connections is currently 2. This may change in 986// a future release. 987func (db *DB) SetMaxIdleConns(n int) { 988 db.mu.Lock() 989 if n > 0 { 990 db.maxIdleCount = n 991 } else { 992 // No idle connections. 993 db.maxIdleCount = -1 994 } 995 // Make sure maxIdle doesn't exceed maxOpen 996 if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen { 997 db.maxIdleCount = db.maxOpen 998 } 999 var closing []*driverConn 1000 idleCount := len(db.freeConn) 1001 maxIdle := db.maxIdleConnsLocked() 1002 if idleCount > maxIdle { 1003 closing = db.freeConn[maxIdle:] 1004 db.freeConn = db.freeConn[:maxIdle] 1005 } 1006 db.maxIdleClosed += int64(len(closing)) 1007 db.mu.Unlock() 1008 for _, c := range closing { 1009 c.Close() 1010 } 1011} 1012 1013// SetMaxOpenConns sets the maximum number of open connections to the database. 1014// 1015// If MaxIdleConns is greater than 0 and the new MaxOpenConns is less than 1016// MaxIdleConns, then MaxIdleConns will be reduced to match the new 1017// MaxOpenConns limit. 1018// 1019// If n <= 0, then there is no limit on the number of open connections. 1020// The default is 0 (unlimited). 1021func (db *DB) SetMaxOpenConns(n int) { 1022 db.mu.Lock() 1023 db.maxOpen = n 1024 if n < 0 { 1025 db.maxOpen = 0 1026 } 1027 syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen 1028 db.mu.Unlock() 1029 if syncMaxIdle { 1030 db.SetMaxIdleConns(n) 1031 } 1032} 1033 1034// SetConnMaxLifetime sets the maximum amount of time a connection may be reused. 1035// 1036// Expired connections may be closed lazily before reuse. 1037// 1038// If d <= 0, connections are not closed due to a connection's age. 1039func (db *DB) SetConnMaxLifetime(d time.Duration) { 1040 if d < 0 { 1041 d = 0 1042 } 1043 db.mu.Lock() 1044 // Wake cleaner up when lifetime is shortened. 1045 if d > 0 && d < db.maxLifetime && db.cleanerCh != nil { 1046 select { 1047 case db.cleanerCh <- struct{}{}: 1048 default: 1049 } 1050 } 1051 db.maxLifetime = d 1052 db.startCleanerLocked() 1053 db.mu.Unlock() 1054} 1055 1056// SetConnMaxIdleTime sets the maximum amount of time a connection may be idle. 1057// 1058// Expired connections may be closed lazily before reuse. 1059// 1060// If d <= 0, connections are not closed due to a connection's idle time. 1061func (db *DB) SetConnMaxIdleTime(d time.Duration) { 1062 if d < 0 { 1063 d = 0 1064 } 1065 db.mu.Lock() 1066 defer db.mu.Unlock() 1067 1068 // Wake cleaner up when idle time is shortened. 1069 if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil { 1070 select { 1071 case db.cleanerCh <- struct{}{}: 1072 default: 1073 } 1074 } 1075 db.maxIdleTime = d 1076 db.startCleanerLocked() 1077} 1078 1079// startCleanerLocked starts connectionCleaner if needed. 1080func (db *DB) startCleanerLocked() { 1081 if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil { 1082 db.cleanerCh = make(chan struct{}, 1) 1083 go db.connectionCleaner(db.shortestIdleTimeLocked()) 1084 } 1085} 1086 1087func (db *DB) connectionCleaner(d time.Duration) { 1088 const minInterval = time.Second 1089 1090 if d < minInterval { 1091 d = minInterval 1092 } 1093 t := time.NewTimer(d) 1094 1095 for { 1096 select { 1097 case <-t.C: 1098 case <-db.cleanerCh: // maxLifetime was changed or db was closed. 1099 } 1100 1101 db.mu.Lock() 1102 1103 d = db.shortestIdleTimeLocked() 1104 if db.closed || db.numOpen == 0 || d <= 0 { 1105 db.cleanerCh = nil 1106 db.mu.Unlock() 1107 return 1108 } 1109 1110 d, closing := db.connectionCleanerRunLocked(d) 1111 db.mu.Unlock() 1112 for _, c := range closing { 1113 c.Close() 1114 } 1115 1116 if d < minInterval { 1117 d = minInterval 1118 } 1119 1120 if !t.Stop() { 1121 select { 1122 case <-t.C: 1123 default: 1124 } 1125 } 1126 t.Reset(d) 1127 } 1128} 1129 1130// connectionCleanerRunLocked removes connections that should be closed from 1131// freeConn and returns them along side an updated duration to the next check 1132// if a quicker check is required to ensure connections are checked appropriately. 1133func (db *DB) connectionCleanerRunLocked(d time.Duration) (time.Duration, []*driverConn) { 1134 var idleClosing int64 1135 var closing []*driverConn 1136 if db.maxIdleTime > 0 { 1137 // As freeConn is ordered by returnedAt process 1138 // in reverse order to minimise the work needed. 1139 idleSince := nowFunc().Add(-db.maxIdleTime) 1140 last := len(db.freeConn) - 1 1141 for i := last; i >= 0; i-- { 1142 c := db.freeConn[i] 1143 if c.returnedAt.Before(idleSince) { 1144 i++ 1145 closing = db.freeConn[:i:i] 1146 db.freeConn = db.freeConn[i:] 1147 idleClosing = int64(len(closing)) 1148 db.maxIdleTimeClosed += idleClosing 1149 break 1150 } 1151 } 1152 1153 if len(db.freeConn) > 0 { 1154 c := db.freeConn[0] 1155 if d2 := c.returnedAt.Sub(idleSince); d2 < d { 1156 // Ensure idle connections are cleaned up as soon as 1157 // possible. 1158 d = d2 1159 } 1160 } 1161 } 1162 1163 if db.maxLifetime > 0 { 1164 expiredSince := nowFunc().Add(-db.maxLifetime) 1165 for i := 0; i < len(db.freeConn); i++ { 1166 c := db.freeConn[i] 1167 if c.createdAt.Before(expiredSince) { 1168 closing = append(closing, c) 1169 1170 last := len(db.freeConn) - 1 1171 // Use slow delete as order is required to ensure 1172 // connections are reused least idle time first. 1173 copy(db.freeConn[i:], db.freeConn[i+1:]) 1174 db.freeConn[last] = nil 1175 db.freeConn = db.freeConn[:last] 1176 i-- 1177 } else if d2 := c.createdAt.Sub(expiredSince); d2 < d { 1178 // Prevent connections sitting the freeConn when they 1179 // have expired by updating our next deadline d. 1180 d = d2 1181 } 1182 } 1183 db.maxLifetimeClosed += int64(len(closing)) - idleClosing 1184 } 1185 1186 return d, closing 1187} 1188 1189// DBStats contains database statistics. 1190type DBStats struct { 1191 MaxOpenConnections int // Maximum number of open connections to the database. 1192 1193 // Pool Status 1194 OpenConnections int // The number of established connections both in use and idle. 1195 InUse int // The number of connections currently in use. 1196 Idle int // The number of idle connections. 1197 1198 // Counters 1199 WaitCount int64 // The total number of connections waited for. 1200 WaitDuration time.Duration // The total time blocked waiting for a new connection. 1201 MaxIdleClosed int64 // The total number of connections closed due to SetMaxIdleConns. 1202 MaxIdleTimeClosed int64 // The total number of connections closed due to SetConnMaxIdleTime. 1203 MaxLifetimeClosed int64 // The total number of connections closed due to SetConnMaxLifetime. 1204} 1205 1206// Stats returns database statistics. 1207func (db *DB) Stats() DBStats { 1208 wait := db.waitDuration.Load() 1209 1210 db.mu.Lock() 1211 defer db.mu.Unlock() 1212 1213 stats := DBStats{ 1214 MaxOpenConnections: db.maxOpen, 1215 1216 Idle: len(db.freeConn), 1217 OpenConnections: db.numOpen, 1218 InUse: db.numOpen - len(db.freeConn), 1219 1220 WaitCount: db.waitCount, 1221 WaitDuration: time.Duration(wait), 1222 MaxIdleClosed: db.maxIdleClosed, 1223 MaxIdleTimeClosed: db.maxIdleTimeClosed, 1224 MaxLifetimeClosed: db.maxLifetimeClosed, 1225 } 1226 return stats 1227} 1228 1229// Assumes db.mu is locked. 1230// If there are connRequests and the connection limit hasn't been reached, 1231// then tell the connectionOpener to open new connections. 1232func (db *DB) maybeOpenNewConnections() { 1233 numRequests := db.connRequests.Len() 1234 if db.maxOpen > 0 { 1235 numCanOpen := db.maxOpen - db.numOpen 1236 if numRequests > numCanOpen { 1237 numRequests = numCanOpen 1238 } 1239 } 1240 for numRequests > 0 { 1241 db.numOpen++ // optimistically 1242 numRequests-- 1243 if db.closed { 1244 return 1245 } 1246 db.openerCh <- struct{}{} 1247 } 1248} 1249 1250// Runs in a separate goroutine, opens new connections when requested. 1251func (db *DB) connectionOpener(ctx context.Context) { 1252 for { 1253 select { 1254 case <-ctx.Done(): 1255 return 1256 case <-db.openerCh: 1257 db.openNewConnection(ctx) 1258 } 1259 } 1260} 1261 1262// Open one new connection 1263func (db *DB) openNewConnection(ctx context.Context) { 1264 // maybeOpenNewConnections has already executed db.numOpen++ before it sent 1265 // on db.openerCh. This function must execute db.numOpen-- if the 1266 // connection fails or is closed before returning. 1267 ci, err := db.connector.Connect(ctx) 1268 db.mu.Lock() 1269 defer db.mu.Unlock() 1270 if db.closed { 1271 if err == nil { 1272 ci.Close() 1273 } 1274 db.numOpen-- 1275 return 1276 } 1277 if err != nil { 1278 db.numOpen-- 1279 db.putConnDBLocked(nil, err) 1280 db.maybeOpenNewConnections() 1281 return 1282 } 1283 dc := &driverConn{ 1284 db: db, 1285 createdAt: nowFunc(), 1286 returnedAt: nowFunc(), 1287 ci: ci, 1288 } 1289 if db.putConnDBLocked(dc, err) { 1290 db.addDepLocked(dc, dc) 1291 } else { 1292 db.numOpen-- 1293 ci.Close() 1294 } 1295} 1296 1297// connRequest represents one request for a new connection 1298// When there are no idle connections available, DB.conn will create 1299// a new connRequest and put it on the db.connRequests list. 1300type connRequest struct { 1301 conn *driverConn 1302 err error 1303} 1304 1305var errDBClosed = errors.New("sql: database is closed") 1306 1307// conn returns a newly-opened or cached *driverConn. 1308func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { 1309 db.mu.Lock() 1310 if db.closed { 1311 db.mu.Unlock() 1312 return nil, errDBClosed 1313 } 1314 // Check if the context is expired. 1315 select { 1316 default: 1317 case <-ctx.Done(): 1318 db.mu.Unlock() 1319 return nil, ctx.Err() 1320 } 1321 lifetime := db.maxLifetime 1322 1323 // Prefer a free connection, if possible. 1324 last := len(db.freeConn) - 1 1325 if strategy == cachedOrNewConn && last >= 0 { 1326 // Reuse the lowest idle time connection so we can close 1327 // connections which remain idle as soon as possible. 1328 conn := db.freeConn[last] 1329 db.freeConn = db.freeConn[:last] 1330 conn.inUse = true 1331 if conn.expired(lifetime) { 1332 db.maxLifetimeClosed++ 1333 db.mu.Unlock() 1334 conn.Close() 1335 return nil, driver.ErrBadConn 1336 } 1337 db.mu.Unlock() 1338 1339 // Reset the session if required. 1340 if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) { 1341 conn.Close() 1342 return nil, err 1343 } 1344 1345 return conn, nil 1346 } 1347 1348 // Out of free connections or we were asked not to use one. If we're not 1349 // allowed to open any more connections, make a request and wait. 1350 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { 1351 // Make the connRequest channel. It's buffered so that the 1352 // connectionOpener doesn't block while waiting for the req to be read. 1353 req := make(chan connRequest, 1) 1354 delHandle := db.connRequests.Add(req) 1355 db.waitCount++ 1356 db.mu.Unlock() 1357 1358 waitStart := nowFunc() 1359 1360 // Timeout the connection request with the context. 1361 select { 1362 case <-ctx.Done(): 1363 // Remove the connection request and ensure no value has been sent 1364 // on it after removing. 1365 db.mu.Lock() 1366 deleted := db.connRequests.Delete(delHandle) 1367 db.mu.Unlock() 1368 1369 db.waitDuration.Add(int64(time.Since(waitStart))) 1370 1371 // If we failed to delete it, that means either the DB was closed or 1372 // something else grabbed it and is about to send on it. 1373 if !deleted { 1374 // TODO(bradfitz): rather than this best effort select, we 1375 // should probably start a goroutine to read from req. This best 1376 // effort select existed before the change to check 'deleted'. 1377 // But if we know for sure it wasn't deleted and a sender is 1378 // outstanding, we should probably block on req (in a new 1379 // goroutine) to get the connection back. 1380 select { 1381 default: 1382 case ret, ok := <-req: 1383 if ok && ret.conn != nil { 1384 db.putConn(ret.conn, ret.err, false) 1385 } 1386 } 1387 } 1388 return nil, ctx.Err() 1389 case ret, ok := <-req: 1390 db.waitDuration.Add(int64(time.Since(waitStart))) 1391 1392 if !ok { 1393 return nil, errDBClosed 1394 } 1395 // Only check if the connection is expired if the strategy is cachedOrNewConns. 1396 // If we require a new connection, just re-use the connection without looking 1397 // at the expiry time. If it is expired, it will be checked when it is placed 1398 // back into the connection pool. 1399 // This prioritizes giving a valid connection to a client over the exact connection 1400 // lifetime, which could expire exactly after this point anyway. 1401 if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) { 1402 db.mu.Lock() 1403 db.maxLifetimeClosed++ 1404 db.mu.Unlock() 1405 ret.conn.Close() 1406 return nil, driver.ErrBadConn 1407 } 1408 if ret.conn == nil { 1409 return nil, ret.err 1410 } 1411 1412 // Reset the session if required. 1413 if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) { 1414 ret.conn.Close() 1415 return nil, err 1416 } 1417 return ret.conn, ret.err 1418 } 1419 } 1420 1421 db.numOpen++ // optimistically 1422 db.mu.Unlock() 1423 ci, err := db.connector.Connect(ctx) 1424 if err != nil { 1425 db.mu.Lock() 1426 db.numOpen-- // correct for earlier optimism 1427 db.maybeOpenNewConnections() 1428 db.mu.Unlock() 1429 return nil, err 1430 } 1431 db.mu.Lock() 1432 dc := &driverConn{ 1433 db: db, 1434 createdAt: nowFunc(), 1435 returnedAt: nowFunc(), 1436 ci: ci, 1437 inUse: true, 1438 } 1439 db.addDepLocked(dc, dc) 1440 db.mu.Unlock() 1441 return dc, nil 1442} 1443 1444// putConnHook is a hook for testing. 1445var putConnHook func(*DB, *driverConn) 1446 1447// noteUnusedDriverStatement notes that ds is no longer used and should 1448// be closed whenever possible (when c is next not in use), unless c is 1449// already closed. 1450func (db *DB) noteUnusedDriverStatement(c *driverConn, ds *driverStmt) { 1451 db.mu.Lock() 1452 defer db.mu.Unlock() 1453 if c.inUse { 1454 c.onPut = append(c.onPut, func() { 1455 ds.Close() 1456 }) 1457 } else { 1458 c.Lock() 1459 fc := c.finalClosed 1460 c.Unlock() 1461 if !fc { 1462 ds.Close() 1463 } 1464 } 1465} 1466 1467// debugGetPut determines whether getConn & putConn calls' stack traces 1468// are returned for more verbose crashes. 1469const debugGetPut = false 1470 1471// putConn adds a connection to the db's free pool. 1472// err is optionally the last error that occurred on this connection. 1473func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { 1474 if !errors.Is(err, driver.ErrBadConn) { 1475 if !dc.validateConnection(resetSession) { 1476 err = driver.ErrBadConn 1477 } 1478 } 1479 db.mu.Lock() 1480 if !dc.inUse { 1481 db.mu.Unlock() 1482 if debugGetPut { 1483 fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc]) 1484 } 1485 panic("sql: connection returned that was never out") 1486 } 1487 1488 if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) { 1489 db.maxLifetimeClosed++ 1490 err = driver.ErrBadConn 1491 } 1492 if debugGetPut { 1493 db.lastPut[dc] = stack() 1494 } 1495 dc.inUse = false 1496 dc.returnedAt = nowFunc() 1497 1498 for _, fn := range dc.onPut { 1499 fn() 1500 } 1501 dc.onPut = nil 1502 1503 if errors.Is(err, driver.ErrBadConn) { 1504 // Don't reuse bad connections. 1505 // Since the conn is considered bad and is being discarded, treat it 1506 // as closed. Don't decrement the open count here, finalClose will 1507 // take care of that. 1508 db.maybeOpenNewConnections() 1509 db.mu.Unlock() 1510 dc.Close() 1511 return 1512 } 1513 if putConnHook != nil { 1514 putConnHook(db, dc) 1515 } 1516 added := db.putConnDBLocked(dc, nil) 1517 db.mu.Unlock() 1518 1519 if !added { 1520 dc.Close() 1521 return 1522 } 1523} 1524 1525// Satisfy a connRequest or put the driverConn in the idle pool and return true 1526// or return false. 1527// putConnDBLocked will satisfy a connRequest if there is one, or it will 1528// return the *driverConn to the freeConn list if err == nil and the idle 1529// connection limit will not be exceeded. 1530// If err != nil, the value of dc is ignored. 1531// If err == nil, then dc must not equal nil. 1532// If a connRequest was fulfilled or the *driverConn was placed in the 1533// freeConn list, then true is returned, otherwise false is returned. 1534func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { 1535 if db.closed { 1536 return false 1537 } 1538 if db.maxOpen > 0 && db.numOpen > db.maxOpen { 1539 return false 1540 } 1541 if req, ok := db.connRequests.TakeRandom(); ok { 1542 if err == nil { 1543 dc.inUse = true 1544 } 1545 req <- connRequest{ 1546 conn: dc, 1547 err: err, 1548 } 1549 return true 1550 } else if err == nil && !db.closed { 1551 if db.maxIdleConnsLocked() > len(db.freeConn) { 1552 db.freeConn = append(db.freeConn, dc) 1553 db.startCleanerLocked() 1554 return true 1555 } 1556 db.maxIdleClosed++ 1557 } 1558 return false 1559} 1560 1561// maxBadConnRetries is the number of maximum retries if the driver returns 1562// driver.ErrBadConn to signal a broken connection before forcing a new 1563// connection to be opened. 1564const maxBadConnRetries = 2 1565 1566func (db *DB) retry(fn func(strategy connReuseStrategy) error) error { 1567 for i := int64(0); i < maxBadConnRetries; i++ { 1568 err := fn(cachedOrNewConn) 1569 // retry if err is driver.ErrBadConn 1570 if err == nil || !errors.Is(err, driver.ErrBadConn) { 1571 return err 1572 } 1573 } 1574 1575 return fn(alwaysNewConn) 1576} 1577 1578// PrepareContext creates a prepared statement for later queries or executions. 1579// Multiple queries or executions may be run concurrently from the 1580// returned statement. 1581// The caller must call the statement's [*Stmt.Close] method 1582// when the statement is no longer needed. 1583// 1584// The provided context is used for the preparation of the statement, not for the 1585// execution of the statement. 1586func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) { 1587 var stmt *Stmt 1588 var err error 1589 1590 err = db.retry(func(strategy connReuseStrategy) error { 1591 stmt, err = db.prepare(ctx, query, strategy) 1592 return err 1593 }) 1594 1595 return stmt, err 1596} 1597 1598// Prepare creates a prepared statement for later queries or executions. 1599// Multiple queries or executions may be run concurrently from the 1600// returned statement. 1601// The caller must call the statement's [*Stmt.Close] method 1602// when the statement is no longer needed. 1603// 1604// Prepare uses [context.Background] internally; to specify the context, use 1605// [DB.PrepareContext]. 1606func (db *DB) Prepare(query string) (*Stmt, error) { 1607 return db.PrepareContext(context.Background(), query) 1608} 1609 1610func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) { 1611 // TODO: check if db.driver supports an optional 1612 // driver.Preparer interface and call that instead, if so, 1613 // otherwise we make a prepared statement that's bound 1614 // to a connection, and to execute this prepared statement 1615 // we either need to use this connection (if it's free), else 1616 // get a new connection + re-prepare + execute on that one. 1617 dc, err := db.conn(ctx, strategy) 1618 if err != nil { 1619 return nil, err 1620 } 1621 return db.prepareDC(ctx, dc, dc.releaseConn, nil, query) 1622} 1623 1624// prepareDC prepares a query on the driverConn and calls release before 1625// returning. When cg == nil it implies that a connection pool is used, and 1626// when cg != nil only a single driver connection is used. 1627func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) { 1628 var ds *driverStmt 1629 var err error 1630 defer func() { 1631 release(err) 1632 }() 1633 withLock(dc, func() { 1634 ds, err = dc.prepareLocked(ctx, cg, query) 1635 }) 1636 if err != nil { 1637 return nil, err 1638 } 1639 stmt := &Stmt{ 1640 db: db, 1641 query: query, 1642 cg: cg, 1643 cgds: ds, 1644 } 1645 1646 // When cg == nil this statement will need to keep track of various 1647 // connections they are prepared on and record the stmt dependency on 1648 // the DB. 1649 if cg == nil { 1650 stmt.css = []connStmt{{dc, ds}} 1651 stmt.lastNumClosed = db.numClosed.Load() 1652 db.addDep(stmt, stmt) 1653 } 1654 return stmt, nil 1655} 1656 1657// ExecContext executes a query without returning any rows. 1658// The args are for any placeholder parameters in the query. 1659func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { 1660 var res Result 1661 var err error 1662 1663 err = db.retry(func(strategy connReuseStrategy) error { 1664 res, err = db.exec(ctx, query, args, strategy) 1665 return err 1666 }) 1667 1668 return res, err 1669} 1670 1671// Exec executes a query without returning any rows. 1672// The args are for any placeholder parameters in the query. 1673// 1674// Exec uses [context.Background] internally; to specify the context, use 1675// [DB.ExecContext]. 1676func (db *DB) Exec(query string, args ...any) (Result, error) { 1677 return db.ExecContext(context.Background(), query, args...) 1678} 1679 1680func (db *DB) exec(ctx context.Context, query string, args []any, strategy connReuseStrategy) (Result, error) { 1681 dc, err := db.conn(ctx, strategy) 1682 if err != nil { 1683 return nil, err 1684 } 1685 return db.execDC(ctx, dc, dc.releaseConn, query, args) 1686} 1687 1688func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []any) (res Result, err error) { 1689 defer func() { 1690 release(err) 1691 }() 1692 execerCtx, ok := dc.ci.(driver.ExecerContext) 1693 var execer driver.Execer 1694 if !ok { 1695 execer, ok = dc.ci.(driver.Execer) 1696 } 1697 if ok { 1698 var nvdargs []driver.NamedValue 1699 var resi driver.Result 1700 withLock(dc, func() { 1701 nvdargs, err = driverArgsConnLocked(dc.ci, nil, args) 1702 if err != nil { 1703 return 1704 } 1705 resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs) 1706 }) 1707 if err != driver.ErrSkip { 1708 if err != nil { 1709 return nil, err 1710 } 1711 return driverResult{dc, resi}, nil 1712 } 1713 } 1714 1715 var si driver.Stmt 1716 withLock(dc, func() { 1717 si, err = ctxDriverPrepare(ctx, dc.ci, query) 1718 }) 1719 if err != nil { 1720 return nil, err 1721 } 1722 ds := &driverStmt{Locker: dc, si: si} 1723 defer ds.Close() 1724 return resultFromStatement(ctx, dc.ci, ds, args...) 1725} 1726 1727// QueryContext executes a query that returns rows, typically a SELECT. 1728// The args are for any placeholder parameters in the query. 1729func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) { 1730 var rows *Rows 1731 var err error 1732 1733 err = db.retry(func(strategy connReuseStrategy) error { 1734 rows, err = db.query(ctx, query, args, strategy) 1735 return err 1736 }) 1737 1738 return rows, err 1739} 1740 1741// Query executes a query that returns rows, typically a SELECT. 1742// The args are for any placeholder parameters in the query. 1743// 1744// Query uses [context.Background] internally; to specify the context, use 1745// [DB.QueryContext]. 1746func (db *DB) Query(query string, args ...any) (*Rows, error) { 1747 return db.QueryContext(context.Background(), query, args...) 1748} 1749 1750func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) { 1751 dc, err := db.conn(ctx, strategy) 1752 if err != nil { 1753 return nil, err 1754 } 1755 1756 return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args) 1757} 1758 1759// queryDC executes a query on the given connection. 1760// The connection gets released by the releaseConn function. 1761// The ctx context is from a query method and the txctx context is from an 1762// optional transaction context. 1763func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []any) (*Rows, error) { 1764 queryerCtx, ok := dc.ci.(driver.QueryerContext) 1765 var queryer driver.Queryer 1766 if !ok { 1767 queryer, ok = dc.ci.(driver.Queryer) 1768 } 1769 if ok { 1770 var nvdargs []driver.NamedValue 1771 var rowsi driver.Rows 1772 var err error 1773 withLock(dc, func() { 1774 nvdargs, err = driverArgsConnLocked(dc.ci, nil, args) 1775 if err != nil { 1776 return 1777 } 1778 rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs) 1779 }) 1780 if err != driver.ErrSkip { 1781 if err != nil { 1782 releaseConn(err) 1783 return nil, err 1784 } 1785 // Note: ownership of dc passes to the *Rows, to be freed 1786 // with releaseConn. 1787 rows := &Rows{ 1788 dc: dc, 1789 releaseConn: releaseConn, 1790 rowsi: rowsi, 1791 } 1792 rows.initContextClose(ctx, txctx) 1793 return rows, nil 1794 } 1795 } 1796 1797 var si driver.Stmt 1798 var err error 1799 withLock(dc, func() { 1800 si, err = ctxDriverPrepare(ctx, dc.ci, query) 1801 }) 1802 if err != nil { 1803 releaseConn(err) 1804 return nil, err 1805 } 1806 1807 ds := &driverStmt{Locker: dc, si: si} 1808 rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...) 1809 if err != nil { 1810 ds.Close() 1811 releaseConn(err) 1812 return nil, err 1813 } 1814 1815 // Note: ownership of ci passes to the *Rows, to be freed 1816 // with releaseConn. 1817 rows := &Rows{ 1818 dc: dc, 1819 releaseConn: releaseConn, 1820 rowsi: rowsi, 1821 closeStmt: ds, 1822 } 1823 rows.initContextClose(ctx, txctx) 1824 return rows, nil 1825} 1826 1827// QueryRowContext executes a query that is expected to return at most one row. 1828// QueryRowContext always returns a non-nil value. Errors are deferred until 1829// [Row]'s Scan method is called. 1830// If the query selects no rows, the [*Row.Scan] will return [ErrNoRows]. 1831// Otherwise, [*Row.Scan] scans the first selected row and discards 1832// the rest. 1833func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row { 1834 rows, err := db.QueryContext(ctx, query, args...) 1835 return &Row{rows: rows, err: err} 1836} 1837 1838// QueryRow executes a query that is expected to return at most one row. 1839// QueryRow always returns a non-nil value. Errors are deferred until 1840// [Row]'s Scan method is called. 1841// If the query selects no rows, the [*Row.Scan] will return [ErrNoRows]. 1842// Otherwise, [*Row.Scan] scans the first selected row and discards 1843// the rest. 1844// 1845// QueryRow uses [context.Background] internally; to specify the context, use 1846// [DB.QueryRowContext]. 1847func (db *DB) QueryRow(query string, args ...any) *Row { 1848 return db.QueryRowContext(context.Background(), query, args...) 1849} 1850 1851// BeginTx starts a transaction. 1852// 1853// The provided context is used until the transaction is committed or rolled back. 1854// If the context is canceled, the sql package will roll back 1855// the transaction. [Tx.Commit] will return an error if the context provided to 1856// BeginTx is canceled. 1857// 1858// The provided [TxOptions] is optional and may be nil if defaults should be used. 1859// If a non-default isolation level is used that the driver doesn't support, 1860// an error will be returned. 1861func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) { 1862 var tx *Tx 1863 var err error 1864 1865 err = db.retry(func(strategy connReuseStrategy) error { 1866 tx, err = db.begin(ctx, opts, strategy) 1867 return err 1868 }) 1869 1870 return tx, err 1871} 1872 1873// Begin starts a transaction. The default isolation level is dependent on 1874// the driver. 1875// 1876// Begin uses [context.Background] internally; to specify the context, use 1877// [DB.BeginTx]. 1878func (db *DB) Begin() (*Tx, error) { 1879 return db.BeginTx(context.Background(), nil) 1880} 1881 1882func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) { 1883 dc, err := db.conn(ctx, strategy) 1884 if err != nil { 1885 return nil, err 1886 } 1887 return db.beginDC(ctx, dc, dc.releaseConn, opts) 1888} 1889 1890// beginDC starts a transaction. The provided dc must be valid and ready to use. 1891func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) { 1892 var txi driver.Tx 1893 keepConnOnRollback := false 1894 withLock(dc, func() { 1895 _, hasSessionResetter := dc.ci.(driver.SessionResetter) 1896 _, hasConnectionValidator := dc.ci.(driver.Validator) 1897 keepConnOnRollback = hasSessionResetter && hasConnectionValidator 1898 txi, err = ctxDriverBegin(ctx, opts, dc.ci) 1899 }) 1900 if err != nil { 1901 release(err) 1902 return nil, err 1903 } 1904 1905 // Schedule the transaction to rollback when the context is canceled. 1906 // The cancel function in Tx will be called after done is set to true. 1907 ctx, cancel := context.WithCancel(ctx) 1908 tx = &Tx{ 1909 db: db, 1910 dc: dc, 1911 releaseConn: release, 1912 txi: txi, 1913 cancel: cancel, 1914 keepConnOnRollback: keepConnOnRollback, 1915 ctx: ctx, 1916 } 1917 go tx.awaitDone() 1918 return tx, nil 1919} 1920 1921// Driver returns the database's underlying driver. 1922func (db *DB) Driver() driver.Driver { 1923 return db.connector.Driver() 1924} 1925 1926// ErrConnDone is returned by any operation that is performed on a connection 1927// that has already been returned to the connection pool. 1928var ErrConnDone = errors.New("sql: connection is already closed") 1929 1930// Conn returns a single connection by either opening a new connection 1931// or returning an existing connection from the connection pool. Conn will 1932// block until either a connection is returned or ctx is canceled. 1933// Queries run on the same Conn will be run in the same database session. 1934// 1935// Every Conn must be returned to the database pool after use by 1936// calling [Conn.Close]. 1937func (db *DB) Conn(ctx context.Context) (*Conn, error) { 1938 var dc *driverConn 1939 var err error 1940 1941 err = db.retry(func(strategy connReuseStrategy) error { 1942 dc, err = db.conn(ctx, strategy) 1943 return err 1944 }) 1945 1946 if err != nil { 1947 return nil, err 1948 } 1949 1950 conn := &Conn{ 1951 db: db, 1952 dc: dc, 1953 } 1954 return conn, nil 1955} 1956 1957type releaseConn func(error) 1958 1959// Conn represents a single database connection rather than a pool of database 1960// connections. Prefer running queries from [DB] unless there is a specific 1961// need for a continuous single database connection. 1962// 1963// A Conn must call [Conn.Close] to return the connection to the database pool 1964// and may do so concurrently with a running query. 1965// 1966// After a call to [Conn.Close], all operations on the 1967// connection fail with [ErrConnDone]. 1968type Conn struct { 1969 db *DB 1970 1971 // closemu prevents the connection from closing while there 1972 // is an active query. It is held for read during queries 1973 // and exclusively during close. 1974 closemu sync.RWMutex 1975 1976 // dc is owned until close, at which point 1977 // it's returned to the connection pool. 1978 dc *driverConn 1979 1980 // done transitions from false to true exactly once, on close. 1981 // Once done, all operations fail with ErrConnDone. 1982 done atomic.Bool 1983 1984 releaseConnOnce sync.Once 1985 // releaseConnCache is a cache of c.closemuRUnlockCondReleaseConn 1986 // to save allocations in a call to grabConn. 1987 releaseConnCache releaseConn 1988} 1989 1990// grabConn takes a context to implement stmtConnGrabber 1991// but the context is not used. 1992func (c *Conn) grabConn(context.Context) (*driverConn, releaseConn, error) { 1993 if c.done.Load() { 1994 return nil, nil, ErrConnDone 1995 } 1996 c.releaseConnOnce.Do(func() { 1997 c.releaseConnCache = c.closemuRUnlockCondReleaseConn 1998 }) 1999 c.closemu.RLock() 2000 return c.dc, c.releaseConnCache, nil 2001} 2002 2003// PingContext verifies the connection to the database is still alive. 2004func (c *Conn) PingContext(ctx context.Context) error { 2005 dc, release, err := c.grabConn(ctx) 2006 if err != nil { 2007 return err 2008 } 2009 return c.db.pingDC(ctx, dc, release) 2010} 2011 2012// ExecContext executes a query without returning any rows. 2013// The args are for any placeholder parameters in the query. 2014func (c *Conn) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { 2015 dc, release, err := c.grabConn(ctx) 2016 if err != nil { 2017 return nil, err 2018 } 2019 return c.db.execDC(ctx, dc, release, query, args) 2020} 2021 2022// QueryContext executes a query that returns rows, typically a SELECT. 2023// The args are for any placeholder parameters in the query. 2024func (c *Conn) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) { 2025 dc, release, err := c.grabConn(ctx) 2026 if err != nil { 2027 return nil, err 2028 } 2029 return c.db.queryDC(ctx, nil, dc, release, query, args) 2030} 2031 2032// QueryRowContext executes a query that is expected to return at most one row. 2033// QueryRowContext always returns a non-nil value. Errors are deferred until 2034// the [*Row.Scan] method is called. 2035// If the query selects no rows, the [*Row.Scan] will return [ErrNoRows]. 2036// Otherwise, the [*Row.Scan] scans the first selected row and discards 2037// the rest. 2038func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...any) *Row { 2039 rows, err := c.QueryContext(ctx, query, args...) 2040 return &Row{rows: rows, err: err} 2041} 2042 2043// PrepareContext creates a prepared statement for later queries or executions. 2044// Multiple queries or executions may be run concurrently from the 2045// returned statement. 2046// The caller must call the statement's [*Stmt.Close] method 2047// when the statement is no longer needed. 2048// 2049// The provided context is used for the preparation of the statement, not for the 2050// execution of the statement. 2051func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) { 2052 dc, release, err := c.grabConn(ctx) 2053 if err != nil { 2054 return nil, err 2055 } 2056 return c.db.prepareDC(ctx, dc, release, c, query) 2057} 2058 2059// Raw executes f exposing the underlying driver connection for the 2060// duration of f. The driverConn must not be used outside of f. 2061// 2062// Once f returns and err is not [driver.ErrBadConn], the [Conn] will continue to be usable 2063// until [Conn.Close] is called. 2064func (c *Conn) Raw(f func(driverConn any) error) (err error) { 2065 var dc *driverConn 2066 var release releaseConn 2067 2068 // grabConn takes a context to implement stmtConnGrabber, but the context is not used. 2069 dc, release, err = c.grabConn(nil) 2070 if err != nil { 2071 return 2072 } 2073 fPanic := true 2074 dc.Mutex.Lock() 2075 defer func() { 2076 dc.Mutex.Unlock() 2077 2078 // If f panics fPanic will remain true. 2079 // Ensure an error is passed to release so the connection 2080 // may be discarded. 2081 if fPanic { 2082 err = driver.ErrBadConn 2083 } 2084 release(err) 2085 }() 2086 err = f(dc.ci) 2087 fPanic = false 2088 2089 return 2090} 2091 2092// BeginTx starts a transaction. 2093// 2094// The provided context is used until the transaction is committed or rolled back. 2095// If the context is canceled, the sql package will roll back 2096// the transaction. [Tx.Commit] will return an error if the context provided to 2097// BeginTx is canceled. 2098// 2099// The provided [TxOptions] is optional and may be nil if defaults should be used. 2100// If a non-default isolation level is used that the driver doesn't support, 2101// an error will be returned. 2102func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) { 2103 dc, release, err := c.grabConn(ctx) 2104 if err != nil { 2105 return nil, err 2106 } 2107 return c.db.beginDC(ctx, dc, release, opts) 2108} 2109 2110// closemuRUnlockCondReleaseConn read unlocks closemu 2111// as the sql operation is done with the dc. 2112func (c *Conn) closemuRUnlockCondReleaseConn(err error) { 2113 c.closemu.RUnlock() 2114 if errors.Is(err, driver.ErrBadConn) { 2115 c.close(err) 2116 } 2117} 2118 2119func (c *Conn) txCtx() context.Context { 2120 return nil 2121} 2122 2123func (c *Conn) close(err error) error { 2124 if !c.done.CompareAndSwap(false, true) { 2125 return ErrConnDone 2126 } 2127 2128 // Lock around releasing the driver connection 2129 // to ensure all queries have been stopped before doing so. 2130 c.closemu.Lock() 2131 defer c.closemu.Unlock() 2132 2133 c.dc.releaseConn(err) 2134 c.dc = nil 2135 c.db = nil 2136 return err 2137} 2138 2139// Close returns the connection to the connection pool. 2140// All operations after a Close will return with [ErrConnDone]. 2141// Close is safe to call concurrently with other operations and will 2142// block until all other operations finish. It may be useful to first 2143// cancel any used context and then call close directly after. 2144func (c *Conn) Close() error { 2145 return c.close(nil) 2146} 2147 2148// Tx is an in-progress database transaction. 2149// 2150// A transaction must end with a call to [Tx.Commit] or [Tx.Rollback]. 2151// 2152// After a call to [Tx.Commit] or [Tx.Rollback], all operations on the 2153// transaction fail with [ErrTxDone]. 2154// 2155// The statements prepared for a transaction by calling 2156// the transaction's [Tx.Prepare] or [Tx.Stmt] methods are closed 2157// by the call to [Tx.Commit] or [Tx.Rollback]. 2158type Tx struct { 2159 db *DB 2160 2161 // closemu prevents the transaction from closing while there 2162 // is an active query. It is held for read during queries 2163 // and exclusively during close. 2164 closemu sync.RWMutex 2165 2166 // dc is owned exclusively until Commit or Rollback, at which point 2167 // it's returned with putConn. 2168 dc *driverConn 2169 txi driver.Tx 2170 2171 // releaseConn is called once the Tx is closed to release 2172 // any held driverConn back to the pool. 2173 releaseConn func(error) 2174 2175 // done transitions from false to true exactly once, on Commit 2176 // or Rollback. once done, all operations fail with 2177 // ErrTxDone. 2178 done atomic.Bool 2179 2180 // keepConnOnRollback is true if the driver knows 2181 // how to reset the connection's session and if need be discard 2182 // the connection. 2183 keepConnOnRollback bool 2184 2185 // All Stmts prepared for this transaction. These will be closed after the 2186 // transaction has been committed or rolled back. 2187 stmts struct { 2188 sync.Mutex 2189 v []*Stmt 2190 } 2191 2192 // cancel is called after done transitions from 0 to 1. 2193 cancel func() 2194 2195 // ctx lives for the life of the transaction. 2196 ctx context.Context 2197} 2198 2199// awaitDone blocks until the context in Tx is canceled and rolls back 2200// the transaction if it's not already done. 2201func (tx *Tx) awaitDone() { 2202 // Wait for either the transaction to be committed or rolled 2203 // back, or for the associated context to be closed. 2204 <-tx.ctx.Done() 2205 2206 // Discard and close the connection used to ensure the 2207 // transaction is closed and the resources are released. This 2208 // rollback does nothing if the transaction has already been 2209 // committed or rolled back. 2210 // Do not discard the connection if the connection knows 2211 // how to reset the session. 2212 discardConnection := !tx.keepConnOnRollback 2213 tx.rollback(discardConnection) 2214} 2215 2216func (tx *Tx) isDone() bool { 2217 return tx.done.Load() 2218} 2219 2220// ErrTxDone is returned by any operation that is performed on a transaction 2221// that has already been committed or rolled back. 2222var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back") 2223 2224// close returns the connection to the pool and 2225// must only be called by Tx.rollback or Tx.Commit while 2226// tx is already canceled and won't be executed concurrently. 2227func (tx *Tx) close(err error) { 2228 tx.releaseConn(err) 2229 tx.dc = nil 2230 tx.txi = nil 2231} 2232 2233// hookTxGrabConn specifies an optional hook to be called on 2234// a successful call to (*Tx).grabConn. For tests. 2235var hookTxGrabConn func() 2236 2237func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) { 2238 select { 2239 default: 2240 case <-ctx.Done(): 2241 return nil, nil, ctx.Err() 2242 } 2243 2244 // closemu.RLock must come before the check for isDone to prevent the Tx from 2245 // closing while a query is executing. 2246 tx.closemu.RLock() 2247 if tx.isDone() { 2248 tx.closemu.RUnlock() 2249 return nil, nil, ErrTxDone 2250 } 2251 if hookTxGrabConn != nil { // test hook 2252 hookTxGrabConn() 2253 } 2254 return tx.dc, tx.closemuRUnlockRelease, nil 2255} 2256 2257func (tx *Tx) txCtx() context.Context { 2258 return tx.ctx 2259} 2260 2261// closemuRUnlockRelease is used as a func(error) method value in 2262// [DB.ExecContext] and [DB.QueryContext]. Unlocking in the releaseConn keeps 2263// the driver conn from being returned to the connection pool until 2264// the Rows has been closed. 2265func (tx *Tx) closemuRUnlockRelease(error) { 2266 tx.closemu.RUnlock() 2267} 2268 2269// Closes all Stmts prepared for this transaction. 2270func (tx *Tx) closePrepared() { 2271 tx.stmts.Lock() 2272 defer tx.stmts.Unlock() 2273 for _, stmt := range tx.stmts.v { 2274 stmt.Close() 2275 } 2276} 2277 2278// Commit commits the transaction. 2279func (tx *Tx) Commit() error { 2280 // Check context first to avoid transaction leak. 2281 // If put it behind tx.done CompareAndSwap statement, we can't ensure 2282 // the consistency between tx.done and the real COMMIT operation. 2283 select { 2284 default: 2285 case <-tx.ctx.Done(): 2286 if tx.done.Load() { 2287 return ErrTxDone 2288 } 2289 return tx.ctx.Err() 2290 } 2291 if !tx.done.CompareAndSwap(false, true) { 2292 return ErrTxDone 2293 } 2294 2295 // Cancel the Tx to release any active R-closemu locks. 2296 // This is safe to do because tx.done has already transitioned 2297 // from 0 to 1. Hold the W-closemu lock prior to rollback 2298 // to ensure no other connection has an active query. 2299 tx.cancel() 2300 tx.closemu.Lock() 2301 tx.closemu.Unlock() 2302 2303 var err error 2304 withLock(tx.dc, func() { 2305 err = tx.txi.Commit() 2306 }) 2307 if !errors.Is(err, driver.ErrBadConn) { 2308 tx.closePrepared() 2309 } 2310 tx.close(err) 2311 return err 2312} 2313 2314var rollbackHook func() 2315 2316// rollback aborts the transaction and optionally forces the pool to discard 2317// the connection. 2318func (tx *Tx) rollback(discardConn bool) error { 2319 if !tx.done.CompareAndSwap(false, true) { 2320 return ErrTxDone 2321 } 2322 2323 if rollbackHook != nil { 2324 rollbackHook() 2325 } 2326 2327 // Cancel the Tx to release any active R-closemu locks. 2328 // This is safe to do because tx.done has already transitioned 2329 // from 0 to 1. Hold the W-closemu lock prior to rollback 2330 // to ensure no other connection has an active query. 2331 tx.cancel() 2332 tx.closemu.Lock() 2333 tx.closemu.Unlock() 2334 2335 var err error 2336 withLock(tx.dc, func() { 2337 err = tx.txi.Rollback() 2338 }) 2339 if !errors.Is(err, driver.ErrBadConn) { 2340 tx.closePrepared() 2341 } 2342 if discardConn { 2343 err = driver.ErrBadConn 2344 } 2345 tx.close(err) 2346 return err 2347} 2348 2349// Rollback aborts the transaction. 2350func (tx *Tx) Rollback() error { 2351 return tx.rollback(false) 2352} 2353 2354// PrepareContext creates a prepared statement for use within a transaction. 2355// 2356// The returned statement operates within the transaction and will be closed 2357// when the transaction has been committed or rolled back. 2358// 2359// To use an existing prepared statement on this transaction, see [Tx.Stmt]. 2360// 2361// The provided context will be used for the preparation of the context, not 2362// for the execution of the returned statement. The returned statement 2363// will run in the transaction context. 2364func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) { 2365 dc, release, err := tx.grabConn(ctx) 2366 if err != nil { 2367 return nil, err 2368 } 2369 2370 stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query) 2371 if err != nil { 2372 return nil, err 2373 } 2374 tx.stmts.Lock() 2375 tx.stmts.v = append(tx.stmts.v, stmt) 2376 tx.stmts.Unlock() 2377 return stmt, nil 2378} 2379 2380// Prepare creates a prepared statement for use within a transaction. 2381// 2382// The returned statement operates within the transaction and will be closed 2383// when the transaction has been committed or rolled back. 2384// 2385// To use an existing prepared statement on this transaction, see [Tx.Stmt]. 2386// 2387// Prepare uses [context.Background] internally; to specify the context, use 2388// [Tx.PrepareContext]. 2389func (tx *Tx) Prepare(query string) (*Stmt, error) { 2390 return tx.PrepareContext(context.Background(), query) 2391} 2392 2393// StmtContext returns a transaction-specific prepared statement from 2394// an existing statement. 2395// 2396// Example: 2397// 2398// updateMoney, err := db.Prepare("UPDATE balance SET money=money+? WHERE id=?") 2399// ... 2400// tx, err := db.Begin() 2401// ... 2402// res, err := tx.StmtContext(ctx, updateMoney).Exec(123.45, 98293203) 2403// 2404// The provided context is used for the preparation of the statement, not for the 2405// execution of the statement. 2406// 2407// The returned statement operates within the transaction and will be closed 2408// when the transaction has been committed or rolled back. 2409func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt { 2410 dc, release, err := tx.grabConn(ctx) 2411 if err != nil { 2412 return &Stmt{stickyErr: err} 2413 } 2414 defer release(nil) 2415 2416 if tx.db != stmt.db { 2417 return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")} 2418 } 2419 var si driver.Stmt 2420 var parentStmt *Stmt 2421 stmt.mu.Lock() 2422 if stmt.closed || stmt.cg != nil { 2423 // If the statement has been closed or already belongs to a 2424 // transaction, we can't reuse it in this connection. 2425 // Since tx.StmtContext should never need to be called with a 2426 // Stmt already belonging to tx, we ignore this edge case and 2427 // re-prepare the statement in this case. No need to add 2428 // code-complexity for this. 2429 stmt.mu.Unlock() 2430 withLock(dc, func() { 2431 si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query) 2432 }) 2433 if err != nil { 2434 return &Stmt{stickyErr: err} 2435 } 2436 } else { 2437 stmt.removeClosedStmtLocked() 2438 // See if the statement has already been prepared on this connection, 2439 // and reuse it if possible. 2440 for _, v := range stmt.css { 2441 if v.dc == dc { 2442 si = v.ds.si 2443 break 2444 } 2445 } 2446 2447 stmt.mu.Unlock() 2448 2449 if si == nil { 2450 var ds *driverStmt 2451 withLock(dc, func() { 2452 ds, err = stmt.prepareOnConnLocked(ctx, dc) 2453 }) 2454 if err != nil { 2455 return &Stmt{stickyErr: err} 2456 } 2457 si = ds.si 2458 } 2459 parentStmt = stmt 2460 } 2461 2462 txs := &Stmt{ 2463 db: tx.db, 2464 cg: tx, 2465 cgds: &driverStmt{ 2466 Locker: dc, 2467 si: si, 2468 }, 2469 parentStmt: parentStmt, 2470 query: stmt.query, 2471 } 2472 if parentStmt != nil { 2473 tx.db.addDep(parentStmt, txs) 2474 } 2475 tx.stmts.Lock() 2476 tx.stmts.v = append(tx.stmts.v, txs) 2477 tx.stmts.Unlock() 2478 return txs 2479} 2480 2481// Stmt returns a transaction-specific prepared statement from 2482// an existing statement. 2483// 2484// Example: 2485// 2486// updateMoney, err := db.Prepare("UPDATE balance SET money=money+? WHERE id=?") 2487// ... 2488// tx, err := db.Begin() 2489// ... 2490// res, err := tx.Stmt(updateMoney).Exec(123.45, 98293203) 2491// 2492// The returned statement operates within the transaction and will be closed 2493// when the transaction has been committed or rolled back. 2494// 2495// Stmt uses [context.Background] internally; to specify the context, use 2496// [Tx.StmtContext]. 2497func (tx *Tx) Stmt(stmt *Stmt) *Stmt { 2498 return tx.StmtContext(context.Background(), stmt) 2499} 2500 2501// ExecContext executes a query that doesn't return rows. 2502// For example: an INSERT and UPDATE. 2503func (tx *Tx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { 2504 dc, release, err := tx.grabConn(ctx) 2505 if err != nil { 2506 return nil, err 2507 } 2508 return tx.db.execDC(ctx, dc, release, query, args) 2509} 2510 2511// Exec executes a query that doesn't return rows. 2512// For example: an INSERT and UPDATE. 2513// 2514// Exec uses [context.Background] internally; to specify the context, use 2515// [Tx.ExecContext]. 2516func (tx *Tx) Exec(query string, args ...any) (Result, error) { 2517 return tx.ExecContext(context.Background(), query, args...) 2518} 2519 2520// QueryContext executes a query that returns rows, typically a SELECT. 2521func (tx *Tx) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) { 2522 dc, release, err := tx.grabConn(ctx) 2523 if err != nil { 2524 return nil, err 2525 } 2526 2527 return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args) 2528} 2529 2530// Query executes a query that returns rows, typically a SELECT. 2531// 2532// Query uses [context.Background] internally; to specify the context, use 2533// [Tx.QueryContext]. 2534func (tx *Tx) Query(query string, args ...any) (*Rows, error) { 2535 return tx.QueryContext(context.Background(), query, args...) 2536} 2537 2538// QueryRowContext executes a query that is expected to return at most one row. 2539// QueryRowContext always returns a non-nil value. Errors are deferred until 2540// [Row]'s Scan method is called. 2541// If the query selects no rows, the [*Row.Scan] will return [ErrNoRows]. 2542// Otherwise, the [*Row.Scan] scans the first selected row and discards 2543// the rest. 2544func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...any) *Row { 2545 rows, err := tx.QueryContext(ctx, query, args...) 2546 return &Row{rows: rows, err: err} 2547} 2548 2549// QueryRow executes a query that is expected to return at most one row. 2550// QueryRow always returns a non-nil value. Errors are deferred until 2551// [Row]'s Scan method is called. 2552// If the query selects no rows, the [*Row.Scan] will return [ErrNoRows]. 2553// Otherwise, the [*Row.Scan] scans the first selected row and discards 2554// the rest. 2555// 2556// QueryRow uses [context.Background] internally; to specify the context, use 2557// [Tx.QueryRowContext]. 2558func (tx *Tx) QueryRow(query string, args ...any) *Row { 2559 return tx.QueryRowContext(context.Background(), query, args...) 2560} 2561 2562// connStmt is a prepared statement on a particular connection. 2563type connStmt struct { 2564 dc *driverConn 2565 ds *driverStmt 2566} 2567 2568// stmtConnGrabber represents a Tx or Conn that will return the underlying 2569// driverConn and release function. 2570type stmtConnGrabber interface { 2571 // grabConn returns the driverConn and the associated release function 2572 // that must be called when the operation completes. 2573 grabConn(context.Context) (*driverConn, releaseConn, error) 2574 2575 // txCtx returns the transaction context if available. 2576 // The returned context should be selected on along with 2577 // any query context when awaiting a cancel. 2578 txCtx() context.Context 2579} 2580 2581var ( 2582 _ stmtConnGrabber = &Tx{} 2583 _ stmtConnGrabber = &Conn{} 2584) 2585 2586// Stmt is a prepared statement. 2587// A Stmt is safe for concurrent use by multiple goroutines. 2588// 2589// If a Stmt is prepared on a [Tx] or [Conn], it will be bound to a single 2590// underlying connection forever. If the [Tx] or [Conn] closes, the Stmt will 2591// become unusable and all operations will return an error. 2592// If a Stmt is prepared on a [DB], it will remain usable for the lifetime of the 2593// [DB]. When the Stmt needs to execute on a new underlying connection, it will 2594// prepare itself on the new connection automatically. 2595type Stmt struct { 2596 // Immutable: 2597 db *DB // where we came from 2598 query string // that created the Stmt 2599 stickyErr error // if non-nil, this error is returned for all operations 2600 2601 closemu sync.RWMutex // held exclusively during close, for read otherwise. 2602 2603 // If Stmt is prepared on a Tx or Conn then cg is present and will 2604 // only ever grab a connection from cg. 2605 // If cg is nil then the Stmt must grab an arbitrary connection 2606 // from db and determine if it must prepare the stmt again by 2607 // inspecting css. 2608 cg stmtConnGrabber 2609 cgds *driverStmt 2610 2611 // parentStmt is set when a transaction-specific statement 2612 // is requested from an identical statement prepared on the same 2613 // conn. parentStmt is used to track the dependency of this statement 2614 // on its originating ("parent") statement so that parentStmt may 2615 // be closed by the user without them having to know whether or not 2616 // any transactions are still using it. 2617 parentStmt *Stmt 2618 2619 mu sync.Mutex // protects the rest of the fields 2620 closed bool 2621 2622 // css is a list of underlying driver statement interfaces 2623 // that are valid on particular connections. This is only 2624 // used if cg == nil and one is found that has idle 2625 // connections. If cg != nil, cgds is always used. 2626 css []connStmt 2627 2628 // lastNumClosed is copied from db.numClosed when Stmt is created 2629 // without tx and closed connections in css are removed. 2630 lastNumClosed uint64 2631} 2632 2633// ExecContext executes a prepared statement with the given arguments and 2634// returns a [Result] summarizing the effect of the statement. 2635func (s *Stmt) ExecContext(ctx context.Context, args ...any) (Result, error) { 2636 s.closemu.RLock() 2637 defer s.closemu.RUnlock() 2638 2639 var res Result 2640 err := s.db.retry(func(strategy connReuseStrategy) error { 2641 dc, releaseConn, ds, err := s.connStmt(ctx, strategy) 2642 if err != nil { 2643 return err 2644 } 2645 2646 res, err = resultFromStatement(ctx, dc.ci, ds, args...) 2647 releaseConn(err) 2648 return err 2649 }) 2650 2651 return res, err 2652} 2653 2654// Exec executes a prepared statement with the given arguments and 2655// returns a [Result] summarizing the effect of the statement. 2656// 2657// Exec uses [context.Background] internally; to specify the context, use 2658// [Stmt.ExecContext]. 2659func (s *Stmt) Exec(args ...any) (Result, error) { 2660 return s.ExecContext(context.Background(), args...) 2661} 2662 2663func resultFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...any) (Result, error) { 2664 ds.Lock() 2665 defer ds.Unlock() 2666 2667 dargs, err := driverArgsConnLocked(ci, ds, args) 2668 if err != nil { 2669 return nil, err 2670 } 2671 2672 resi, err := ctxDriverStmtExec(ctx, ds.si, dargs) 2673 if err != nil { 2674 return nil, err 2675 } 2676 return driverResult{ds.Locker, resi}, nil 2677} 2678 2679// removeClosedStmtLocked removes closed conns in s.css. 2680// 2681// To avoid lock contention on DB.mu, we do it only when 2682// s.db.numClosed - s.lastNum is large enough. 2683func (s *Stmt) removeClosedStmtLocked() { 2684 t := len(s.css)/2 + 1 2685 if t > 10 { 2686 t = 10 2687 } 2688 dbClosed := s.db.numClosed.Load() 2689 if dbClosed-s.lastNumClosed < uint64(t) { 2690 return 2691 } 2692 2693 s.db.mu.Lock() 2694 for i := 0; i < len(s.css); i++ { 2695 if s.css[i].dc.dbmuClosed { 2696 s.css[i] = s.css[len(s.css)-1] 2697 // Zero out the last element (for GC) before shrinking the slice. 2698 s.css[len(s.css)-1] = connStmt{} 2699 s.css = s.css[:len(s.css)-1] 2700 i-- 2701 } 2702 } 2703 s.db.mu.Unlock() 2704 s.lastNumClosed = dbClosed 2705} 2706 2707// connStmt returns a free driver connection on which to execute the 2708// statement, a function to call to release the connection, and a 2709// statement bound to that connection. 2710func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) { 2711 if err = s.stickyErr; err != nil { 2712 return 2713 } 2714 s.mu.Lock() 2715 if s.closed { 2716 s.mu.Unlock() 2717 err = errors.New("sql: statement is closed") 2718 return 2719 } 2720 2721 // In a transaction or connection, we always use the connection that the 2722 // stmt was created on. 2723 if s.cg != nil { 2724 s.mu.Unlock() 2725 dc, releaseConn, err = s.cg.grabConn(ctx) // blocks, waiting for the connection. 2726 if err != nil { 2727 return 2728 } 2729 return dc, releaseConn, s.cgds, nil 2730 } 2731 2732 s.removeClosedStmtLocked() 2733 s.mu.Unlock() 2734 2735 dc, err = s.db.conn(ctx, strategy) 2736 if err != nil { 2737 return nil, nil, nil, err 2738 } 2739 2740 s.mu.Lock() 2741 for _, v := range s.css { 2742 if v.dc == dc { 2743 s.mu.Unlock() 2744 return dc, dc.releaseConn, v.ds, nil 2745 } 2746 } 2747 s.mu.Unlock() 2748 2749 // No luck; we need to prepare the statement on this connection 2750 withLock(dc, func() { 2751 ds, err = s.prepareOnConnLocked(ctx, dc) 2752 }) 2753 if err != nil { 2754 dc.releaseConn(err) 2755 return nil, nil, nil, err 2756 } 2757 2758 return dc, dc.releaseConn, ds, nil 2759} 2760 2761// prepareOnConnLocked prepares the query in Stmt s on dc and adds it to the list of 2762// open connStmt on the statement. It assumes the caller is holding the lock on dc. 2763func (s *Stmt) prepareOnConnLocked(ctx context.Context, dc *driverConn) (*driverStmt, error) { 2764 si, err := dc.prepareLocked(ctx, s.cg, s.query) 2765 if err != nil { 2766 return nil, err 2767 } 2768 cs := connStmt{dc, si} 2769 s.mu.Lock() 2770 s.css = append(s.css, cs) 2771 s.mu.Unlock() 2772 return cs.ds, nil 2773} 2774 2775// QueryContext executes a prepared query statement with the given arguments 2776// and returns the query results as a [*Rows]. 2777func (s *Stmt) QueryContext(ctx context.Context, args ...any) (*Rows, error) { 2778 s.closemu.RLock() 2779 defer s.closemu.RUnlock() 2780 2781 var rowsi driver.Rows 2782 var rows *Rows 2783 2784 err := s.db.retry(func(strategy connReuseStrategy) error { 2785 dc, releaseConn, ds, err := s.connStmt(ctx, strategy) 2786 if err != nil { 2787 return err 2788 } 2789 2790 rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...) 2791 if err == nil { 2792 // Note: ownership of ci passes to the *Rows, to be freed 2793 // with releaseConn. 2794 rows = &Rows{ 2795 dc: dc, 2796 rowsi: rowsi, 2797 // releaseConn set below 2798 } 2799 // addDep must be added before initContextClose or it could attempt 2800 // to removeDep before it has been added. 2801 s.db.addDep(s, rows) 2802 2803 // releaseConn must be set before initContextClose or it could 2804 // release the connection before it is set. 2805 rows.releaseConn = func(err error) { 2806 releaseConn(err) 2807 s.db.removeDep(s, rows) 2808 } 2809 var txctx context.Context 2810 if s.cg != nil { 2811 txctx = s.cg.txCtx() 2812 } 2813 rows.initContextClose(ctx, txctx) 2814 return nil 2815 } 2816 2817 releaseConn(err) 2818 return err 2819 }) 2820 2821 return rows, err 2822} 2823 2824// Query executes a prepared query statement with the given arguments 2825// and returns the query results as a *Rows. 2826// 2827// Query uses [context.Background] internally; to specify the context, use 2828// [Stmt.QueryContext]. 2829func (s *Stmt) Query(args ...any) (*Rows, error) { 2830 return s.QueryContext(context.Background(), args...) 2831} 2832 2833func rowsiFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...any) (driver.Rows, error) { 2834 ds.Lock() 2835 defer ds.Unlock() 2836 dargs, err := driverArgsConnLocked(ci, ds, args) 2837 if err != nil { 2838 return nil, err 2839 } 2840 return ctxDriverStmtQuery(ctx, ds.si, dargs) 2841} 2842 2843// QueryRowContext executes a prepared query statement with the given arguments. 2844// If an error occurs during the execution of the statement, that error will 2845// be returned by a call to Scan on the returned [*Row], which is always non-nil. 2846// If the query selects no rows, the [*Row.Scan] will return [ErrNoRows]. 2847// Otherwise, the [*Row.Scan] scans the first selected row and discards 2848// the rest. 2849func (s *Stmt) QueryRowContext(ctx context.Context, args ...any) *Row { 2850 rows, err := s.QueryContext(ctx, args...) 2851 if err != nil { 2852 return &Row{err: err} 2853 } 2854 return &Row{rows: rows} 2855} 2856 2857// QueryRow executes a prepared query statement with the given arguments. 2858// If an error occurs during the execution of the statement, that error will 2859// be returned by a call to Scan on the returned [*Row], which is always non-nil. 2860// If the query selects no rows, the [*Row.Scan] will return [ErrNoRows]. 2861// Otherwise, the [*Row.Scan] scans the first selected row and discards 2862// the rest. 2863// 2864// Example usage: 2865// 2866// var name string 2867// err := nameByUseridStmt.QueryRow(id).Scan(&name) 2868// 2869// QueryRow uses [context.Background] internally; to specify the context, use 2870// [Stmt.QueryRowContext]. 2871func (s *Stmt) QueryRow(args ...any) *Row { 2872 return s.QueryRowContext(context.Background(), args...) 2873} 2874 2875// Close closes the statement. 2876func (s *Stmt) Close() error { 2877 s.closemu.Lock() 2878 defer s.closemu.Unlock() 2879 2880 if s.stickyErr != nil { 2881 return s.stickyErr 2882 } 2883 s.mu.Lock() 2884 if s.closed { 2885 s.mu.Unlock() 2886 return nil 2887 } 2888 s.closed = true 2889 txds := s.cgds 2890 s.cgds = nil 2891 2892 s.mu.Unlock() 2893 2894 if s.cg == nil { 2895 return s.db.removeDep(s, s) 2896 } 2897 2898 if s.parentStmt != nil { 2899 // If parentStmt is set, we must not close s.txds since it's stored 2900 // in the css array of the parentStmt. 2901 return s.db.removeDep(s.parentStmt, s) 2902 } 2903 return txds.Close() 2904} 2905 2906func (s *Stmt) finalClose() error { 2907 s.mu.Lock() 2908 defer s.mu.Unlock() 2909 if s.css != nil { 2910 for _, v := range s.css { 2911 s.db.noteUnusedDriverStatement(v.dc, v.ds) 2912 v.dc.removeOpenStmt(v.ds) 2913 } 2914 s.css = nil 2915 } 2916 return nil 2917} 2918 2919// Rows is the result of a query. Its cursor starts before the first row 2920// of the result set. Use [Rows.Next] to advance from row to row. 2921type Rows struct { 2922 dc *driverConn // owned; must call releaseConn when closed to release 2923 releaseConn func(error) 2924 rowsi driver.Rows 2925 cancel func() // called when Rows is closed, may be nil. 2926 closeStmt *driverStmt // if non-nil, statement to Close on close 2927 2928 contextDone atomic.Pointer[error] // error that awaitDone saw; set before close attempt 2929 2930 // closemu prevents Rows from closing while there 2931 // is an active streaming result. It is held for read during non-close operations 2932 // and exclusively during close. 2933 // 2934 // closemu guards lasterr and closed. 2935 closemu sync.RWMutex 2936 lasterr error // non-nil only if closed is true 2937 closed bool 2938 2939 // closemuScanHold is whether the previous call to Scan kept closemu RLock'ed 2940 // without unlocking it. It does that when the user passes a *RawBytes scan 2941 // target. In that case, we need to prevent awaitDone from closing the Rows 2942 // while the user's still using the memory. See go.dev/issue/60304. 2943 // 2944 // It is only used by Scan, Next, and NextResultSet which are expected 2945 // not to be called concurrently. 2946 closemuScanHold bool 2947 2948 // hitEOF is whether Next hit the end of the rows without 2949 // encountering an error. It's set in Next before 2950 // returning. It's only used by Next and Err which are 2951 // expected not to be called concurrently. 2952 hitEOF bool 2953 2954 // lastcols is only used in Scan, Next, and NextResultSet which are expected 2955 // not to be called concurrently. 2956 lastcols []driver.Value 2957 2958 // raw is a buffer for RawBytes that persists between Scan calls. 2959 // This is used when the driver returns a mismatched type that requires 2960 // a cloning allocation. For example, if the driver returns a *string and 2961 // the user is scanning into a *RawBytes, we need to copy the string. 2962 // The raw buffer here lets us reuse the memory for that copy across Scan calls. 2963 raw []byte 2964} 2965 2966// lasterrOrErrLocked returns either lasterr or the provided err. 2967// rs.closemu must be read-locked. 2968func (rs *Rows) lasterrOrErrLocked(err error) error { 2969 if rs.lasterr != nil && rs.lasterr != io.EOF { 2970 return rs.lasterr 2971 } 2972 return err 2973} 2974 2975// bypassRowsAwaitDone is only used for testing. 2976// If true, it will not close the Rows automatically from the context. 2977var bypassRowsAwaitDone = false 2978 2979func (rs *Rows) initContextClose(ctx, txctx context.Context) { 2980 if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) { 2981 return 2982 } 2983 if bypassRowsAwaitDone { 2984 return 2985 } 2986 closectx, cancel := context.WithCancel(ctx) 2987 rs.cancel = cancel 2988 go rs.awaitDone(ctx, txctx, closectx) 2989} 2990 2991// awaitDone blocks until ctx, txctx, or closectx is canceled. 2992// The ctx is provided from the query context. 2993// If the query was issued in a transaction, the transaction's context 2994// is also provided in txctx, to ensure Rows is closed if the Tx is closed. 2995// The closectx is closed by an explicit call to rs.Close. 2996func (rs *Rows) awaitDone(ctx, txctx, closectx context.Context) { 2997 var txctxDone <-chan struct{} 2998 if txctx != nil { 2999 txctxDone = txctx.Done() 3000 } 3001 select { 3002 case <-ctx.Done(): 3003 err := ctx.Err() 3004 rs.contextDone.Store(&err) 3005 case <-txctxDone: 3006 err := txctx.Err() 3007 rs.contextDone.Store(&err) 3008 case <-closectx.Done(): 3009 // rs.cancel was called via Close(); don't store this into contextDone 3010 // to ensure Err() is unaffected. 3011 } 3012 rs.close(ctx.Err()) 3013} 3014 3015// Next prepares the next result row for reading with the [Rows.Scan] method. It 3016// returns true on success, or false if there is no next result row or an error 3017// happened while preparing it. [Rows.Err] should be consulted to distinguish between 3018// the two cases. 3019// 3020// Every call to [Rows.Scan], even the first one, must be preceded by a call to [Rows.Next]. 3021func (rs *Rows) Next() bool { 3022 // If the user's calling Next, they're done with their previous row's Scan 3023 // results (any RawBytes memory), so we can release the read lock that would 3024 // be preventing awaitDone from calling close. 3025 rs.closemuRUnlockIfHeldByScan() 3026 3027 if rs.contextDone.Load() != nil { 3028 return false 3029 } 3030 3031 var doClose, ok bool 3032 withLock(rs.closemu.RLocker(), func() { 3033 doClose, ok = rs.nextLocked() 3034 }) 3035 if doClose { 3036 rs.Close() 3037 } 3038 if doClose && !ok { 3039 rs.hitEOF = true 3040 } 3041 return ok 3042} 3043 3044func (rs *Rows) nextLocked() (doClose, ok bool) { 3045 if rs.closed { 3046 return false, false 3047 } 3048 3049 // Lock the driver connection before calling the driver interface 3050 // rowsi to prevent a Tx from rolling back the connection at the same time. 3051 rs.dc.Lock() 3052 defer rs.dc.Unlock() 3053 3054 if rs.lastcols == nil { 3055 rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns())) 3056 } 3057 3058 rs.lasterr = rs.rowsi.Next(rs.lastcols) 3059 if rs.lasterr != nil { 3060 // Close the connection if there is a driver error. 3061 if rs.lasterr != io.EOF { 3062 return true, false 3063 } 3064 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet) 3065 if !ok { 3066 return true, false 3067 } 3068 // The driver is at the end of the current result set. 3069 // Test to see if there is another result set after the current one. 3070 // Only close Rows if there is no further result sets to read. 3071 if !nextResultSet.HasNextResultSet() { 3072 doClose = true 3073 } 3074 return doClose, false 3075 } 3076 return false, true 3077} 3078 3079// NextResultSet prepares the next result set for reading. It reports whether 3080// there is further result sets, or false if there is no further result set 3081// or if there is an error advancing to it. The [Rows.Err] method should be consulted 3082// to distinguish between the two cases. 3083// 3084// After calling NextResultSet, the [Rows.Next] method should always be called before 3085// scanning. If there are further result sets they may not have rows in the result 3086// set. 3087func (rs *Rows) NextResultSet() bool { 3088 // If the user's calling NextResultSet, they're done with their previous 3089 // row's Scan results (any RawBytes memory), so we can release the read lock 3090 // that would be preventing awaitDone from calling close. 3091 rs.closemuRUnlockIfHeldByScan() 3092 3093 var doClose bool 3094 defer func() { 3095 if doClose { 3096 rs.Close() 3097 } 3098 }() 3099 rs.closemu.RLock() 3100 defer rs.closemu.RUnlock() 3101 3102 if rs.closed { 3103 return false 3104 } 3105 3106 rs.lastcols = nil 3107 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet) 3108 if !ok { 3109 doClose = true 3110 return false 3111 } 3112 3113 // Lock the driver connection before calling the driver interface 3114 // rowsi to prevent a Tx from rolling back the connection at the same time. 3115 rs.dc.Lock() 3116 defer rs.dc.Unlock() 3117 3118 rs.lasterr = nextResultSet.NextResultSet() 3119 if rs.lasterr != nil { 3120 doClose = true 3121 return false 3122 } 3123 return true 3124} 3125 3126// Err returns the error, if any, that was encountered during iteration. 3127// Err may be called after an explicit or implicit [Rows.Close]. 3128func (rs *Rows) Err() error { 3129 // Return any context error that might've happened during row iteration, 3130 // but only if we haven't reported the final Next() = false after rows 3131 // are done, in which case the user might've canceled their own context 3132 // before calling Rows.Err. 3133 if !rs.hitEOF { 3134 if errp := rs.contextDone.Load(); errp != nil { 3135 return *errp 3136 } 3137 } 3138 3139 rs.closemu.RLock() 3140 defer rs.closemu.RUnlock() 3141 return rs.lasterrOrErrLocked(nil) 3142} 3143 3144// rawbuf returns the buffer to append RawBytes values to. 3145// This buffer is reused across calls to Rows.Scan. 3146// 3147// Usage: 3148// 3149// rawBytes = rows.setrawbuf(append(rows.rawbuf(), value...)) 3150func (rs *Rows) rawbuf() []byte { 3151 if rs == nil { 3152 // convertAssignRows can take a nil *Rows; for simplicity handle it here 3153 return nil 3154 } 3155 return rs.raw 3156} 3157 3158// setrawbuf updates the RawBytes buffer with the result of appending a new value to it. 3159// It returns the new value. 3160func (rs *Rows) setrawbuf(b []byte) RawBytes { 3161 if rs == nil { 3162 // convertAssignRows can take a nil *Rows; for simplicity handle it here 3163 return RawBytes(b) 3164 } 3165 off := len(rs.raw) 3166 rs.raw = b 3167 return RawBytes(rs.raw[off:]) 3168} 3169 3170var errRowsClosed = errors.New("sql: Rows are closed") 3171var errNoRows = errors.New("sql: no Rows available") 3172 3173// Columns returns the column names. 3174// Columns returns an error if the rows are closed. 3175func (rs *Rows) Columns() ([]string, error) { 3176 rs.closemu.RLock() 3177 defer rs.closemu.RUnlock() 3178 if rs.closed { 3179 return nil, rs.lasterrOrErrLocked(errRowsClosed) 3180 } 3181 if rs.rowsi == nil { 3182 return nil, rs.lasterrOrErrLocked(errNoRows) 3183 } 3184 rs.dc.Lock() 3185 defer rs.dc.Unlock() 3186 3187 return rs.rowsi.Columns(), nil 3188} 3189 3190// ColumnTypes returns column information such as column type, length, 3191// and nullable. Some information may not be available from some drivers. 3192func (rs *Rows) ColumnTypes() ([]*ColumnType, error) { 3193 rs.closemu.RLock() 3194 defer rs.closemu.RUnlock() 3195 if rs.closed { 3196 return nil, rs.lasterrOrErrLocked(errRowsClosed) 3197 } 3198 if rs.rowsi == nil { 3199 return nil, rs.lasterrOrErrLocked(errNoRows) 3200 } 3201 rs.dc.Lock() 3202 defer rs.dc.Unlock() 3203 3204 return rowsColumnInfoSetupConnLocked(rs.rowsi), nil 3205} 3206 3207// ColumnType contains the name and type of a column. 3208type ColumnType struct { 3209 name string 3210 3211 hasNullable bool 3212 hasLength bool 3213 hasPrecisionScale bool 3214 3215 nullable bool 3216 length int64 3217 databaseType string 3218 precision int64 3219 scale int64 3220 scanType reflect.Type 3221} 3222 3223// Name returns the name or alias of the column. 3224func (ci *ColumnType) Name() string { 3225 return ci.name 3226} 3227 3228// Length returns the column type length for variable length column types such 3229// as text and binary field types. If the type length is unbounded the value will 3230// be [math.MaxInt64] (any database limits will still apply). 3231// If the column type is not variable length, such as an int, or if not supported 3232// by the driver ok is false. 3233func (ci *ColumnType) Length() (length int64, ok bool) { 3234 return ci.length, ci.hasLength 3235} 3236 3237// DecimalSize returns the scale and precision of a decimal type. 3238// If not applicable or if not supported ok is false. 3239func (ci *ColumnType) DecimalSize() (precision, scale int64, ok bool) { 3240 return ci.precision, ci.scale, ci.hasPrecisionScale 3241} 3242 3243// ScanType returns a Go type suitable for scanning into using [Rows.Scan]. 3244// If a driver does not support this property ScanType will return 3245// the type of an empty interface. 3246func (ci *ColumnType) ScanType() reflect.Type { 3247 return ci.scanType 3248} 3249 3250// Nullable reports whether the column may be null. 3251// If a driver does not support this property ok will be false. 3252func (ci *ColumnType) Nullable() (nullable, ok bool) { 3253 return ci.nullable, ci.hasNullable 3254} 3255 3256// DatabaseTypeName returns the database system name of the column type. If an empty 3257// string is returned, then the driver type name is not supported. 3258// Consult your driver documentation for a list of driver data types. [ColumnType.Length] specifiers 3259// are not included. 3260// Common type names include "VARCHAR", "TEXT", "NVARCHAR", "DECIMAL", "BOOL", 3261// "INT", and "BIGINT". 3262func (ci *ColumnType) DatabaseTypeName() string { 3263 return ci.databaseType 3264} 3265 3266func rowsColumnInfoSetupConnLocked(rowsi driver.Rows) []*ColumnType { 3267 names := rowsi.Columns() 3268 3269 list := make([]*ColumnType, len(names)) 3270 for i := range list { 3271 ci := &ColumnType{ 3272 name: names[i], 3273 } 3274 list[i] = ci 3275 3276 if prop, ok := rowsi.(driver.RowsColumnTypeScanType); ok { 3277 ci.scanType = prop.ColumnTypeScanType(i) 3278 } else { 3279 ci.scanType = reflect.TypeFor[any]() 3280 } 3281 if prop, ok := rowsi.(driver.RowsColumnTypeDatabaseTypeName); ok { 3282 ci.databaseType = prop.ColumnTypeDatabaseTypeName(i) 3283 } 3284 if prop, ok := rowsi.(driver.RowsColumnTypeLength); ok { 3285 ci.length, ci.hasLength = prop.ColumnTypeLength(i) 3286 } 3287 if prop, ok := rowsi.(driver.RowsColumnTypeNullable); ok { 3288 ci.nullable, ci.hasNullable = prop.ColumnTypeNullable(i) 3289 } 3290 if prop, ok := rowsi.(driver.RowsColumnTypePrecisionScale); ok { 3291 ci.precision, ci.scale, ci.hasPrecisionScale = prop.ColumnTypePrecisionScale(i) 3292 } 3293 } 3294 return list 3295} 3296 3297// Scan copies the columns in the current row into the values pointed 3298// at by dest. The number of values in dest must be the same as the 3299// number of columns in [Rows]. 3300// 3301// Scan converts columns read from the database into the following 3302// common Go types and special types provided by the sql package: 3303// 3304// *string 3305// *[]byte 3306// *int, *int8, *int16, *int32, *int64 3307// *uint, *uint8, *uint16, *uint32, *uint64 3308// *bool 3309// *float32, *float64 3310// *interface{} 3311// *RawBytes 3312// *Rows (cursor value) 3313// any type implementing Scanner (see Scanner docs) 3314// 3315// In the most simple case, if the type of the value from the source 3316// column is an integer, bool or string type T and dest is of type *T, 3317// Scan simply assigns the value through the pointer. 3318// 3319// Scan also converts between string and numeric types, as long as no 3320// information would be lost. While Scan stringifies all numbers 3321// scanned from numeric database columns into *string, scans into 3322// numeric types are checked for overflow. For example, a float64 with 3323// value 300 or a string with value "300" can scan into a uint16, but 3324// not into a uint8, though float64(255) or "255" can scan into a 3325// uint8. One exception is that scans of some float64 numbers to 3326// strings may lose information when stringifying. In general, scan 3327// floating point columns into *float64. 3328// 3329// If a dest argument has type *[]byte, Scan saves in that argument a 3330// copy of the corresponding data. The copy is owned by the caller and 3331// can be modified and held indefinitely. The copy can be avoided by 3332// using an argument of type [*RawBytes] instead; see the documentation 3333// for [RawBytes] for restrictions on its use. 3334// 3335// If an argument has type *interface{}, Scan copies the value 3336// provided by the underlying driver without conversion. When scanning 3337// from a source value of type []byte to *interface{}, a copy of the 3338// slice is made and the caller owns the result. 3339// 3340// Source values of type [time.Time] may be scanned into values of type 3341// *time.Time, *interface{}, *string, or *[]byte. When converting to 3342// the latter two, [time.RFC3339Nano] is used. 3343// 3344// Source values of type bool may be scanned into types *bool, 3345// *interface{}, *string, *[]byte, or [*RawBytes]. 3346// 3347// For scanning into *bool, the source may be true, false, 1, 0, or 3348// string inputs parseable by [strconv.ParseBool]. 3349// 3350// Scan can also convert a cursor returned from a query, such as 3351// "select cursor(select * from my_table) from dual", into a 3352// [*Rows] value that can itself be scanned from. The parent 3353// select query will close any cursor [*Rows] if the parent [*Rows] is closed. 3354// 3355// If any of the first arguments implementing [Scanner] returns an error, 3356// that error will be wrapped in the returned error. 3357func (rs *Rows) Scan(dest ...any) error { 3358 if rs.closemuScanHold { 3359 // This should only be possible if the user calls Scan twice in a row 3360 // without calling Next. 3361 return fmt.Errorf("sql: Scan called without calling Next (closemuScanHold)") 3362 } 3363 rs.closemu.RLock() 3364 3365 if rs.lasterr != nil && rs.lasterr != io.EOF { 3366 rs.closemu.RUnlock() 3367 return rs.lasterr 3368 } 3369 if rs.closed { 3370 err := rs.lasterrOrErrLocked(errRowsClosed) 3371 rs.closemu.RUnlock() 3372 return err 3373 } 3374 3375 if scanArgsContainRawBytes(dest) { 3376 rs.closemuScanHold = true 3377 rs.raw = rs.raw[:0] 3378 } else { 3379 rs.closemu.RUnlock() 3380 } 3381 3382 if rs.lastcols == nil { 3383 rs.closemuRUnlockIfHeldByScan() 3384 return errors.New("sql: Scan called without calling Next") 3385 } 3386 if len(dest) != len(rs.lastcols) { 3387 rs.closemuRUnlockIfHeldByScan() 3388 return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest)) 3389 } 3390 3391 for i, sv := range rs.lastcols { 3392 err := convertAssignRows(dest[i], sv, rs) 3393 if err != nil { 3394 rs.closemuRUnlockIfHeldByScan() 3395 return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err) 3396 } 3397 } 3398 return nil 3399} 3400 3401// closemuRUnlockIfHeldByScan releases any closemu.RLock held open by a previous 3402// call to Scan with *RawBytes. 3403func (rs *Rows) closemuRUnlockIfHeldByScan() { 3404 if rs.closemuScanHold { 3405 rs.closemuScanHold = false 3406 rs.closemu.RUnlock() 3407 } 3408} 3409 3410func scanArgsContainRawBytes(args []any) bool { 3411 for _, a := range args { 3412 if _, ok := a.(*RawBytes); ok { 3413 return true 3414 } 3415 } 3416 return false 3417} 3418 3419// rowsCloseHook returns a function so tests may install the 3420// hook through a test only mutex. 3421var rowsCloseHook = func() func(*Rows, *error) { return nil } 3422 3423// Close closes the [Rows], preventing further enumeration. If [Rows.Next] is called 3424// and returns false and there are no further result sets, 3425// the [Rows] are closed automatically and it will suffice to check the 3426// result of [Rows.Err]. Close is idempotent and does not affect the result of [Rows.Err]. 3427func (rs *Rows) Close() error { 3428 // If the user's calling Close, they're done with their previous row's Scan 3429 // results (any RawBytes memory), so we can release the read lock that would 3430 // be preventing awaitDone from calling the unexported close before we do so. 3431 rs.closemuRUnlockIfHeldByScan() 3432 3433 return rs.close(nil) 3434} 3435 3436func (rs *Rows) close(err error) error { 3437 rs.closemu.Lock() 3438 defer rs.closemu.Unlock() 3439 3440 if rs.closed { 3441 return nil 3442 } 3443 rs.closed = true 3444 3445 if rs.lasterr == nil { 3446 rs.lasterr = err 3447 } 3448 3449 withLock(rs.dc, func() { 3450 err = rs.rowsi.Close() 3451 }) 3452 if fn := rowsCloseHook(); fn != nil { 3453 fn(rs, &err) 3454 } 3455 if rs.cancel != nil { 3456 rs.cancel() 3457 } 3458 3459 if rs.closeStmt != nil { 3460 rs.closeStmt.Close() 3461 } 3462 rs.releaseConn(err) 3463 3464 rs.lasterr = rs.lasterrOrErrLocked(err) 3465 return err 3466} 3467 3468// Row is the result of calling [DB.QueryRow] to select a single row. 3469type Row struct { 3470 // One of these two will be non-nil: 3471 err error // deferred error for easy chaining 3472 rows *Rows 3473} 3474 3475// Scan copies the columns from the matched row into the values 3476// pointed at by dest. See the documentation on [Rows.Scan] for details. 3477// If more than one row matches the query, 3478// Scan uses the first row and discards the rest. If no row matches 3479// the query, Scan returns [ErrNoRows]. 3480func (r *Row) Scan(dest ...any) error { 3481 if r.err != nil { 3482 return r.err 3483 } 3484 3485 // TODO(bradfitz): for now we need to defensively clone all 3486 // []byte that the driver returned (not permitting 3487 // *RawBytes in Rows.Scan), since we're about to close 3488 // the Rows in our defer, when we return from this function. 3489 // the contract with the driver.Next(...) interface is that it 3490 // can return slices into read-only temporary memory that's 3491 // only valid until the next Scan/Close. But the TODO is that 3492 // for a lot of drivers, this copy will be unnecessary. We 3493 // should provide an optional interface for drivers to 3494 // implement to say, "don't worry, the []bytes that I return 3495 // from Next will not be modified again." (for instance, if 3496 // they were obtained from the network anyway) But for now we 3497 // don't care. 3498 defer r.rows.Close() 3499 if scanArgsContainRawBytes(dest) { 3500 return errors.New("sql: RawBytes isn't allowed on Row.Scan") 3501 } 3502 3503 if !r.rows.Next() { 3504 if err := r.rows.Err(); err != nil { 3505 return err 3506 } 3507 return ErrNoRows 3508 } 3509 err := r.rows.Scan(dest...) 3510 if err != nil { 3511 return err 3512 } 3513 // Make sure the query can be processed to completion with no errors. 3514 return r.rows.Close() 3515} 3516 3517// Err provides a way for wrapping packages to check for 3518// query errors without calling [Row.Scan]. 3519// Err returns the error, if any, that was encountered while running the query. 3520// If this error is not nil, this error will also be returned from [Row.Scan]. 3521func (r *Row) Err() error { 3522 return r.err 3523} 3524 3525// A Result summarizes an executed SQL command. 3526type Result interface { 3527 // LastInsertId returns the integer generated by the database 3528 // in response to a command. Typically this will be from an 3529 // "auto increment" column when inserting a new row. Not all 3530 // databases support this feature, and the syntax of such 3531 // statements varies. 3532 LastInsertId() (int64, error) 3533 3534 // RowsAffected returns the number of rows affected by an 3535 // update, insert, or delete. Not every database or database 3536 // driver may support this. 3537 RowsAffected() (int64, error) 3538} 3539 3540type driverResult struct { 3541 sync.Locker // the *driverConn 3542 resi driver.Result 3543} 3544 3545func (dr driverResult) LastInsertId() (int64, error) { 3546 dr.Lock() 3547 defer dr.Unlock() 3548 return dr.resi.LastInsertId() 3549} 3550 3551func (dr driverResult) RowsAffected() (int64, error) { 3552 dr.Lock() 3553 defer dr.Unlock() 3554 return dr.resi.RowsAffected() 3555} 3556 3557func stack() string { 3558 var buf [2 << 10]byte 3559 return string(buf[:runtime.Stack(buf[:], false)]) 3560} 3561 3562// withLock runs while holding lk. 3563func withLock(lk sync.Locker, fn func()) { 3564 lk.Lock() 3565 defer lk.Unlock() // in case fn panics 3566 fn() 3567} 3568 3569// connRequestSet is a set of chan connRequest that's 3570// optimized for: 3571// 3572// - adding an element 3573// - removing an element (only by the caller who added it) 3574// - taking (get + delete) a random element 3575// 3576// We previously used a map for this but the take of a random element 3577// was expensive, making mapiters. This type avoids a map entirely 3578// and just uses a slice. 3579type connRequestSet struct { 3580 // s are the elements in the set. 3581 s []connRequestAndIndex 3582} 3583 3584type connRequestAndIndex struct { 3585 // req is the element in the set. 3586 req chan connRequest 3587 3588 // curIdx points to the current location of this element in 3589 // connRequestSet.s. It gets set to -1 upon removal. 3590 curIdx *int 3591} 3592 3593// CloseAndRemoveAll closes all channels in the set 3594// and clears the set. 3595func (s *connRequestSet) CloseAndRemoveAll() { 3596 for _, v := range s.s { 3597 *v.curIdx = -1 3598 close(v.req) 3599 } 3600 s.s = nil 3601} 3602 3603// Len returns the length of the set. 3604func (s *connRequestSet) Len() int { return len(s.s) } 3605 3606// connRequestDelHandle is an opaque handle to delete an 3607// item from calling Add. 3608type connRequestDelHandle struct { 3609 idx *int // pointer to index; or -1 if not in slice 3610} 3611 3612// Add adds v to the set of waiting requests. 3613// The returned connRequestDelHandle can be used to remove the item from 3614// the set. 3615func (s *connRequestSet) Add(v chan connRequest) connRequestDelHandle { 3616 idx := len(s.s) 3617 // TODO(bradfitz): for simplicity, this always allocates a new int-sized 3618 // allocation to store the index. But generally the set will be small and 3619 // under a scannable-threshold. As an optimization, we could permit the *int 3620 // to be nil when the set is small and should be scanned. This works even if 3621 // the set grows over the threshold with delete handles outstanding because 3622 // an element can only move to a lower index. So if it starts with a nil 3623 // position, it'll always be in a low index and thus scannable. But that 3624 // can be done in a follow-up change. 3625 idxPtr := &idx 3626 s.s = append(s.s, connRequestAndIndex{v, idxPtr}) 3627 return connRequestDelHandle{idxPtr} 3628} 3629 3630// Delete removes an element from the set. 3631// 3632// It reports whether the element was deleted. (It can return false if a caller 3633// of TakeRandom took it meanwhile, or upon the second call to Delete) 3634func (s *connRequestSet) Delete(h connRequestDelHandle) bool { 3635 idx := *h.idx 3636 if idx < 0 { 3637 return false 3638 } 3639 s.deleteIndex(idx) 3640 return true 3641} 3642 3643func (s *connRequestSet) deleteIndex(idx int) { 3644 // Mark item as deleted. 3645 *(s.s[idx].curIdx) = -1 3646 // Copy last element, updating its position 3647 // to its new home. 3648 if idx < len(s.s)-1 { 3649 last := s.s[len(s.s)-1] 3650 *last.curIdx = idx 3651 s.s[idx] = last 3652 } 3653 // Zero out last element (for GC) before shrinking the slice. 3654 s.s[len(s.s)-1] = connRequestAndIndex{} 3655 s.s = s.s[:len(s.s)-1] 3656} 3657 3658// TakeRandom returns and removes a random element from s 3659// and reports whether there was one to take. (It returns ok=false 3660// if the set is empty.) 3661func (s *connRequestSet) TakeRandom() (v chan connRequest, ok bool) { 3662 if len(s.s) == 0 { 3663 return nil, false 3664 } 3665 pick := rand.IntN(len(s.s)) 3666 e := s.s[pick] 3667 s.deleteIndex(pick) 3668 return e.req, true 3669} 3670