1// Copyright 2010 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package textproto
6
7import (
8	"sync"
9)
10
11// A Pipeline manages a pipelined in-order request/response sequence.
12//
13// To use a Pipeline p to manage multiple clients on a connection,
14// each client should run:
15//
16//	id := p.Next()	// take a number
17//
18//	p.StartRequest(id)	// wait for turn to send request
19//	«send request»
20//	p.EndRequest(id)	// notify Pipeline that request is sent
21//
22//	p.StartResponse(id)	// wait for turn to read response
23//	«read response»
24//	p.EndResponse(id)	// notify Pipeline that response is read
25//
26// A pipelined server can use the same calls to ensure that
27// responses computed in parallel are written in the correct order.
28type Pipeline struct {
29	mu       sync.Mutex
30	id       uint
31	request  sequencer
32	response sequencer
33}
34
35// Next returns the next id for a request/response pair.
36func (p *Pipeline) Next() uint {
37	p.mu.Lock()
38	id := p.id
39	p.id++
40	p.mu.Unlock()
41	return id
42}
43
44// StartRequest blocks until it is time to send (or, if this is a server, receive)
45// the request with the given id.
46func (p *Pipeline) StartRequest(id uint) {
47	p.request.Start(id)
48}
49
50// EndRequest notifies p that the request with the given id has been sent
51// (or, if this is a server, received).
52func (p *Pipeline) EndRequest(id uint) {
53	p.request.End(id)
54}
55
56// StartResponse blocks until it is time to receive (or, if this is a server, send)
57// the request with the given id.
58func (p *Pipeline) StartResponse(id uint) {
59	p.response.Start(id)
60}
61
62// EndResponse notifies p that the response with the given id has been received
63// (or, if this is a server, sent).
64func (p *Pipeline) EndResponse(id uint) {
65	p.response.End(id)
66}
67
// A sequencer schedules a sequence of numbered events that must
// happen in order, one after the other. The event numbering must start
// at 0 and increment without skipping. The event number wraps around
// safely as long as there are not 2^32 simultaneous events pending.
//
// The zero value is ready to use.
type sequencer struct {
	mu   sync.Mutex             // guards id and wait
	id   uint                   // number of the event whose turn it currently is
	wait map[uint]chan struct{} // parked Start calls by event number; created lazily by Start
}

// Start waits until it is time for the event numbered id to begin.
// That is, except for the first event, it waits until End(id-1) has
// been called.
//
// If id is already the current event, Start returns immediately.
// Otherwise it registers a channel under id and blocks until End
// closes that channel.
func (s *sequencer) Start(id uint) {
	s.mu.Lock()
	if s.id == id {
		// Already this event's turn: nothing to wait for.
		s.mu.Unlock()
		return
	}
	c := make(chan struct{})
	if s.wait == nil {
		// Writing to a nil map panics, so allocate on first use.
		s.wait = make(map[uint]chan struct{})
	}
	s.wait[id] = c
	s.mu.Unlock()
	// Block outside the lock so End can acquire it and release us.
	<-c
}

// End notifies the sequencer that the event numbered id has completed,
// allowing it to schedule the event numbered id+1.  It is a run-time error
// to call End with an id that is not the number of the active event;
// in that case End releases its lock and panics with "out of sync".
func (s *sequencer) End(id uint) {
	s.mu.Lock()
	if s.id != id {
		s.mu.Unlock()
		panic("out of sync")
	}
	id++
	s.id = id
	// No allocation is needed here: indexing a nil map simply reports
	// ok == false, and delete is only reached when an entry exists.
	c, ok := s.wait[id]
	if ok {
		delete(s.wait, id)
	}
	s.mu.Unlock()
	if ok {
		// Wake the Start call parked for the next event, after the
		// lock has been released.
		close(c)
	}
}
119