001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.commons.lang3.concurrent;
018
019 import java.util.concurrent.ScheduledExecutorService;
020 import java.util.concurrent.ScheduledFuture;
021 import java.util.concurrent.ScheduledThreadPoolExecutor;
022 import java.util.concurrent.TimeUnit;
023
/**
 * <p>
 * A specialized <em>semaphore</em> implementation that provides a number of
 * permits in a given time frame.
 * </p>
 * <p>
 * This class is similar to the {@code java.util.concurrent.Semaphore} class
 * provided by the JDK in that it manages a configurable number of permits.
 * Using the {@link #acquire()} method a permit can be requested by a thread.
 * However, there is an additional timing dimension: there is no
 * {@code release()} method for freeing a permit, but all permits are
 * automatically released at the end of a configurable time frame. If a thread
 * calls {@link #acquire()} and the available permits are already exhausted for
 * this time frame, the thread is blocked. When the time frame ends all permits
 * requested so far are restored, and blocked threads are woken up again, so
 * that they can try to acquire a new permit. This basically means that in the
 * specified time frame only the given number of operations is possible.
 * </p>
 * <p>
 * A use case for this class is to artificially limit the load produced by a
 * process. As an example consider an application that issues database queries
 * on a production system in a background process to gather statistical
 * information. This background processing should not produce so much database
 * load that the functionality and the performance of the production system are
 * impacted. Here a {@code TimedSemaphore} could be installed to guarantee that
 * only a given number of database queries are issued per second.
 * </p>
 * <p>
 * A thread class for performing database queries could look as follows:
 * </p>
 *
 * <pre>
 * public class StatisticsThread extends Thread {
 *     // The semaphore for limiting database load.
 *     private final TimedSemaphore semaphore;
 *     // Create an instance and set the semaphore
 *     public StatisticsThread(TimedSemaphore timedSemaphore) {
 *         semaphore = timedSemaphore;
 *     }
 *     // Gather statistics
 *     public void run() {
 *         try {
 *             while (true) {
 *                 semaphore.acquire();   // limit database load
 *                 performQuery();        // issue a query
 *             }
 *         } catch (InterruptedException iex) {
 *             // fall through
 *         }
 *     }
 *     ...
 * }
 * </pre>
 *
 * <p>
 * The following code fragment shows how a {@code TimedSemaphore} is created
 * that allows only 10 operations per second and passed to the statistics
 * thread:
 * </p>
 *
 * <pre>
 * TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10);
 * StatisticsThread thread = new StatisticsThread(sem);
 * thread.start();
 * </pre>
 *
 * <p>
 * When creating an instance the time period for the semaphore must be
 * specified. {@code TimedSemaphore} uses an executor service with a
 * corresponding period to monitor this interval. The
 * {@code ScheduledExecutorService} to be used for this purpose can be provided
 * at construction time. Alternatively the class creates an internal executor
 * service.
 * </p>
 * <p>
 * Client code that uses {@code TimedSemaphore} has to call the
 * {@link #acquire()} method in each processing step. {@code TimedSemaphore}
 * keeps track of the number of invocations of the {@link #acquire()} method and
 * blocks the calling thread if the counter exceeds the limit specified. When
 * the timer signals the end of the time period the counter is reset and all
 * waiting threads are released. Then another cycle can start.
 * </p>
 * <p>
 * It is possible to modify the limit at any time using the
 * {@link #setLimit(int)} method. This is useful if the load produced by an
 * operation has to be adapted dynamically. In the example scenario with the
 * thread collecting statistics it may make sense to specify a low limit during
 * day time while allowing a higher load in the night time. Reducing the limit
 * takes effect immediately by blocking incoming callers. If the limit is
 * increased, waiting threads are not released immediately, but wake up when the
 * timer runs out. Then, in the next period more processing steps can be
 * performed without blocking. By setting the limit to 0 the semaphore can be
 * switched off: in this mode the {@link #acquire()} method never blocks, but
 * lets all callers pass directly.
 * </p>
 * <p>
 * When the {@code TimedSemaphore} is no longer needed its {@link #shutdown()}
 * method should be called. This causes the periodic task that monitors the time
 * interval to be canceled. If the {@code ScheduledExecutorService} has been
 * created by the semaphore at construction time, it is also shut down. After
 * that {@link #acquire()} must not be called any more; threads that are still
 * blocked in {@link #acquire()} at that time are woken up and fail with an
 * {@code IllegalStateException}.
 * </p>
 * <p>
 * All state of an instance is guarded by the instance's own monitor; the
 * accessor and mutator methods are {@code synchronized}.
 * </p>
 *
 * @version $Id: TimedSemaphore.java 895466 2010-01-03 19:04:08Z oheger $
 */
public class TimedSemaphore {
    /**
     * Constant for a value representing no limit. If the limit is set to a
     * value less or equal this constant, the {@code TimedSemaphore} will be
     * effectively switched off.
     */
    public static final int NO_LIMIT = 0;

    /** Constant for the thread pool size for the executor. */
    private static final int THREAD_POOL_SIZE = 1;

    /** The executor service for managing the timer thread. */
    private final ScheduledExecutorService executorService;

    /** Stores the period for this timed semaphore. */
    private final long period;

    /** The time unit for the period. */
    private final TimeUnit unit;

    /** A flag whether the executor service was created by this object. */
    private final boolean ownExecutor;

    /** A future object representing the timer task. */
    private ScheduledFuture<?> task;

    /** Stores the total number of invocations of the acquire() method. */
    private long totalAcquireCount;

    /**
     * The counter for the periods. This counter is increased every time a
     * period ends.
     */
    private long periodCount;

    /** The limit. */
    private int limit;

    /** The current counter. */
    private int acquireCount;

    /** The number of invocations of acquire() in the last period. */
    private int lastCallsPerPeriod;

    /** A flag whether shutdown() was called. */
    private boolean shutdown;

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * the given time period and the limit. An internal executor service is
     * created for monitoring the period.
     *
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less or equals 0
     */
    public TimedSemaphore(long timePeriod, TimeUnit timeUnit, int limit) {
        this(null, timePeriod, timeUnit, limit);
    }

    /**
     * Creates a new instance of {@link TimedSemaphore} and initializes it with
     * an executor service, the given time period, and the limit. The executor
     * service will be used for creating a periodic task for monitoring the time
     * period. It can be <b>null</b>, then a default service will be created.
     *
     * @param service the executor service
     * @param timePeriod the time period
     * @param timeUnit the unit for the period
     * @param limit the limit for the semaphore
     * @throws IllegalArgumentException if the period is less or equals 0
     */
    public TimedSemaphore(ScheduledExecutorService service, long timePeriod,
            TimeUnit timeUnit, int limit) {
        if (timePeriod <= 0) {
            throw new IllegalArgumentException("Time period must be greater 0!");
        }

        period = timePeriod;
        unit = timeUnit;

        if (service != null) {
            executorService = service;
            ownExecutor = false;
        } else {
            // The internal executor must not keep running periodic or delayed
            // tasks once it has been shut down.
            ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
                    THREAD_POOL_SIZE);
            s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executorService = s;
            ownExecutor = true;
        }

        setLimit(limit);
    }

    /**
     * Returns the limit enforced by this semaphore. The limit determines how
     * many invocations of {@link #acquire()} are allowed within the monitored
     * period.
     *
     * @return the limit
     */
    public final synchronized int getLimit() {
        return limit;
    }

    /**
     * Sets the limit. This is the number of times the {@link #acquire()} method
     * can be called within the time period specified. If this limit is reached,
     * further invocations of {@link #acquire()} will block. Setting the limit
     * to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled,
     * i.e. an arbitrary number of {@link #acquire()} invocations is allowed in
     * the time period.
     *
     * @param limit the limit
     */
    public final synchronized void setLimit(int limit) {
        this.limit = limit;
    }

    /**
     * Initializes a shutdown. After that the object cannot be used any more.
     * This method can be invoked an arbitrary number of times. All invocations
     * after the first one do not have any effect. Threads that are currently
     * blocked in {@link #acquire()} are woken up; their {@link #acquire()} call
     * fails with an {@code IllegalStateException}.
     */
    public synchronized void shutdown() {
        if (!shutdown) {

            if (ownExecutor) {
                // if the executor was created by this instance, it has
                // to be shutdown
                getExecutorService().shutdownNow();
            }
            if (task != null) {
                task.cancel(false);
            }

            shutdown = true;

            // The timer is gone, so nobody would ever call notifyAll() again:
            // release the waiting threads now, they will detect the shutdown.
            notifyAll();
        }
    }

    /**
     * Tests whether the {@link #shutdown()} method has been called on this
     * object. If this method returns <b>true</b>, this instance cannot be used
     * any longer.
     *
     * @return a flag whether a shutdown has been performed
     */
    public synchronized boolean isShutdown() {
        return shutdown;
    }

    /**
     * Tries to acquire a permit from this semaphore. This method will block if
     * the limit for the current period has already been reached. If
     * {@link #shutdown()} has already been invoked, calling this method will
     * cause an exception; the same exception is thrown if the semaphore is shut
     * down while the calling thread is blocked waiting for a permit. The very
     * first call of this method starts the timer task which monitors the time
     * period set for this {@code TimedSemaphore}. From now on the semaphore is
     * active.
     *
     * @throws InterruptedException if the thread gets interrupted
     * @throws IllegalStateException if this semaphore is already shut down or
     *         is shut down while waiting
     */
    public synchronized void acquire() throws InterruptedException {
        ensureNotShutdown();

        if (task == null) {
            task = startTimer();
        }

        boolean canPass;
        do {
            canPass = getLimit() <= NO_LIMIT || acquireCount < getLimit();
            if (canPass) {
                acquireCount++;
            } else {
                wait();
                // A wake-up may stem from shutdown() rather than from the end
                // of a period; in that case no further period will ever
                // start, so waiting again would block forever.
                ensureNotShutdown();
            }
        } while (!canPass);
    }

    /**
     * Returns the number of (successful) acquire invocations during the last
     * period. This is the number of times the {@link #acquire()} method was
     * called without blocking. This can be useful for testing or debugging
     * purposes or to determine a meaningful threshold value. If a limit is set,
     * the value returned by this method won't be greater than this limit.
     *
     * @return the number of non-blocking invocations of the {@link #acquire()}
     *         method
     */
    public synchronized int getLastAcquiresPerPeriod() {
        return lastCallsPerPeriod;
    }

    /**
     * Returns the number of invocations of the {@link #acquire()} method for
     * the current period. This may be useful for testing or debugging purposes.
     *
     * @return the current number of {@link #acquire()} invocations
     */
    public synchronized int getAcquireCount() {
        return acquireCount;
    }

    /**
     * Returns the number of calls to the {@link #acquire()} method that can
     * still be performed in the current period without blocking. This method
     * can give an indication whether it is safe to call the {@link #acquire()}
     * method without risking to be suspended. However, there is no guarantee
     * that a subsequent call to {@link #acquire()} actually is not-blocking
     * because in the mean time other threads may have invoked the semaphore.
     * <p>
     * If the semaphore is switched off (the limit is &lt;= {@link #NO_LIMIT}),
     * {@link #acquire()} never blocks and {@code Integer.MAX_VALUE} is
     * returned. The result is never negative: if the limit has been reduced
     * below the number of permits already handed out in the current period, 0
     * is returned.
     * </p>
     *
     * @return the current number of available {@link #acquire()} calls in the
     *         current period
     */
    public synchronized int getAvailablePermits() {
        final int currentLimit = getLimit();
        if (currentLimit <= NO_LIMIT) {
            // switched off: every caller passes
            return Integer.MAX_VALUE;
        }
        return Math.max(0, currentLimit - getAcquireCount());
    }

    /**
     * Returns the average number of successful (i.e. non-blocking)
     * {@link #acquire()} invocations for the entire life-time of this
     * {@code TimedSemaphore}. This method can be used for instance for
     * statistical calculations. Only completed periods are taken into account;
     * before the first period has ended the result is 0.
     *
     * @return the average number of {@link #acquire()} invocations per time
     *         unit
     */
    public synchronized double getAverageCallsPerPeriod() {
        return (periodCount == 0) ? 0 : (double) totalAcquireCount
                / (double) periodCount;
    }

    /**
     * Returns the time period. This is the time monitored by this semaphore.
     * Only a given number of invocations of the {@link #acquire()} method is
     * possible in this period.
     *
     * @return the time period
     */
    public long getPeriod() {
        return period;
    }

    /**
     * Returns the time unit. This is the unit used by {@link #getPeriod()}.
     *
     * @return the time unit
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * Returns the executor service used by this instance.
     *
     * @return the executor service
     */
    protected ScheduledExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Starts the timer. This method is called when {@link #acquire()} is called
     * for the first time. It schedules a task to be executed at fixed rate to
     * monitor the time period specified.
     *
     * @return a future object representing the task scheduled
     */
    protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(new Runnable() {
            public void run() {
                endOfPeriod();
            }
        }, getPeriod(), getPeriod(), getUnit());
    }

    /**
     * The current time period is finished. This method is called by the timer
     * used internally to monitor the time period. It records the statistics of
     * the finished period, resets the counter and releases the threads waiting
     * for this barrier.
     */
    synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
        notifyAll();
    }

    /**
     * Verifies that this semaphore is still usable.
     *
     * @throws IllegalStateException if {@link #shutdown()} has been called
     */
    private void ensureNotShutdown() {
        if (isShutdown()) {
            throw new IllegalStateException("TimedSemaphore is shut down!");
        }
    }
}