1// Copyright 2021 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package poll_test
6
7import (
8	"internal/poll"
9	"runtime"
10	"sync"
11	"sync/atomic"
12	"testing"
13	"time"
14)
15
16var closeHook atomic.Value // func(fd int)
17
18func init() {
19	closeFunc := poll.CloseFunc
20	poll.CloseFunc = func(fd int) (err error) {
21		if v := closeHook.Load(); v != nil {
22			if hook := v.(func(int)); hook != nil {
23				hook(fd)
24			}
25		}
26		return closeFunc(fd)
27	}
28}
29
30func TestSplicePipePool(t *testing.T) {
31	const N = 64
32	var (
33		p          *poll.SplicePipe
34		ps         []*poll.SplicePipe
35		allFDs     []int
36		pendingFDs sync.Map // fd → struct{}{}
37		err        error
38	)
39
40	closeHook.Store(func(fd int) { pendingFDs.Delete(fd) })
41	t.Cleanup(func() { closeHook.Store((func(int))(nil)) })
42
43	for i := 0; i < N; i++ {
44		p, err = poll.GetPipe()
45		if err != nil {
46			t.Skipf("failed to create pipe due to error(%v), skip this test", err)
47		}
48		_, pwfd := poll.GetPipeFds(p)
49		allFDs = append(allFDs, pwfd)
50		pendingFDs.Store(pwfd, struct{}{})
51		ps = append(ps, p)
52	}
53	for _, p = range ps {
54		poll.PutPipe(p)
55	}
56	ps = nil
57	p = nil
58
59	// Exploit the timeout of "go test" as a timer for the subsequent verification.
60	timeout := 5 * time.Minute
61	if deadline, ok := t.Deadline(); ok {
62		timeout = deadline.Sub(time.Now())
63		timeout -= timeout / 10 // Leave 10% headroom for cleanup.
64	}
65	expiredTime := time.NewTimer(timeout)
66	defer expiredTime.Stop()
67
68	// Trigger garbage collection repeatedly, waiting for all pipes in sync.Pool
69	// to either be deallocated and closed, or to time out.
70	for {
71		runtime.GC()
72		time.Sleep(10 * time.Millisecond)
73
74		// Detect whether all pipes are closed properly.
75		var leakedFDs []int
76		pendingFDs.Range(func(k, v any) bool {
77			leakedFDs = append(leakedFDs, k.(int))
78			return true
79		})
80		if len(leakedFDs) == 0 {
81			break
82		}
83
84		select {
85		case <-expiredTime.C:
86			t.Logf("all descriptors: %v", allFDs)
87			t.Fatalf("leaked descriptors: %v", leakedFDs)
88		default:
89		}
90	}
91}
92
93func BenchmarkSplicePipe(b *testing.B) {
94	b.Run("SplicePipeWithPool", func(b *testing.B) {
95		for i := 0; i < b.N; i++ {
96			p, err := poll.GetPipe()
97			if err != nil {
98				continue
99			}
100			poll.PutPipe(p)
101		}
102	})
103	b.Run("SplicePipeWithoutPool", func(b *testing.B) {
104		for i := 0; i < b.N; i++ {
105			p := poll.NewPipe()
106			if p == nil {
107				b.Skip("newPipe returned nil")
108			}
109			poll.DestroyPipe(p)
110		}
111	})
112}
113
114func BenchmarkSplicePipePoolParallel(b *testing.B) {
115	b.RunParallel(func(pb *testing.PB) {
116		for pb.Next() {
117			p, err := poll.GetPipe()
118			if err != nil {
119				continue
120			}
121			poll.PutPipe(p)
122		}
123	})
124}
125
126func BenchmarkSplicePipeNativeParallel(b *testing.B) {
127	b.RunParallel(func(pb *testing.PB) {
128		for pb.Next() {
129			p := poll.NewPipe()
130			if p == nil {
131				b.Skip("newPipe returned nil")
132			}
133			poll.DestroyPipe(p)
134		}
135	})
136}
137