// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build darwin || dragonfly || freebsd || netbsd || openbsd

package runtime

// Integrated network poller (kqueue-based implementation).

import (
	"internal/goarch"
	"internal/runtime/atomic"
	"unsafe"
)

var (
	// kq is the kqueue descriptor used by the poller. It stays at -1 until
	// netpollinit assigns it; netpoll treats -1 as "nothing to poll".
	kq int32 = -1

	// netpollWakeSig is 1 while a wakeup requested by netpollBreak is
	// in flight and 0 otherwise.
	netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
)

// netpollinit creates the kqueue used by the poller, marks it
// close-on-exec, and passes it to addWakeupEvent.
// It throws if kqueue returns a negative value.
func netpollinit() {
	kq = kqueue()
	if kq < 0 {
		// The negated return value is printed; presumably it is the errno.
		println("runtime: kqueue failed with", -kq)
		throw("runtime: netpollinit failed")
	}
	closeonexec(kq)
	// addWakeupEvent is defined elsewhere; presumably it registers the
	// event that netpollBreak triggers and isWakeup recognizes.
	addWakeupEvent(kq)
}

// netpollopen registers fd with the kqueue for both read and write
// readiness and associates the resulting events with pd.
// It returns 0 on success. If kevent reports a failure (a negative
// return value), the negated value is returned instead.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	// Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
	// for the whole fd lifetime. The notifications are automatically unregistered
	// when fd is closed.
	var ev [2]keventt
	// The descriptor is written into ident through a uintptr-typed pointer.
	// NOTE(review): presumably because the declared type of ident differs
	// between platforms; confirm against the keventt definitions.
	*(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
	ev[0].filter = _EVFILT_READ
	ev[0].flags = _EV_ADD | _EV_CLEAR
	ev[0].fflags = 0
	ev[0].data = 0

	if goarch.PtrSize == 4 {
		// We only have a pointer-sized field to store into,
		// so on a 32-bit system we get no sequence protection.
		// TODO(iant): If we notice any problems we could at least
		// steal the low-order 2 bits for a tiny sequence number.
		ev[0].udata = (*byte)(unsafe.Pointer(pd))
	} else {
		// Pack pd together with its current fdseq value so that netpoll
		// can compare the tag against pd.fdseq when the event fires.
		tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
		ev[0].udata = (*byte)(unsafe.Pointer(uintptr(tp)))
	}
	// The write registration is a copy of the read one (same ident, flags
	// and udata) with only the filter changed.
	ev[1] = ev[0]
	ev[1].filter = _EVFILT_WRITE
	// Submit both changes in a single kevent call; no events are requested back.
	n := kevent(kq, &ev[0], 2, nil, 0, nil)
	if n < 0 {
		return -n
	}
	return 0
}

// netpollclose is a no-op for kqueue and always returns 0.
func netpollclose(fd uintptr) int32 {
	// Don't need to unregister because calling close()
	// on fd will remove any kevents that reference the descriptor.
	return 0
}

// netpollarm is not used by the kqueue poller; calling it throws.
func netpollarm(pd *pollDesc, mode int) {
	throw("runtime: unused")
}

// netpollBreak interrupts a kevent.
// Only the caller that flips netpollWakeSig from 0 to 1 calls wakeNetpoll;
// concurrent callers return without doing anything.
func netpollBreak() {
	// Failing to cas indicates there is an in-flight wakeup, so we're done here.
	if !netpollWakeSig.CompareAndSwap(0, 1) {
		return
	}

	wakeNetpoll(kq)
}

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
//
// The second result is the sum of the values returned by netpollready
// for the events handled in this call. If the kqueue has not been
// created yet (kq == -1), netpoll returns an empty list and 0.
func netpoll(delay int64) (gList, int32) {
	if kq == -1 {
		return gList{}, 0
	}
	// Translate delay into the timeout argument for kevent: nil for an
	// indefinite wait, a zero timespec for a non-blocking poll, or the
	// requested duration capped at 1e6 seconds.
	var tp *timespec
	var ts timespec
	if delay < 0 {
		tp = nil
	} else if delay == 0 {
		tp = &ts
	} else {
		ts.setNsec(delay)
		if ts.tv_sec > 1e6 {
			// Darwin returns EINVAL if the sleep time is too long.
			ts.tv_sec = 1e6
		}
		tp = &ts
	}
	// At most len(events) events are fetched per call.
	var events [64]keventt
retry:
	n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
	if n < 0 {
		// Ignore the ETIMEDOUT error for now, but try to dive deep and
		// figure out what really happened with n == ETIMEDOUT,
		// see https://go.dev/issue/59679 for details.
		if n != -_EINTR && n != -_ETIMEDOUT {
			println("runtime: kevent on fd", kq, "failed with", -n)
			throw("runtime: netpoll failed")
		}
		// If a timed sleep was interrupted, just return to
		// recalculate how long we should sleep now.
		if delay > 0 {
			return gList{}, 0
		}
		// Blocking (delay < 0) and non-blocking (delay == 0) polls
		// simply repeat the kevent call.
		goto retry
	}
	var toRun gList
	delta := int32(0)
	for i := 0; i < int(n); i++ {
		ev := &events[i]

		// Wakeup events carry no pollDesc and are never passed to netpollready.
		if isWakeup(ev) {
			if delay != 0 {
				// netpollBreak could be picked up by a nonblocking poll.
				// Only call drainWakeupEvent and reset the netpollWakeSig if blocking.
				drainWakeupEvent(kq)
				netpollWakeSig.Store(0)
			}
			continue
		}

		// mode ends up as 'r', 'w', or 'r'+'w'; it stays 0 for any
		// other filter, in which case the event is ignored.
		var mode int32
		switch ev.filter {
		case _EVFILT_READ:
			mode += 'r'

			// On some systems when the read end of a pipe
			// is closed the write end will not get a
			// _EVFILT_WRITE event, but will get a
			// _EVFILT_READ event with EV_EOF set.
			// Note that setting 'w' here just means that we
			// will wake up a goroutine waiting to write;
			// that goroutine will try the write again,
			// and the appropriate thing will happen based
			// on what that write returns (success, EPIPE, EAGAIN).
			if ev.flags&_EV_EOF != 0 {
				mode += 'w'
			}
		case _EVFILT_WRITE:
			mode += 'w'
		}
		if mode != 0 {
			// Recover the pollDesc (and, on 64-bit systems, the tag)
			// that netpollopen stored in udata.
			var pd *pollDesc
			var tag uintptr
			if goarch.PtrSize == 4 {
				// No sequence protection on 32-bit systems.
				// See netpollopen for details.
				pd = (*pollDesc)(unsafe.Pointer(ev.udata))
				tag = 0
			} else {
				tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))
				pd = (*pollDesc)(tp.pointer())
				tag = tp.tag()
				// Skip the event if pd.fdseq no longer matches the value
				// packed at registration time.
				// NOTE(review): presumably this means pd has since been
				// reused for a different descriptor; confirm in pollDesc.
				if pd.fdseq.Load() != tag {
					continue
				}
			}
			// The error flag passed on is true only when flags is exactly
			// _EV_ERROR (an equality test, not a bit test).
			pd.setEventErr(ev.flags == _EV_ERROR, tag)
			delta += netpollready(&toRun, pd, mode)
		}
	}
	return toRun, delta
}