ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.26
Committed: Wed Feb 5 14:27:59 2003 UTC (21 years, 2 months ago) by tdb
Branch: MAIN
Changes since 1.25: +4 -4 lines
Log Message:
Util package has been pulled out of the server. Next step will be to modify
the server and conient (and anything else?) to use this instead. New
package name is uk.org.iscream.cms.util. All the java files were moved with
a repo copy, so they retain their history.

File Contents

# Content
1 /*
2 * i-scream central monitoring system
3 * http://www.i-scream.org.uk
4 * Copyright (C) 2000-2002 i-scream
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19 */
20
21 //---PACKAGE DECLARATION---
22 package uk.org.iscream.cms.util;
23
24 //---IMPORTS---
25 import java.util.LinkedList;
26 import java.util.NoSuchElementException;
27 import java.util.Random;
28 //import uk.org.iscream.cms.util.*;
29
30 /**
31 * A Queue class designed to operate in a multi-threaded environment, with
32 * added support for multiple "consumer" threads. Also offers blocking on
33 * the get() methods, which ensures the consumer waits until the queue
34 * actually contains some elements.
35 *
36 * @author $Author: tdb $
37 * @version $Id: Queue.java,v 1.25 2002/05/21 16:47:19 tdb Exp $
38 */
39 public class Queue {
40
41 //---FINAL ATTRIBUTES---
42
43 /**
44 * The current CVS revision of this class
45 */
46 public static final String REVISION = "$Revision: 1.25 $";
47
48 /**
49 * Pass to constructor to remove a RANDOM item from
50 * the Queue upon reaching the maximum limit.
51 */
52 public static final int RANDOM = 0;
53
54 /**
55 * Pass to constructor to remove the FIRST item from
56 * the Queue upon reaching the maximum limit.
57 */
58 public static final int FIRST = 1;
59
60 /**
61 * Pass to constructor to remove the LAST item from
62 * the Queue upon reaching the maximum limit.
63 */
64 public static final int LAST = 2;
65
66 /**
67 * Pass to constructor to drop the new item upon reaching
68 * the maximum Queue limit.
69 */
70 public static final int DROP = 3;
71
72 /**
73 * To allow opposite lookups.
74 */
75 public static final String[] algorithms = {"RANDOM", "FIRST", "LAST", "DROP"};
76
77 //---STATIC METHODS---
78
79 //---CONSTRUCTORS---
80
81 /**
82 * Constructs a new Queue with a maximum size limit on
83 * any individual queue. This should be used to stop
84 * conditions where the Queue cannot be guaranteed to
85 * be emptied as quick as it's filled.
86 *
87 * An algorithm will be used to remove data when new data
88 * arrives. There may be choices of algorithms later on.
89 *
90 * @param maxSize the upper limit for a queue
91 * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
92 */
93 public Queue(int maxSize, int removeAlgorithm) {
94 _maxSize = maxSize;
95 _removeAlgorithm = removeAlgorithm;
96 }
97
98 /**
99 * Constructs a Queue with no maximum size.
100 */
101 public Queue() {
102 _maxSize = -1;
103 }
104
105 //---PUBLIC METHODS---
106
107 /**
108 * This method adds a given object to every queue. It will notify
109 * any waiting consumers (on an empty queue) during this process.
110 *
111 * @param o An Object to be added to the queues.
112 */
113 public void add(Object o) {
114 for(int i=0; i < _lists.size(); i++) {
115 // skip over any gaps left in the list
116 if(_lists.get(i) != null) {
117 synchronized(((LinkedList) _lists.get(i))) {
118 // get size before adding to the Queue
119 int s = ((LinkedList) _lists.get(i)).size();
120 // check whether we need to remove an item from the current Queue
121 if(_maxSize!=-1 && s>=_maxSize && _removeAlgorithm!=DROP) {
122 // we need to remove an item
123 removeQueueItem((LinkedList) _lists.get(i));
124 }
125 // check if we should add (not if Queue full, and using DROP algorithm)
126 if(!(s>=_maxSize && _removeAlgorithm==DROP)) {
127 // add the next item, ensuring we lock
128 synchronized(this) {
129 // LinkedList.add() does the same thing, but this ensures behaviour
130 ((LinkedList) _lists.get(i)).addLast(o);
131 }
132 // if the queue was empty before the add it is possible
133 // that a consumer is waiting... so we notify them
134 //synchronized(((LinkedList) _lists.get(i))) {
135 ((LinkedList) _lists.get(i)).notifyAll();
136 //}
137 }
138 }
139 }
140 }
141 // we keep track of the total additions for the status() method
142 _count++;
143 }
144
145 /**
146 * This method returns an object from the front of a given queue.
147 * It will block until data exists in the queue if required.
148 *
149 * @param The queue to retrieve data from.
150 * @return The object from the front of the queue.
151 * @throws InvalidQueueException if the queue does not exist.
152 */
153 public Object get(int queue) throws InvalidQueueException {
154 // make sure queue exists
155 if (queue >= _lists.size() || _lists.get(queue) == null) {
156 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
157 }
158 // block if the queue is empty
159 synchronized(((LinkedList) _lists.get(queue))) {
160 if (((LinkedList) _lists.get(queue)).size() == 0) {
161 try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
162 }
163 }
164 // get an item, it should never be null due to the blocking above
165 Object o = null;
166 synchronized(this) {
167 try {
168 o = ((LinkedList) _lists.get(queue)).removeFirst();
169 }
170 catch (NoSuchElementException e) {
171 // This should never happen !
172 }
173 }
174 return o;
175 }
176
177 /**
178 * This method releases a get() method that's currently
179 * waiting on an empty queue. This was designed for
180 * shutdown() type methods that may have problems closing
181 * if the thread of control is waiting on a queue.
182 *
183 * @param queue the queue to release.
184 */
185 public void releaseQueue(int queue) {
186 synchronized(((LinkedList) _lists.get(queue))) {
187 ((LinkedList) _lists.get(queue)).notifyAll();
188 }
189 }
190
191 /**
192 * This method erases the contents of a given queue. This
193 * method should be used with care. It can only empty one
194 * internal queue, not all of them. This must be called
195 * multiple times to empty all queues.
196 *
197 * @param queue the queue to empty.
198 */
199 public void clearQueue(int queue) {
200 synchronized(this) {
201 ((LinkedList) _lists.get(queue)).clear();
202 }
203 }
204
205 /**
206 * This method returns an XML textual status of the queues.
207 * It is merely for observation, and would most likely be
208 * used by a larger "monitoring" component. Information
209 * returned includes the current size of each queue, and
210 * the total items passed through.
211 *
212 * @return A String message containing status information in XML format
213 */
214 public String xmlStatus() {
215 String status = "<queue ";
216 for(int i=0; i < _lists.size(); i++) {
217 // check for null entries
218 if(_lists.get(i) != null) {
219 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
220 }
221 else {
222 status += "queue"+i+"=\"[deleted]\" ";
223 }
224 }
225 status += "total=\""+_count+"\"";
226 if(_maxSize != -1) {
227 status += " maxSize=\""+_maxSize+"\"";
228 }
229 status += "></queue>";
230 return status;
231 }
232
233 /**
234 * Returns the size of a given queue. A consumer can use
235 * this to see how big their queue is at any given time.
236 * they should use their queue number as the parameter.
237 *
238 * @param queue The queue number to query.
239 * @return the current size of the queue.
240 * @throws InvalidQueueException if the queue does not exist.
241 */
242 public int queueSize(int queue) throws InvalidQueueException {
243 if (queue >= _lists.size() || _lists.get(queue) == null) {
244 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
245 }
246 return ((LinkedList) _lists.get(queue)).size();
247 }
248
249 /**
250 * Returns the total numer of elements to have passed
251 * through this queue. This is essentially a counter
252 * on the add method.
253 *
254 * @return the element-ometer.
255 */
256 public int elementCount() {
257 return _count;
258 }
259
260 /**
261 * This method assigns a queue to a consumer. The idea behind
262 * this is to ensure that only 1 consumer can be associated with
263 * a given queue, otherwise the whole "blocking" thing fails
264 * miserably. Queues are created upon requested.
265 *
266 * It is IMPORTANT that removeQueue() is used when the queue is
267 * no longer required.
268 *
269 * @return An integer to be passed to the get() method.
270 */
271 public synchronized int getQueue() {
272 int pos = -1;
273 for(int i=0; i < _lists.size(); i++) {
274 if(_lists.get(i) == null) {
275 // found a gap, re-use it
276 pos = i;
277 _lists.set(i, new LinkedList());
278 }
279 }
280 if(pos == -1) {
281 //we didn't find a gap, add at end
282 pos = _lists.size();
283 _lists.add(pos, new LinkedList());
284 }
285 return pos;
286 }
287
288 /**
289 * This method sets a entry to null in the list. This ensures
290 * that it will no longer be added to after it is no longer
291 * required be a consumer.
292 *
293 * @param queue The integer identifier for the queue, given by getQueue().
294 */
295 public synchronized void removeQueue(int queue) {
296 _lists.set(queue, null);
297 }
298
299 /**
300 * Start a monitor on our own Queue. This will log XML
301 * statistics about our Queue to a given Queue (could be
302 * the one being monitored).
303 *
304 * @param interval The long interval, in milliseconds, at which to take samples
305 * @param destQueue The queue to monitor to
306 * @param name A name to identify this Queue with
307 * @return whether we succeeded
308 */
309 public boolean startMonitor(long interval, Queue destQueue, String name) {
310 if(_queueMon == null) {
311 // start a monitor
312 _queueMon = new QueueMonitor(this, destQueue, interval, name);
313 _queueMon.start();
314 return true;
315 }
316 else {
317 // already have a monitor running
318 return false;
319 }
320 }
321
322 /**
323 * Start a monitor on our own Queue. This will log XML
324 * statistics about our Queue to this Queue.
325 *
326 * @param interval The long interval, in milliseconds, at which to take samples
327 * @param name A name to identify this Queue with
328 * @return whether we succeeded
329 */
330 public boolean startMonitor(long interval, String name) {
331 return startMonitor(interval, this, name);
332 }
333
334 /**
335 * Stop a monitor on our Queue if we have on running.
336 *
337 * @return whether we succeeded
338 */
339 public boolean stopMonitor() {
340 if(_queueMon != null) {
341 // stop a monitor
342 _queueMon.shutdown();
343 _queueMon = null;
344 return true;
345 }
346 else {
347 // no monitor running
348 return false;
349 }
350 }
351
352 /**
353 * Overrides the {@link java.lang.Object#toString() Object.toString()}
354 * method to provide clean logging (every class should have this).
355 *
356 * This uses the uk.org.iscream.cms.server.util.FormatName class
357 * to format the toString()
358 *
359 * @return the name of this class and its CVS revision.
360 */
361 public String toString() {
362 return FormatName.getName(
363 _name,
364 getClass().getName(),
365 REVISION);
366 }
367
368 //---PRIVATE METHODS---
369
370 /**
371 * This method removes an item from a Queue, using a method
372 * requested at construction.
373 *
374 * @param list The LinkedList from which to remove an item.
375 */
376 private void removeQueueItem(LinkedList list) {
377 // look at our algorithm
378 // remove a random item from the list
379 if(_removeAlgorithm==RANDOM) {
380 // new Random, with a good seed
381 Random rand = new Random(System.currentTimeMillis());
382 int i = rand.nextInt(_maxSize);
383 synchronized(this) {
384 list.remove(i);
385 }
386 }
387 // remove the first item from the list
388 else if(_removeAlgorithm==FIRST) {
389 synchronized(this) {
390 list.removeFirst();
391 }
392 }
393 // remove the last item from the list
394 else if(_removeAlgorithm==LAST) {
395 synchronized(this) {
396 list.removeLast();
397 }
398 }
399 }
400
401 //---ACCESSOR/MUTATOR METHODS---
402
403 //---ATTRIBUTES---
404
405 /**
406 * The LinkedLists of queues.
407 */
408 private LinkedList _lists = new LinkedList();
409
410 /**
411 * A counter so we know how many data items have been
412 * passed through, for statistical purposes.
413 */
414 private int _count = 0;
415
416 /**
417 * A reference to our QueueMonitor, if we have one.
418 */
419 private QueueMonitor _queueMon = null;
420
421 /**
422 * The maximum size of any Queue.
423 */
424 private int _maxSize = -1;
425
426 /**
427 * The remove algorithm to use upon a Queue reaching
428 * it's maximum size.
429 */
430 private int _removeAlgorithm = -1;
431
432 /**
433 * This is the friendly identifier of the
434 * component this class is running in.
435 * eg, a Filter may be called "filter1",
436 * If this class does not have an owning
437 * component, a name from the configuration
438 * can be placed here. This name could also
439 * be changed to null for utility classes.
440 */
441 private String _name = null;
442
443 //---STATIC ATTRIBUTES---
444
445 }