// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package textproto

import (
	"sync"
)

// A Pipeline manages a pipelined in-order request/response sequence.
//
// To use a Pipeline p to manage multiple clients on a connection,
// each client should run:
//
//	id := p.Next()	// take a number
//
//	p.StartRequest(id)	// wait for turn to send request
//	«send request»
//	p.EndRequest(id)	// notify Pipeline that request is sent
//
//	p.StartResponse(id)	// wait for turn to read response
//	«read response»
//	p.EndResponse(id)	// notify Pipeline that response is read
//
// A pipelined server can use the same calls to ensure that
// responses computed in parallel are written in the correct order.
type Pipeline struct {
	mu       sync.Mutex // guards id
	id       uint       // next id to be handed out by Next
	request  sequencer  // orders the request phase
	response sequencer  // orders the response phase
}

// Next returns the next id for a request/response pair.
// Ids start at 0 and increase by one per call.
func (p *Pipeline) Next() uint {
	p.mu.Lock()
	defer p.mu.Unlock()
	id := p.id
	p.id++
	return id
}

// StartRequest blocks until it is time to send (or, if this is a server, receive)
// the request with the given id.
func (p *Pipeline) StartRequest(id uint) {
	p.request.Start(id)
}

// EndRequest notifies p that the request with the given id has been sent
// (or, if this is a server, received).
// It panics if id is not the request currently in progress.
func (p *Pipeline) EndRequest(id uint) {
	p.request.End(id)
}

// StartResponse blocks until it is time to receive (or, if this is a server, send)
// the response with the given id.
func (p *Pipeline) StartResponse(id uint) {
	p.response.Start(id)
}

// EndResponse notifies p that the response with the given id has been received
// (or, if this is a server, sent).
// It panics if id is not the response currently in progress.
func (p *Pipeline) EndResponse(id uint) {
	p.response.End(id)
}

// A sequencer schedules a sequence of numbered events that must
// happen in order, one after the other. The event numbering must start
// at 0 and increment without skipping. The event number wraps around
// safely as long as the number of simultaneously pending events stays
// below the range of uint.
//
// The zero value is ready to use.
type sequencer struct {
	mu   sync.Mutex             // guards id and wait
	id   uint                   // number of the event allowed to run now
	wait map[uint]chan struct{} // blocked Start calls, keyed by event number
}

// Start waits until it is time for the event numbered id to begin.
// That is, except for the first event, it waits until End(id-1) has
// been called.
func (s *sequencer) Start(id uint) {
	s.mu.Lock()
	if s.id == id {
		// Already our turn; nothing to wait for.
		s.mu.Unlock()
		return
	}
	c := make(chan struct{})
	if s.wait == nil {
		// Writing to a nil map panics, so allocate lazily on first waiter.
		s.wait = make(map[uint]chan struct{})
	}
	s.wait[id] = c
	s.mu.Unlock()
	// Block outside the lock until End(id-1) closes the channel.
	<-c
}

// End notifies the sequencer that the event numbered id has completed,
// allowing it to schedule the event numbered id+1. It is a run-time error
// to call End with an id that is not the number of the active event.
func (s *sequencer) End(id uint) {
	s.mu.Lock()
	if s.id != id {
		// Release the lock before panicking so a recovering caller
		// does not leave the sequencer locked.
		s.mu.Unlock()
		panic("out of sync")
	}
	id++
	s.id = id
	// Indexing and deleting are both well-defined on a nil map, so there
	// is no need to allocate s.wait here when no Start call is waiting.
	c, ok := s.wait[id]
	if ok {
		delete(s.wait, id)
	}
	s.mu.Unlock()
	if ok {
		// Wake the Start call that was blocked on the next event.
		close(c)
	}
}