1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package zip
16
17import (
18	"fmt"
19	"runtime"
20)
21
22type RateLimit struct {
23	requests    chan request
24	completions chan int64
25
26	stop chan struct{}
27}
28
29type request struct {
30	size     int64
31	serviced chan struct{}
32}
33
34// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
35// except when no capacity is in use, in which case the first caller is always permitted
36func NewRateLimit(capacity int64) *RateLimit {
37	ret := &RateLimit{
38		requests:    make(chan request),
39		completions: make(chan int64),
40
41		stop: make(chan struct{}),
42	}
43
44	go ret.monitorChannels(capacity)
45
46	return ret
47}
48
49// RequestExecution blocks until another execution of size <size> can be allowed to run.
50func (r *RateLimit) Request(size int64) {
51	request := request{
52		size:     size,
53		serviced: make(chan struct{}, 1),
54	}
55
56	// wait for the request to be received
57	r.requests <- request
58
59	// wait for the request to be accepted
60	<-request.serviced
61}
62
63// Finish declares the completion of an execution of size <size>
64func (r *RateLimit) Finish(size int64) {
65	r.completions <- size
66}
67
68// Stop the background goroutine
69func (r *RateLimit) Stop() {
70	close(r.stop)
71}
72
73// monitorChannels processes incoming requests from channels
74func (r *RateLimit) monitorChannels(capacity int64) {
75	var usedCapacity int64
76	var currentRequest *request
77
78	for {
79		var requests chan request
80		if currentRequest == nil {
81			// If we don't already have a queued request, then we should check for a new request
82			requests = r.requests
83		}
84
85		select {
86		case newRequest := <-requests:
87			currentRequest = &newRequest
88		case amountCompleted := <-r.completions:
89			usedCapacity -= amountCompleted
90
91			if usedCapacity < 0 {
92				panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
93			}
94		case <-r.stop:
95			return
96		}
97
98		if currentRequest != nil {
99			accepted := false
100			if usedCapacity == 0 {
101				accepted = true
102			} else {
103				if capacity >= usedCapacity+currentRequest.size {
104					accepted = true
105				}
106			}
107			if accepted {
108				usedCapacity += currentRequest.size
109				currentRequest.serviced <- struct{}{}
110				currentRequest = nil
111			}
112		}
113	}
114}
115
116// A CPURateLimiter limits the number of active calls based on CPU requirements
117type CPURateLimiter struct {
118	impl *RateLimit
119}
120
121func NewCPURateLimiter(capacity int64) *CPURateLimiter {
122	if capacity <= 0 {
123		capacity = int64(runtime.NumCPU())
124	}
125	impl := NewRateLimit(capacity)
126	return &CPURateLimiter{impl: impl}
127}
128
129func (e CPURateLimiter) Request() {
130	e.impl.Request(1)
131}
132
133func (e CPURateLimiter) Finish() {
134	e.impl.Finish(1)
135}
136
137func (e CPURateLimiter) Stop() {
138	e.impl.Stop()
139}
140
141// A MemoryRateLimiter limits the number of active calls based on Memory requirements
142type MemoryRateLimiter struct {
143	*RateLimit
144}
145
146func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
147	if capacity <= 0 {
148		capacity = 512 * 1024 * 1024 // 512MB
149	}
150	impl := NewRateLimit(capacity)
151	return &MemoryRateLimiter{RateLimit: impl}
152}
153