ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.20
Committed: Mon Mar 26 17:59:47 2001 UTC (23 years, 1 month ago) by tdb
Branch: MAIN
CVS Tags: PROJECT_COMPLETION
Changes since 1.19: +6 -8 lines
Log Message:
Some further checking to solve deadlock issues.
Firstly when checking if the queue is "full", we now do a check for "greater
than or equal" to the limit, rather than just "equal". If for any reason it
does get over the limit, it'll still get caught.
Secondly we no longer check if the queue is of size 0 to do the notify, we just
do it anyway. We figure this is probably more efficient than trying to do
a massive synchronization block.

File Contents

# Content
1 //---PACKAGE DECLARATION---
2 package uk.org.iscream.util;
3
4 //---IMPORTS---
5 import java.util.LinkedList;
6 import java.util.NoSuchElementException;
7 import java.util.Random;
8 import uk.org.iscream.util.*;
9
10 /**
11 * A Queue class designed to operate in a multi-threaded environment, with
12 * added support for multiple "consumer" threads. Also offers blocking on
13 * the get() methods, which ensures the consumer waits until the queue
14 * actually contains some elements.
15 *
16 * @author $Author: tdb1 $
17 * @version $Id: Queue.java,v 1.19 2001/03/22 18:23:27 tdb1 Exp $
18 */
19 public class Queue {
20
21 //---FINAL ATTRIBUTES---
22
23 /**
24 * The current CVS revision of this class
25 */
26 public static final String REVISION = "$Revision: 1.19 $";
27
28 /**
29 * Pass to constructor to remove a RANDOM item from
30 * the Queue upon reaching the maximum limit.
31 */
32 public static final int RANDOM = 0;
33
34 /**
35 * Pass to constructor to remove the FIRST item from
36 * the Queue upon reaching the maximum limit.
37 */
38 public static final int FIRST = 1;
39
40 /**
41 * Pass to constructor to remove the LAST item from
42 * the Queue upon reaching the maximum limit.
43 */
44 public static final int LAST = 2;
45
46 /**
47 * Pass to constructor to drop the new item upon reaching
48 * the maximum Queue limit.
49 */
50 public static final int DROP = 3;
51
52 /**
53 * To allow opposite lookups.
54 */
55 public static final String[] algorithms = {"RANDOM", "FIRST", "LAST", "DROP"};
56
57 //---STATIC METHODS---
58
59 //---CONSTRUCTORS---
60
61 /**
62 * Constructs a new Queue with a maximum size limit on
63 * any individual queue. This should be used to stop
64 * conditions where the Queue cannot be guaranteed to
65 * be emptied as quick as it's filled.
66 *
67 * An algorithm will be used to remove data when new data
68 * arrives. There may be choices of algorithms later on.
69 *
70 * @param maxSize the upper limit for a queue
71 * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
72 */
73 public Queue(int maxSize, int removeAlgorithm) {
74 _maxSize = maxSize;
75 _removeAlgorithm = removeAlgorithm;
76 }
77
78 /**
79 * Constructs a Queue with no maximum size.
80 */
81 public Queue() {
82 _maxSize = -1;
83 }
84
85 //---PUBLIC METHODS---
86
87 /**
88 * This method adds a given object to every queue. It will notify
89 * any waiting consumers (on an empty queue) during this process.
90 *
91 * @param o An Object to be added to the queues.
92 */
93 public void add(Object o) {
94 for(int i=0; i < _lists.size(); i++) {
95 // skip over any gaps left in the list
96 if(_lists.get(i) != null) {
97 // get size before adding to the Queue
98 int s = ((LinkedList) _lists.get(i)).size();
99 // check whether we need to remove an item from the current Queue
100 if(_maxSize!=-1 && s>=_maxSize && _removeAlgorithm!=DROP) {
101 // we need to remove an item
102 removeQueueItem((LinkedList) _lists.get(i));
103 }
104 // check if we should add (not if Queue full, and using DROP algorithm)
105 if(!(s>=_maxSize && _removeAlgorithm==DROP)) {
106 // add the next item, ensuring we lock
107 synchronized(this) {
108 // LinkedList.add() does the same thing, but this ensures behaviour
109 ((LinkedList) _lists.get(i)).addLast(o);
110 }
111 }
112 // if the queue was empty before the add it is possible
113 // that a consumer is waiting... so we notify them
114 synchronized(((LinkedList) _lists.get(i))) {
115 ((LinkedList) _lists.get(i)).notifyAll();
116 }
117 }
118 }
119 // we keep track of the total additions for the status() method
120 _count++;
121 }
122
123 /**
124 * This method returns an object from the front of a given queue.
125 * It will block until data exists in the queue if required.
126 *
127 * @param The queue to retrieve data from.
128 * @return The object from the front of the queue.
129 * @throws InvalidQueueException if the queue does not exist.
130 */
131 public Object get(int queue) throws InvalidQueueException {
132 // make sure queue exists
133 if (queue >= _lists.size() || _lists.get(queue) == null) {
134 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
135 }
136 // block if the queue is empty
137 synchronized(((LinkedList) _lists.get(queue))) {
138 if (((LinkedList) _lists.get(queue)).size() == 0) {
139 try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
140 }
141 }
142 // get an item, it should never be null due to the blocking above
143 Object o = null;
144 synchronized(this) {
145 try {
146 o = ((LinkedList) _lists.get(queue)).removeFirst();
147 }
148 catch (NoSuchElementException e) {
149 // This should never happen !
150 }
151 }
152 return o;
153 }
154
155 /**
156 * This method releases a get() method that's currently
157 * waiting on an empty queue. This was designed for
158 * shutdown() type methods that may have problems closing
159 * if the thread of control is waiting on a queue.
160 *
161 * @param queue the queue to release.
162 */
163 public void releaseQueue(int queue) {
164 synchronized(((LinkedList) _lists.get(queue))) {
165 ((LinkedList) _lists.get(queue)).notifyAll();
166 }
167 }
168
169 /**
170 * This method erases the contents of a given queue. This
171 * method should be used with care. It can only empty one
172 * internal queue, not all of them. This must be called
173 * multiple times to empty all queues.
174 *
175 * @param queue the queue to empty.
176 */
177 public void clearQueue(int queue) {
178 synchronized(this) {
179 ((LinkedList) _lists.get(queue)).clear();
180 }
181 }
182
183 /**
184 * This method returns an XML textual status of the queues.
185 * It is merely for observation, and would most likely be
186 * used by a larger "monitoring" component. Information
187 * returned includes the current size of each queue, and
188 * the total items passed through.
189 *
190 * @return A String message containing status information in XML format
191 */
192 public String xmlStatus() {
193 String status = "<queue ";
194 for(int i=0; i < _lists.size(); i++) {
195 // check for null entries
196 if(_lists.get(i) != null) {
197 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
198 }
199 else {
200 status += "queue"+i+"=\"[deleted]\" ";
201 }
202 }
203 status += "total=\""+_count+"\"";
204 if(_maxSize != -1) {
205 status += " maxSize=\""+_maxSize+"\"";
206 }
207 status += "></queue>";
208 return status;
209 }
210
211 /**
212 * Returns the size of a given queue. A consumer can use
213 * this to see how big their queue is at any given time.
214 * they should use their queue number as the parameter.
215 *
216 * @param queue The queue number to query.
217 * @return the current size of the queue.
218 * @throws InvalidQueueException if the queue does not exist.
219 */
220 public int queueSize(int queue) throws InvalidQueueException {
221 if (queue >= _lists.size() || _lists.get(queue) == null) {
222 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
223 }
224 return ((LinkedList) _lists.get(queue)).size();
225 }
226
227 /**
228 * Returns the total numer of elements to have passed
229 * through this queue (ie. a counter on the add method).
230 *
231 * @return the element-ometer.
232 */
233 public int elementCount() {
234 return _count;
235 }
236
237 /**
238 * This method assigns a queue to a consumer. The idea behind
239 * this is to ensure that only 1 consumer can be associated with
240 * a given queue, otherwise the whole "blocking" thing fails
241 * miserably. Queues are created upon requested.
242 *
243 * It is IMPORTANT that removeQueue() is used when the queue is
244 * no longer required.
245 *
246 * @return An integer to be passed to the get() method.
247 */
248 public synchronized int getQueue() {
249 int pos = -1;
250 for(int i=0; i < _lists.size(); i++) {
251 if(_lists.get(i) == null) {
252 // found a gap, re-use it
253 pos = i;
254 _lists.set(i, new LinkedList());
255 }
256 }
257 if(pos == -1) {
258 //we didn't find a gap, add at end
259 pos = _lists.size();
260 _lists.add(pos, new LinkedList());
261 }
262 return pos;
263 }
264
265 /**
266 * This method sets a entry to null in the list. This ensures
267 * that it will no longer be added to after it is no longer
268 * required be a consumer.
269 *
270 * @param queue The integer identifier for the queue, given by getQueue().
271 */
272 public synchronized void removeQueue(int queue) {
273 _lists.set(queue, null);
274 }
275
276 /**
277 * Start a monitor on our own Queue. This will log XML
278 * statistics about our Queue to a given Queue (could be
279 * the one being monitored).
280 *
281 * @param interval The long interval, in milliseconds, at which to take samples
282 * @param destQueue The queue to monitor to
283 * @param name A name to identify this Queue with
284 * @return whether we succeeded
285 */
286 public boolean startMonitor(long interval, Queue destQueue, String name) {
287 if(_queueMon == null) {
288 // start a monitor
289 _queueMon = new QueueMonitor(this, destQueue, interval, name);
290 _queueMon.start();
291 return true;
292 }
293 else {
294 // already have a monitor running
295 return false;
296 }
297 }
298
299 /**
300 * Start a monitor on our own Queue. This will log XML
301 * statistics about our Queue to this Queue.
302 *
303 * @param interval The long interval, in milliseconds, at which to take samples
304 * @param name A name to identify this Queue with
305 * @return whether we succeeded
306 */
307 public boolean startMonitor(long interval, String name) {
308 return startMonitor(interval, this, name);
309 }
310
311 /**
312 * Stop a monitor on our Queue if we have on running.
313 *
314 * @return whether we succeeded
315 */
316 public boolean stopMonitor() {
317 if(_queueMon != null) {
318 // stop a monitor
319 _queueMon.shutdown();
320 _queueMon = null;
321 return true;
322 }
323 else {
324 // no monitor running
325 return false;
326 }
327 }
328
329 /**
330 * Overrides the {@link java.lang.Object#toString() Object.toString()}
331 * method to provide clean logging (every class should have this).
332 *
333 * This uses the uk.org.iscream.util.FormatName class
334 * to format the toString()
335 *
336 * @return the name of this class and its CVS revision.
337 */
338 public String toString() {
339 return FormatName.getName(
340 _name,
341 getClass().getName(),
342 REVISION);
343 }
344
345 //---PRIVATE METHODS---
346
347 /**
348 * This method removes an item from a Queue, using a method
349 * requested at construction.
350 *
351 * @param list The LinkedList from which to remove an item.
352 */
353 private void removeQueueItem(LinkedList list) {
354 // look at our algorithm
355 // remove a random item from the list
356 if(_removeAlgorithm==RANDOM) {
357 // new Random, with a good seed
358 Random rand = new Random(System.currentTimeMillis());
359 int i = rand.nextInt(_maxSize);
360 synchronized(this) {
361 list.remove(i);
362 }
363 }
364 // remove the first item from the list
365 else if(_removeAlgorithm==FIRST) {
366 synchronized(this) {
367 list.removeFirst();
368 }
369 }
370 // remove the last item from the list
371 else if(_removeAlgorithm==LAST) {
372 synchronized(this) {
373 list.removeLast();
374 }
375 }
376 }
377
378 //---ACCESSOR/MUTATOR METHODS---
379
380 //---ATTRIBUTES---
381
382 /**
383 * The LinkedLists of queues.
384 */
385 private LinkedList _lists = new LinkedList();
386
387 /**
388 * A counter so we know how many data items have been
389 * passed through, for statistical purposes.
390 */
391 private int _count = 0;
392
393 /**
394 * A reference to our QueueMonitor, if we have one.
395 */
396 private QueueMonitor _queueMon = null;
397
398 /**
399 * The maximum size of any Queue.
400 */
401 private int _maxSize = -1;
402
403 /**
404 * The remove algorithm to use upon a Queue reaching
405 * it's maximum size.
406 */
407 private int _removeAlgorithm = -1;
408
409 /**
410 * This is the friendly identifier of the
411 * component this class is running in.
412 * eg, a Filter may be called "filter1",
413 * If this class does not have an owning
414 * component, a name from the configuration
415 * can be placed here. This name could also
416 * be changed to null for utility classes.
417 */
418 private String _name = null;
419
420 //---STATIC ATTRIBUTES---
421
422 }