ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.24
Committed: Sat May 18 18:16:03 2002 UTC (21 years, 11 months ago) by tdb
Branch: MAIN
Changes since 1.23: +22 -3 lines
Log Message:
i-scream is now licensed under the GPL. I've added the GPL headers to every
source file, and put a full copy of the license in the appropriate places.
I think I've covered everything. This is going to be a mad commit ;)

File Contents

# Content
1 /*
2 * i-scream central monitoring system
3 * Copyright (C) 2000-2002 i-scream
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20 //---PACKAGE DECLARATION---
21 package uk.org.iscream.cms.server.util;
22
23 //---IMPORTS---
24 import java.util.LinkedList;
25 import java.util.NoSuchElementException;
26 import java.util.Random;
27 import uk.org.iscream.cms.server.util.*;
28
29 /**
30 * A Queue class designed to operate in a multi-threaded environment, with
31 * added support for multiple "consumer" threads. Also offers blocking on
32 * the get() methods, which ensures the consumer waits until the queue
33 * actually contains some elements.
34 *
35 * @author $Author: tdb $
36 * @version $Id: Queue.java,v 1.23 2001/12/10 22:20:23 tdb Exp $
37 */
38 public class Queue {
39
40 //---FINAL ATTRIBUTES---
41
42 /**
43 * The current CVS revision of this class
44 */
45 public static final String REVISION = "$Revision: 1.23 $";
46
47 /**
48 * Pass to constructor to remove a RANDOM item from
49 * the Queue upon reaching the maximum limit.
50 */
51 public static final int RANDOM = 0;
52
53 /**
54 * Pass to constructor to remove the FIRST item from
55 * the Queue upon reaching the maximum limit.
56 */
57 public static final int FIRST = 1;
58
59 /**
60 * Pass to constructor to remove the LAST item from
61 * the Queue upon reaching the maximum limit.
62 */
63 public static final int LAST = 2;
64
65 /**
66 * Pass to constructor to drop the new item upon reaching
67 * the maximum Queue limit.
68 */
69 public static final int DROP = 3;
70
71 /**
72 * To allow opposite lookups.
73 */
74 public static final String[] algorithms = {"RANDOM", "FIRST", "LAST", "DROP"};
75
76 //---STATIC METHODS---
77
78 //---CONSTRUCTORS---
79
80 /**
81 * Constructs a new Queue with a maximum size limit on
82 * any individual queue. This should be used to stop
83 * conditions where the Queue cannot be guaranteed to
84 * be emptied as quick as it's filled.
85 *
86 * An algorithm will be used to remove data when new data
87 * arrives. There may be choices of algorithms later on.
88 *
89 * @param maxSize the upper limit for a queue
90 * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
91 */
92 public Queue(int maxSize, int removeAlgorithm) {
93 _maxSize = maxSize;
94 _removeAlgorithm = removeAlgorithm;
95 }
96
97 /**
98 * Constructs a Queue with no maximum size.
99 */
100 public Queue() {
101 _maxSize = -1;
102 }
103
104 //---PUBLIC METHODS---
105
106 /**
107 * This method adds a given object to every queue. It will notify
108 * any waiting consumers (on an empty queue) during this process.
109 *
110 * @param o An Object to be added to the queues.
111 */
112 public void add(Object o) {
113 for(int i=0; i < _lists.size(); i++) {
114 // skip over any gaps left in the list
115 if(_lists.get(i) != null) {
116 synchronized(((LinkedList) _lists.get(i))) {
117 // get size before adding to the Queue
118 int s = ((LinkedList) _lists.get(i)).size();
119 // check whether we need to remove an item from the current Queue
120 if(_maxSize!=-1 && s>=_maxSize && _removeAlgorithm!=DROP) {
121 // we need to remove an item
122 removeQueueItem((LinkedList) _lists.get(i));
123 }
124 // check if we should add (not if Queue full, and using DROP algorithm)
125 if(!(s>=_maxSize && _removeAlgorithm==DROP)) {
126 // add the next item, ensuring we lock
127 synchronized(this) {
128 // LinkedList.add() does the same thing, but this ensures behaviour
129 ((LinkedList) _lists.get(i)).addLast(o);
130 }
131 // if the queue was empty before the add it is possible
132 // that a consumer is waiting... so we notify them
133 //synchronized(((LinkedList) _lists.get(i))) {
134 ((LinkedList) _lists.get(i)).notifyAll();
135 //}
136 }
137 }
138 }
139 }
140 // we keep track of the total additions for the status() method
141 _count++;
142 }
143
144 /**
145 * This method returns an object from the front of a given queue.
146 * It will block until data exists in the queue if required.
147 *
148 * @param The queue to retrieve data from.
149 * @return The object from the front of the queue.
150 * @throws InvalidQueueException if the queue does not exist.
151 */
152 public Object get(int queue) throws InvalidQueueException {
153 // make sure queue exists
154 if (queue >= _lists.size() || _lists.get(queue) == null) {
155 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
156 }
157 // block if the queue is empty
158 synchronized(((LinkedList) _lists.get(queue))) {
159 if (((LinkedList) _lists.get(queue)).size() == 0) {
160 try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
161 }
162 }
163 // get an item, it should never be null due to the blocking above
164 Object o = null;
165 synchronized(this) {
166 try {
167 o = ((LinkedList) _lists.get(queue)).removeFirst();
168 }
169 catch (NoSuchElementException e) {
170 // This should never happen !
171 }
172 }
173 return o;
174 }
175
176 /**
177 * This method releases a get() method that's currently
178 * waiting on an empty queue. This was designed for
179 * shutdown() type methods that may have problems closing
180 * if the thread of control is waiting on a queue.
181 *
182 * @param queue the queue to release.
183 */
184 public void releaseQueue(int queue) {
185 synchronized(((LinkedList) _lists.get(queue))) {
186 ((LinkedList) _lists.get(queue)).notifyAll();
187 }
188 }
189
190 /**
191 * This method erases the contents of a given queue. This
192 * method should be used with care. It can only empty one
193 * internal queue, not all of them. This must be called
194 * multiple times to empty all queues.
195 *
196 * @param queue the queue to empty.
197 */
198 public void clearQueue(int queue) {
199 synchronized(this) {
200 ((LinkedList) _lists.get(queue)).clear();
201 }
202 }
203
204 /**
205 * This method returns an XML textual status of the queues.
206 * It is merely for observation, and would most likely be
207 * used by a larger "monitoring" component. Information
208 * returned includes the current size of each queue, and
209 * the total items passed through.
210 *
211 * @return A String message containing status information in XML format
212 */
213 public String xmlStatus() {
214 String status = "<queue ";
215 for(int i=0; i < _lists.size(); i++) {
216 // check for null entries
217 if(_lists.get(i) != null) {
218 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
219 }
220 else {
221 status += "queue"+i+"=\"[deleted]\" ";
222 }
223 }
224 status += "total=\""+_count+"\"";
225 if(_maxSize != -1) {
226 status += " maxSize=\""+_maxSize+"\"";
227 }
228 status += "></queue>";
229 return status;
230 }
231
232 /**
233 * Returns the size of a given queue. A consumer can use
234 * this to see how big their queue is at any given time.
235 * they should use their queue number as the parameter.
236 *
237 * @param queue The queue number to query.
238 * @return the current size of the queue.
239 * @throws InvalidQueueException if the queue does not exist.
240 */
241 public int queueSize(int queue) throws InvalidQueueException {
242 if (queue >= _lists.size() || _lists.get(queue) == null) {
243 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
244 }
245 return ((LinkedList) _lists.get(queue)).size();
246 }
247
248 /**
249 * Returns the total numer of elements to have passed
250 * through this queue. This is essentially a counter
251 * on the add method.
252 *
253 * @return the element-ometer.
254 */
255 public int elementCount() {
256 return _count;
257 }
258
259 /**
260 * This method assigns a queue to a consumer. The idea behind
261 * this is to ensure that only 1 consumer can be associated with
262 * a given queue, otherwise the whole "blocking" thing fails
263 * miserably. Queues are created upon requested.
264 *
265 * It is IMPORTANT that removeQueue() is used when the queue is
266 * no longer required.
267 *
268 * @return An integer to be passed to the get() method.
269 */
270 public synchronized int getQueue() {
271 int pos = -1;
272 for(int i=0; i < _lists.size(); i++) {
273 if(_lists.get(i) == null) {
274 // found a gap, re-use it
275 pos = i;
276 _lists.set(i, new LinkedList());
277 }
278 }
279 if(pos == -1) {
280 //we didn't find a gap, add at end
281 pos = _lists.size();
282 _lists.add(pos, new LinkedList());
283 }
284 return pos;
285 }
286
287 /**
288 * This method sets a entry to null in the list. This ensures
289 * that it will no longer be added to after it is no longer
290 * required be a consumer.
291 *
292 * @param queue The integer identifier for the queue, given by getQueue().
293 */
294 public synchronized void removeQueue(int queue) {
295 _lists.set(queue, null);
296 }
297
298 /**
299 * Start a monitor on our own Queue. This will log XML
300 * statistics about our Queue to a given Queue (could be
301 * the one being monitored).
302 *
303 * @param interval The long interval, in milliseconds, at which to take samples
304 * @param destQueue The queue to monitor to
305 * @param name A name to identify this Queue with
306 * @return whether we succeeded
307 */
308 public boolean startMonitor(long interval, Queue destQueue, String name) {
309 if(_queueMon == null) {
310 // start a monitor
311 _queueMon = new QueueMonitor(this, destQueue, interval, name);
312 _queueMon.start();
313 return true;
314 }
315 else {
316 // already have a monitor running
317 return false;
318 }
319 }
320
321 /**
322 * Start a monitor on our own Queue. This will log XML
323 * statistics about our Queue to this Queue.
324 *
325 * @param interval The long interval, in milliseconds, at which to take samples
326 * @param name A name to identify this Queue with
327 * @return whether we succeeded
328 */
329 public boolean startMonitor(long interval, String name) {
330 return startMonitor(interval, this, name);
331 }
332
333 /**
334 * Stop a monitor on our Queue if we have on running.
335 *
336 * @return whether we succeeded
337 */
338 public boolean stopMonitor() {
339 if(_queueMon != null) {
340 // stop a monitor
341 _queueMon.shutdown();
342 _queueMon = null;
343 return true;
344 }
345 else {
346 // no monitor running
347 return false;
348 }
349 }
350
351 /**
352 * Overrides the {@link java.lang.Object#toString() Object.toString()}
353 * method to provide clean logging (every class should have this).
354 *
355 * This uses the uk.org.iscream.cms.server.util.FormatName class
356 * to format the toString()
357 *
358 * @return the name of this class and its CVS revision.
359 */
360 public String toString() {
361 return FormatName.getName(
362 _name,
363 getClass().getName(),
364 REVISION);
365 }
366
367 //---PRIVATE METHODS---
368
369 /**
370 * This method removes an item from a Queue, using a method
371 * requested at construction.
372 *
373 * @param list The LinkedList from which to remove an item.
374 */
375 private void removeQueueItem(LinkedList list) {
376 // look at our algorithm
377 // remove a random item from the list
378 if(_removeAlgorithm==RANDOM) {
379 // new Random, with a good seed
380 Random rand = new Random(System.currentTimeMillis());
381 int i = rand.nextInt(_maxSize);
382 synchronized(this) {
383 list.remove(i);
384 }
385 }
386 // remove the first item from the list
387 else if(_removeAlgorithm==FIRST) {
388 synchronized(this) {
389 list.removeFirst();
390 }
391 }
392 // remove the last item from the list
393 else if(_removeAlgorithm==LAST) {
394 synchronized(this) {
395 list.removeLast();
396 }
397 }
398 }
399
400 //---ACCESSOR/MUTATOR METHODS---
401
402 //---ATTRIBUTES---
403
404 /**
405 * The LinkedLists of queues.
406 */
407 private LinkedList _lists = new LinkedList();
408
409 /**
410 * A counter so we know how many data items have been
411 * passed through, for statistical purposes.
412 */
413 private int _count = 0;
414
415 /**
416 * A reference to our QueueMonitor, if we have one.
417 */
418 private QueueMonitor _queueMon = null;
419
420 /**
421 * The maximum size of any Queue.
422 */
423 private int _maxSize = -1;
424
425 /**
426 * The remove algorithm to use upon a Queue reaching
427 * it's maximum size.
428 */
429 private int _removeAlgorithm = -1;
430
431 /**
432 * This is the friendly identifier of the
433 * component this class is running in.
434 * eg, a Filter may be called "filter1",
435 * If this class does not have an owning
436 * component, a name from the configuration
437 * can be placed here. This name could also
438 * be changed to null for utility classes.
439 */
440 private String _name = null;
441
442 //---STATIC ATTRIBUTES---
443
444 }