1// Copyright 2019 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package sync
6
7import (
8	"sync/atomic"
9	"unsafe"
10)
11
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals);
	// since len(vals) is a power of 2, the reduction is done by
	// masking the index with len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail atomic.Uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}
46
// eface is the two-word slot type stored in poolDequeue.vals.
//
// The queue code reinterprets a *eface as a *any through
// unsafe.Pointer to read and write whole values, and it also accesses
// typ and val individually as raw pointers. The field order and sizes
// here must therefore match the in-memory layout of an interface
// value.
type eface struct {
	typ, val unsafe.Pointer
}
50
// Sizing constants for poolDequeue.
const (
	// dequeueBits is the width, in bits, of each of the two indexes
	// (head and tail) that are packed into a single 64-bit word.
	dequeueBits = 32

	// dequeueLimit is the maximum size of a poolDequeue.
	//
	// This must be at most (1<<dequeueBits)/2 because detecting
	// fullness depends on wrapping around the ring buffer without
	// wrapping around the index. It is set to a quarter of the index
	// range, (1<<dequeueBits)/4, so that it also fits in an int on
	// 32-bit.
	dequeueLimit = 1 << (dequeueBits - 2)
)
59
// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
//
// pushHead stores dequeueNil(nil) in place of a nil value, and popHead
// and popTail translate it back to nil before returning.
type dequeueNil *struct{}
64
65func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
66	const mask = 1<<dequeueBits - 1
67	head = uint32((ptrs >> dequeueBits) & mask)
68	tail = uint32(ptrs & mask)
69	return
70}
71
72func (d *poolDequeue) pack(head, tail uint32) uint64 {
73	const mask = 1<<dequeueBits - 1
74	return (uint64(head) << dequeueBits) |
75		uint64(tail&mask)
76}
77
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
//
// A nil val is stored as the dequeueNil sentinel so that the slot
// does not look empty. pushHead also reports the queue as full
// (returns false) when the target slot has been claimed by popTail
// but not yet released by it.
func (d *poolDequeue) pushHead(val any) bool {
	ptrs := d.headTail.Load()
	head, tail := d.unpack(ptrs)
	// head and tail are free-running indexes, so the queue is full
	// when head is exactly len(d.vals) ahead of tail, modulo
	// 1<<dequeueBits.
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	// Reduce head to a position in the ring buffer by masking with
	// len(d.vals)-1.
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		// A nil interface would be indistinguishable from an
		// empty slot, so store the sentinel instead.
		val = dequeueNil(nil)
	}
	// Write val into the slot by treating the slot as an any.
	*(*any)(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	d.headTail.Add(1 << dequeueBits)
	return true
}
108
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
//
// A value that was pushed as nil (and stored as the dequeueNil
// sentinel) is returned as nil with ok set to true.
func (d *poolDequeue) popHead() (any, bool) {
	var slot *eface
	for {
		ptrs := d.headTail.Load()
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		//
		// head is the index of the next slot to fill, so the
		// most recently pushed element lives at head-1.
		head--
		ptrs2 := d.pack(head, tail)
		// The CAS fails if headTail changed since the Load above
		// (for example because popTail advanced tail); in that
		// case loop and retry with fresh indexes.
		if d.headTail.CompareAndSwap(ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	// Read the slot as an any and map the sentinel back to nil.
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}
143
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
//
// A value that was pushed as nil (and stored as the dequeueNil
// sentinel) is returned as nil with ok set to true.
func (d *poolDequeue) popTail() (any, bool) {
	var slot *eface
	for {
		ptrs := d.headTail.Load()
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		//
		// If the CAS fails, headTail changed since the Load
		// above, so loop and retry with fresh indexes.
		ptrs2 := d.pack(head, tail+1)
		if d.headTail.CompareAndSwap(ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	// Read the slot as an any and map the sentinel back to nil.
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}
186
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one (up to
// dequeueLimit). Once a dequeue fills up, this allocates a new one
// and only ever pushes to the latest dequeue. Pops happen from the
// other end of the list and once a dequeue is exhausted, it gets
// removed from the list.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	//
	// It is nil until the first call to pushHead.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	//
	// It is nil until the first call to pushHead.
	tail atomic.Pointer[poolChainElt]
}
203
// poolChainElt is one element of a poolChain: a poolDequeue together
// with the links to its neighbors in the chain.
type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain. next points toward the head of the chain (the
	// dequeue allocated after this one) and prev points toward the
	// tail (the dequeue allocated before this one).
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev atomic.Pointer[poolChainElt]
}
219
220func (c *poolChain) pushHead(val any) {
221	d := c.head
222	if d == nil {
223		// Initialize the chain.
224		const initSize = 8 // Must be a power of 2
225		d = new(poolChainElt)
226		d.vals = make([]eface, initSize)
227		c.head = d
228		c.tail.Store(d)
229	}
230
231	if d.pushHead(val) {
232		return
233	}
234
235	// The current dequeue is full. Allocate a new one of twice
236	// the size.
237	newSize := len(d.vals) * 2
238	if newSize >= dequeueLimit {
239		// Can't make it any bigger.
240		newSize = dequeueLimit
241	}
242
243	d2 := &poolChainElt{}
244	d2.prev.Store(d)
245	d2.vals = make([]eface, newSize)
246	c.head = d2
247	d.next.Store(d2)
248	d2.pushHead(val)
249}
250
251func (c *poolChain) popHead() (any, bool) {
252	d := c.head
253	for d != nil {
254		if val, ok := d.popHead(); ok {
255			return val, ok
256		}
257		// There may still be unconsumed elements in the
258		// previous dequeue, so try backing up.
259		d = d.prev.Load()
260	}
261	return nil, false
262}
263
// popTail removes and returns the element at the tail of the chain.
// It returns false if the chain has not been initialized, or if the
// tail dequeue is empty and has no successor.
//
// When the tail dequeue is empty and a successor was already linked
// before the pop attempt, popTail tries to unlink the drained dequeue
// from the chain and continues with the successor.
func (c *poolChain) popTail() (any, bool) {
	d := c.tail.Load()
	if d == nil {
		// No pushHead has initialized the chain yet.
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := d.next.Load()

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		//
		// The CAS only succeeds for the one caller that still
		// sees d as the tail; everyone else just advances to d2.
		if c.tail.CompareAndSwap(d, d2) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			d2.prev.Store(nil)
		}
		d = d2
	}
}
303