1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package zip 16 17import ( 18 "fmt" 19 "runtime" 20) 21 22type RateLimit struct { 23 requests chan request 24 completions chan int64 25 26 stop chan struct{} 27} 28 29type request struct { 30 size int64 31 serviced chan struct{} 32} 33 34// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once, 35// except when no capacity is in use, in which case the first caller is always permitted 36func NewRateLimit(capacity int64) *RateLimit { 37 ret := &RateLimit{ 38 requests: make(chan request), 39 completions: make(chan int64), 40 41 stop: make(chan struct{}), 42 } 43 44 go ret.monitorChannels(capacity) 45 46 return ret 47} 48 49// RequestExecution blocks until another execution of size <size> can be allowed to run. 50func (r *RateLimit) Request(size int64) { 51 request := request{ 52 size: size, 53 serviced: make(chan struct{}, 1), 54 } 55 56 // wait for the request to be received 57 r.requests <- request 58 59 // wait for the request to be accepted 60 <-request.serviced 61} 62 63// Finish declares the completion of an execution of size <size> 64func (r *RateLimit) Finish(size int64) { 65 r.completions <- size 66} 67 68// Stop the background goroutine 69func (r *RateLimit) Stop() { 70 close(r.stop) 71} 72 73// monitorChannels processes incoming requests from channels 74func (r *RateLimit) monitorChannels(capacity int64) { 75 var usedCapacity int64 76 var currentRequest *request 77 78 for { 79 var requests chan request 80 if currentRequest == nil { 81 // If we don't already have a queued request, then we should check for a new request 82 requests = r.requests 83 } 84 85 select { 86 case newRequest := <-requests: 87 currentRequest = &newRequest 88 case amountCompleted := <-r.completions: 89 usedCapacity -= amountCompleted 90 91 if usedCapacity < 0 { 92 panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted)) 93 } 94 case <-r.stop: 95 return 96 } 97 98 if currentRequest != nil { 99 accepted := false 100 if usedCapacity == 0 { 101 accepted = true 102 } else { 103 if capacity >= usedCapacity+currentRequest.size { 104 accepted = true 105 } 106 } 107 if accepted { 108 usedCapacity += currentRequest.size 109 currentRequest.serviced <- struct{}{} 110 currentRequest = nil 111 } 112 } 113 } 114} 115 116// A CPURateLimiter limits the number of active calls based on CPU requirements 117type CPURateLimiter struct { 118 impl *RateLimit 119} 120 121func NewCPURateLimiter(capacity int64) *CPURateLimiter { 122 if capacity <= 0 { 123 capacity = int64(runtime.NumCPU()) 124 } 125 impl := NewRateLimit(capacity) 126 return &CPURateLimiter{impl: impl} 127} 128 129func (e CPURateLimiter) Request() { 130 e.impl.Request(1) 131} 132 133func (e CPURateLimiter) Finish() { 134 e.impl.Finish(1) 135} 136 137func (e CPURateLimiter) Stop() { 138 e.impl.Stop() 139} 140 141// A MemoryRateLimiter limits the number of active calls based on Memory requirements 142type MemoryRateLimiter struct { 143 *RateLimit 144} 145 146func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter { 147 if capacity <= 0 { 148 capacity = 512 * 1024 * 1024 // 512MB 149 } 150 impl := NewRateLimit(capacity) 151 return &MemoryRateLimiter{RateLimit: impl} 152} 153