// Copyright 2016 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14*333d2b36SAndroid Build Coastguard Worker 15*333d2b36SAndroid Build Coastguard Workerpackage zip 16*333d2b36SAndroid Build Coastguard Worker 17*333d2b36SAndroid Build Coastguard Workerimport ( 18*333d2b36SAndroid Build Coastguard Worker "fmt" 19*333d2b36SAndroid Build Coastguard Worker "runtime" 20*333d2b36SAndroid Build Coastguard Worker) 21*333d2b36SAndroid Build Coastguard Worker 22*333d2b36SAndroid Build Coastguard Workertype RateLimit struct { 23*333d2b36SAndroid Build Coastguard Worker requests chan request 24*333d2b36SAndroid Build Coastguard Worker completions chan int64 25*333d2b36SAndroid Build Coastguard Worker 26*333d2b36SAndroid Build Coastguard Worker stop chan struct{} 27*333d2b36SAndroid Build Coastguard Worker} 28*333d2b36SAndroid Build Coastguard Worker 29*333d2b36SAndroid Build Coastguard Workertype request struct { 30*333d2b36SAndroid Build Coastguard Worker size int64 31*333d2b36SAndroid Build Coastguard Worker serviced chan struct{} 32*333d2b36SAndroid Build Coastguard Worker} 33*333d2b36SAndroid Build Coastguard Worker 34*333d2b36SAndroid Build Coastguard Worker// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once, 35*333d2b36SAndroid Build Coastguard Worker// except when no capacity is in use, in which case the first caller is always permitted 36*333d2b36SAndroid Build Coastguard Workerfunc NewRateLimit(capacity int64) *RateLimit { 37*333d2b36SAndroid Build Coastguard Worker ret := &RateLimit{ 38*333d2b36SAndroid Build Coastguard Worker requests: make(chan request), 39*333d2b36SAndroid Build Coastguard Worker completions: make(chan int64), 40*333d2b36SAndroid Build Coastguard Worker 41*333d2b36SAndroid Build Coastguard Worker stop: make(chan struct{}), 42*333d2b36SAndroid Build Coastguard Worker } 43*333d2b36SAndroid Build Coastguard Worker 44*333d2b36SAndroid Build Coastguard Worker go ret.monitorChannels(capacity) 45*333d2b36SAndroid Build Coastguard Worker 46*333d2b36SAndroid Build Coastguard 
Worker return ret 47*333d2b36SAndroid Build Coastguard Worker} 48*333d2b36SAndroid Build Coastguard Worker 49*333d2b36SAndroid Build Coastguard Worker// RequestExecution blocks until another execution of size <size> can be allowed to run. 50*333d2b36SAndroid Build Coastguard Workerfunc (r *RateLimit) Request(size int64) { 51*333d2b36SAndroid Build Coastguard Worker request := request{ 52*333d2b36SAndroid Build Coastguard Worker size: size, 53*333d2b36SAndroid Build Coastguard Worker serviced: make(chan struct{}, 1), 54*333d2b36SAndroid Build Coastguard Worker } 55*333d2b36SAndroid Build Coastguard Worker 56*333d2b36SAndroid Build Coastguard Worker // wait for the request to be received 57*333d2b36SAndroid Build Coastguard Worker r.requests <- request 58*333d2b36SAndroid Build Coastguard Worker 59*333d2b36SAndroid Build Coastguard Worker // wait for the request to be accepted 60*333d2b36SAndroid Build Coastguard Worker <-request.serviced 61*333d2b36SAndroid Build Coastguard Worker} 62*333d2b36SAndroid Build Coastguard Worker 63*333d2b36SAndroid Build Coastguard Worker// Finish declares the completion of an execution of size <size> 64*333d2b36SAndroid Build Coastguard Workerfunc (r *RateLimit) Finish(size int64) { 65*333d2b36SAndroid Build Coastguard Worker r.completions <- size 66*333d2b36SAndroid Build Coastguard Worker} 67*333d2b36SAndroid Build Coastguard Worker 68*333d2b36SAndroid Build Coastguard Worker// Stop the background goroutine 69*333d2b36SAndroid Build Coastguard Workerfunc (r *RateLimit) Stop() { 70*333d2b36SAndroid Build Coastguard Worker close(r.stop) 71*333d2b36SAndroid Build Coastguard Worker} 72*333d2b36SAndroid Build Coastguard Worker 73*333d2b36SAndroid Build Coastguard Worker// monitorChannels processes incoming requests from channels 74*333d2b36SAndroid Build Coastguard Workerfunc (r *RateLimit) monitorChannels(capacity int64) { 75*333d2b36SAndroid Build Coastguard Worker var usedCapacity int64 76*333d2b36SAndroid Build Coastguard Worker var 
currentRequest *request 77*333d2b36SAndroid Build Coastguard Worker 78*333d2b36SAndroid Build Coastguard Worker for { 79*333d2b36SAndroid Build Coastguard Worker var requests chan request 80*333d2b36SAndroid Build Coastguard Worker if currentRequest == nil { 81*333d2b36SAndroid Build Coastguard Worker // If we don't already have a queued request, then we should check for a new request 82*333d2b36SAndroid Build Coastguard Worker requests = r.requests 83*333d2b36SAndroid Build Coastguard Worker } 84*333d2b36SAndroid Build Coastguard Worker 85*333d2b36SAndroid Build Coastguard Worker select { 86*333d2b36SAndroid Build Coastguard Worker case newRequest := <-requests: 87*333d2b36SAndroid Build Coastguard Worker currentRequest = &newRequest 88*333d2b36SAndroid Build Coastguard Worker case amountCompleted := <-r.completions: 89*333d2b36SAndroid Build Coastguard Worker usedCapacity -= amountCompleted 90*333d2b36SAndroid Build Coastguard Worker 91*333d2b36SAndroid Build Coastguard Worker if usedCapacity < 0 { 92*333d2b36SAndroid Build Coastguard Worker panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted)) 93*333d2b36SAndroid Build Coastguard Worker } 94*333d2b36SAndroid Build Coastguard Worker case <-r.stop: 95*333d2b36SAndroid Build Coastguard Worker return 96*333d2b36SAndroid Build Coastguard Worker } 97*333d2b36SAndroid Build Coastguard Worker 98*333d2b36SAndroid Build Coastguard Worker if currentRequest != nil { 99*333d2b36SAndroid Build Coastguard Worker accepted := false 100*333d2b36SAndroid Build Coastguard Worker if usedCapacity == 0 { 101*333d2b36SAndroid Build Coastguard Worker accepted = true 102*333d2b36SAndroid Build Coastguard Worker } else { 103*333d2b36SAndroid Build Coastguard Worker if capacity >= usedCapacity+currentRequest.size { 104*333d2b36SAndroid Build Coastguard Worker accepted = true 105*333d2b36SAndroid Build Coastguard Worker } 106*333d2b36SAndroid Build Coastguard Worker } 107*333d2b36SAndroid Build 
Coastguard Worker if accepted { 108*333d2b36SAndroid Build Coastguard Worker usedCapacity += currentRequest.size 109*333d2b36SAndroid Build Coastguard Worker currentRequest.serviced <- struct{}{} 110*333d2b36SAndroid Build Coastguard Worker currentRequest = nil 111*333d2b36SAndroid Build Coastguard Worker } 112*333d2b36SAndroid Build Coastguard Worker } 113*333d2b36SAndroid Build Coastguard Worker } 114*333d2b36SAndroid Build Coastguard Worker} 115*333d2b36SAndroid Build Coastguard Worker 116*333d2b36SAndroid Build Coastguard Worker// A CPURateLimiter limits the number of active calls based on CPU requirements 117*333d2b36SAndroid Build Coastguard Workertype CPURateLimiter struct { 118*333d2b36SAndroid Build Coastguard Worker impl *RateLimit 119*333d2b36SAndroid Build Coastguard Worker} 120*333d2b36SAndroid Build Coastguard Worker 121*333d2b36SAndroid Build Coastguard Workerfunc NewCPURateLimiter(capacity int64) *CPURateLimiter { 122*333d2b36SAndroid Build Coastguard Worker if capacity <= 0 { 123*333d2b36SAndroid Build Coastguard Worker capacity = int64(runtime.NumCPU()) 124*333d2b36SAndroid Build Coastguard Worker } 125*333d2b36SAndroid Build Coastguard Worker impl := NewRateLimit(capacity) 126*333d2b36SAndroid Build Coastguard Worker return &CPURateLimiter{impl: impl} 127*333d2b36SAndroid Build Coastguard Worker} 128*333d2b36SAndroid Build Coastguard Worker 129*333d2b36SAndroid Build Coastguard Workerfunc (e CPURateLimiter) Request() { 130*333d2b36SAndroid Build Coastguard Worker e.impl.Request(1) 131*333d2b36SAndroid Build Coastguard Worker} 132*333d2b36SAndroid Build Coastguard Worker 133*333d2b36SAndroid Build Coastguard Workerfunc (e CPURateLimiter) Finish() { 134*333d2b36SAndroid Build Coastguard Worker e.impl.Finish(1) 135*333d2b36SAndroid Build Coastguard Worker} 136*333d2b36SAndroid Build Coastguard Worker 137*333d2b36SAndroid Build Coastguard Workerfunc (e CPURateLimiter) Stop() { 138*333d2b36SAndroid Build Coastguard Worker e.impl.Stop() 
139*333d2b36SAndroid Build Coastguard Worker} 140*333d2b36SAndroid Build Coastguard Worker 141*333d2b36SAndroid Build Coastguard Worker// A MemoryRateLimiter limits the number of active calls based on Memory requirements 142*333d2b36SAndroid Build Coastguard Workertype MemoryRateLimiter struct { 143*333d2b36SAndroid Build Coastguard Worker *RateLimit 144*333d2b36SAndroid Build Coastguard Worker} 145*333d2b36SAndroid Build Coastguard Worker 146*333d2b36SAndroid Build Coastguard Workerfunc NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter { 147*333d2b36SAndroid Build Coastguard Worker if capacity <= 0 { 148*333d2b36SAndroid Build Coastguard Worker capacity = 512 * 1024 * 1024 // 512MB 149*333d2b36SAndroid Build Coastguard Worker } 150*333d2b36SAndroid Build Coastguard Worker impl := NewRateLimit(capacity) 151*333d2b36SAndroid Build Coastguard Worker return &MemoryRateLimiter{RateLimit: impl} 152*333d2b36SAndroid Build Coastguard Worker} 153