ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.14
Committed: Thu Mar 1 01:05:49 2001 UTC (23 years, 2 months ago) by tdb
Branch: MAIN
Changes since 1.13: +112 -7 lines
Log Message:
Finally implemented a maximum size on Queues. This is an optional feature, and
is only used if the correct constructor is called.

For normal use (with no limit), just use the empty constructor:
    new Queue()

To use a limit, use the following constructor:
    new Queue(int maxSize, int removeAlgorithm)
where:
    maxSize = the maximum size of a queue
    removeAlgorithm = Queue.RANDOM, Queue.FIRST, Queue.LAST, or Queue.DROP

Queue.RANDOM = Remove a RANDOM item from a full queue when adding.
Queue.FIRST  = Remove the FIRST item from a full queue when adding.
Queue.LAST   = Remove the LAST item from a full queue when adding.
Queue.DROP   = Don't remove, just drop the new item when adding.

File Contents

# Content
1 //---PACKAGE DECLARATION---
2 package uk.ac.ukc.iscream.util;
3
4 //---IMPORTS---
5 import java.util.LinkedList;
6 import java.util.NoSuchElementException;
7 import java.util.Random;
8 import uk.ac.ukc.iscream.util.*;
9
10 /**
11 * A Queue class designed to operate in a multi-threaded environment, with
12 * added support for multiple "consumer" threads. Also offers blocking on
13 * the get() methods, which ensures the consumer waits until the queue
14 * actually contains some elements.
15 *
16 * @author $Author: tdb1 $
17 * @version $Id: Queue.java,v 1.13 2001/02/25 23:29:06 tdb1 Exp $
18 */
19 public class Queue {
20
21 //---FINAL ATTRIBUTES---
22
23 /**
24 * The current CVS revision of this class
25 */
26 public static final String REVISION = "$Revision: 1.13 $";
27
28 /**
29 * Pass to constructor to remove a RANDOM item from
30 * the Queue upon reaching the maximum limit.
31 */
32 public static final int RANDOM = 0;
33
34 /**
35 * Pass to constructor to remove the FIRST item from
36 * the Queue upon reaching the maximum limit.
37 */
38 public static final int FIRST = 1;
39
40 /**
41 * Pass to constructor to remove the LAST item from
42 * the Queue upon reaching the maximum limit.
43 */
44 public static final int LAST = 2;
45
46 /**
47 * Pass to constructor to drop the new item upon reaching
48 * the maximum Queue limit.
49 */
50 public static final int DROP = 3;
51
52 //---STATIC METHODS---
53
54 //---CONSTRUCTORS---
55
56 /**
57 * Constructs a new Queue with a maximum size limit on
58 * any individual queue. This should be used to stop
59 * conditions where the Queue cannot be guaranteed to
60 * be emptied as quick as it's filled.
61 *
62 * An algorithm will be used to remove data when new data
63 * arrives. There may be choices of algorithms later on.
64 *
65 * @param maxSize the upper limit for a queue
66 * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
67 */
68 public Queue(int maxSize, int removeAlgorithm) {
69 _maxSize = maxSize;
70 _removeAlgorithm = removeAlgorithm;
71 }
72
73 /**
74 * Constructs a Queue with no maximum size.
75 */
76 public Queue() {
77 _maxSize = -1;
78 }
79
80 //---PUBLIC METHODS---
81
82 /**
83 * This method adds a given object to every queue. It will notify
84 * any waiting consumers (on an empty queue) during this process.
85 *
86 * @param o An Object to be added to the queues.
87 */
88 public void add(Object o) {
89 for(int i=0; i < _lists.size(); i++) {
90 // skip over any gaps left in the list
91 if(_lists.get(i) != null) {
92 // get size before adding to the Queue
93 int s = ((LinkedList) _lists.get(i)).size();
94 // check whether we need to remove an item from the current Queue
95 if(_maxSize!=-1 && s==_maxSize && _removeAlgorithm!=DROP) {
96 // we need to remove an item
97 removeQueueItem((LinkedList) _lists.get(i));
98 }
99 // check if we should add (not if Queue full, and using DROP algorithm)
100 if(!(s==_maxSize && _removeAlgorithm==DROP)) {
101 // add the next item, ensuring we lock
102 synchronized(this) {
103 // LinkedList.add() does the same thing, but this ensures behaviour
104 ((LinkedList) _lists.get(i)).addLast(o);
105 }
106 }
107 // if the queue was empty before the add it is possible
108 // that a consumer is waiting... so we notify them
109 if (s == 0) {
110 synchronized(((LinkedList) _lists.get(i))) {
111 ((LinkedList) _lists.get(i)).notifyAll();
112 }
113 }
114 }
115 }
116 // we keep track of the total additions for the status() method
117 _count++;
118 }
119
120 /**
121 * This method returns an object from the front of a given queue.
122 * It will block until data exists in the queue if required.
123 *
124 * @param The queue to retrieve data from.
125 * @return The object from the front of the queue.
126 * @throws InvalidQueueException if the queue does not exist.
127 */
128 public Object get(int queue) throws InvalidQueueException {
129 // make sure queue exists
130 if (queue >= _lists.size() || _lists.get(queue) == null) {
131 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
132 }
133 // block if the queue is empty
134 if (((LinkedList) _lists.get(queue)).size() == 0) {
135 synchronized(((LinkedList) _lists.get(queue))) {
136 try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
137 }
138 }
139 // get an item, it should never be null due to the blocking above
140 Object o = null;
141 synchronized(this) {
142 try {
143 o = ((LinkedList) _lists.get(queue)).removeFirst();
144 }
145 catch (NoSuchElementException e) {
146 // This should never happen !
147 }
148 }
149 return o;
150 }
151
152 /**
153 * This method releases a get() method that's currently
154 * waiting on an empty queue. This was designed for
155 * shutdown() type methods that may have problems closing
156 * if the thread of control is waiting on a queue.
157 *
158 * @param queue the queue to release.
159 */
160 public void releaseQueue(int queue) {
161 synchronized(((LinkedList) _lists.get(queue))) {
162 ((LinkedList) _lists.get(queue)).notifyAll();
163 }
164 }
165
166 /**
167 * This method erases the contents of a given queue. This
168 * method should be used with care. It can only empty one
169 * internal queue, not all of them. This must be called
170 * multiple times to empty all queues.
171 *
172 * @param queue the queue to empty.
173 */
174 public void clearQueue(int queue) {
175 synchronized(this) {
176 ((LinkedList) _lists.get(queue)).clear();
177 }
178 }
179
180 /**
181 * This method returns an XML textual status of the queues.
182 * It is merely for observation, and would most likely be
183 * used by a larger "monitoring" component. Information
184 * returned includes the current size of each queue, and
185 * the total items passed through.
186 *
187 * @return A String message containing status information in XML format
188 */
189 public String xmlStatus() {
190 String status = "<queue ";
191 for(int i=0; i < _lists.size(); i++) {
192 // check for null entries
193 if(_lists.get(i) != null) {
194 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
195 }
196 else {
197 status += "queue"+i+"=\"[deleted]\" ";
198 }
199 }
200 status += "total=\""+_count+"\"";
201 if(_maxSize != -1) {
202 status += " maxSize=\""+_maxSize+"\"";
203 }
204 status += "</queue>";
205 return status;
206 }
207
208 /**
209 * Returns the size of a given queue. A consumer can use
210 * this to see how big their queue is at any given time.
211 * they should use their queue number as the parameter.
212 *
213 * @param queue The queue number to query.
214 * @return the current size of the queue.
215 * @throws InvalidQueueException if the queue does not exist.
216 */
217 public int queueSize(int queue) throws InvalidQueueException {
218 if (queue >= _lists.size() || _lists.get(queue) == null) {
219 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
220 }
221 return ((LinkedList) _lists.get(queue)).size();
222 }
223
224 /**
225 * Returns the total numer of elements to have passed
226 * through this queue (ie. a counter on the add method).
227 *
228 * @return the element-ometer.
229 */
230 public int elementCount() {
231 return _count;
232 }
233
234 /**
235 * This method assigns a queue to a consumer. The idea behind
236 * this is to ensure that only 1 consumer can be associated with
237 * a given queue, otherwise the whole "blocking" thing fails
238 * miserably. Queues are created upon requested.
239 *
240 * It is IMPORTANT that removeQueue() is used when the queue is
241 * no longer required.
242 *
243 * @return An integer to be passed to the get() method.
244 */
245 public int getQueue() {
246 int pos = -1;
247 for(int i=0; i < _lists.size(); i++) {
248 if(_lists.get(i) == null) {
249 // found a gap, re-use it
250 pos = i;
251 _lists.set(i, new LinkedList());
252 }
253 }
254 if(pos == -1) {
255 //we didn't find a gap, add at end
256 pos = _lists.size();
257 _lists.add(pos, new LinkedList());
258 }
259 return pos;
260 }
261
262 /**
263 * This method sets a entry to null in the list. This ensures
264 * that it will no longer be added to after it is no longer
265 * required be a consumer.
266 *
267 * @param queue The integer identifier for the queue, given by getQueue().
268 */
269 public void removeQueue(int queue) {
270 _lists.set(queue, null);
271 }
272
273 /**
274 * Start a monitor on our own Queue. This will log XML
275 * statistics about our Queue to a given Queue (could be
276 * the one being monitored).
277 *
278 * @param interval The long interval, in milliseconds, at which to take samples
279 * @param destQueue The queue to monitor to
280 * @param name A name to identify this Queue with
281 * @return whether we succeeded
282 */
283 public boolean startMonitor(long interval, Queue destQueue, String name) {
284 if(_queueMon == null) {
285 // start a monitor
286 _queueMon = new QueueMonitor(this, destQueue, interval, name);
287 _queueMon.start();
288 return true;
289 }
290 else {
291 // already have a monitor running
292 return false;
293 }
294 }
295
296 /**
297 * Start a monitor on our own Queue. This will log XML
298 * statistics about our Queue to this Queue.
299 *
300 * @param interval The long interval, in milliseconds, at which to take samples
301 * @param name A name to identify this Queue with
302 * @return whether we succeeded
303 */
304 public boolean startMonitor(long interval, String name) {
305 return startMonitor(interval, this, name);
306 }
307
308 /**
309 * Stop a monitor on our Queue if we have on running.
310 *
311 * @return whether we succeeded
312 */
313 public boolean stopMonitor() {
314 if(_queueMon != null) {
315 // stop a monitor
316 _queueMon.shutdown();
317 _queueMon = null;
318 return true;
319 }
320 else {
321 // no monitor running
322 return false;
323 }
324 }
325
326 /**
327 * Overrides the {@link java.lang.Object#toString() Object.toString()}
328 * method to provide clean logging (every class should have this).
329 *
330 * This uses the uk.ac.ukc.iscream.util.FormatName class
331 * to format the toString()
332 *
333 * @return the name of this class and its CVS revision.
334 */
335 public String toString() {
336 return FormatName.getName(
337 _name,
338 getClass().getName(),
339 REVISION);
340 }
341
342 //---PRIVATE METHODS---
343
344 /**
345 * This method removes an item from a Queue, using a method
346 * requested at construction.
347 *
348 * @param list The LinkedList from which to remove an item.
349 */
350 private void removeQueueItem(LinkedList list) {
351 // look at our algorithm
352 // remove a random item from the list
353 if(_removeAlgorithm==RANDOM) {
354 // new Random, with a good seed
355 Random rand = new Random(System.currentTimeMillis());
356 int i = rand.nextInt(_maxSize);
357 synchronized(this) {
358 list.remove(i);
359 }
360 }
361 // remove the first item from the list
362 else if(_removeAlgorithm==FIRST) {
363 synchronized(this) {
364 list.removeFirst();
365 }
366 }
367 // remove the last item from the list
368 else if(_removeAlgorithm==LAST) {
369 synchronized(this) {
370 list.removeLast();
371 }
372 }
373 }
374
375 //---ACCESSOR/MUTATOR METHODS---
376
377 //---ATTRIBUTES---
378
379 /**
380 * The LinkedLists of queues.
381 */
382 private LinkedList _lists = new LinkedList();
383
384 /**
385 * A counter so we know how many data items have been
386 * passed through, for statistical purposes.
387 */
388 private int _count = 0;
389
390 /**
391 * A reference to our QueueMonitor, if we have one.
392 */
393 private QueueMonitor _queueMon = null;
394
395 /**
396 * The maximum size of any Queue.
397 */
398 private int _maxSize = -1;
399
400 /**
401 * The remove algorithm to use upon a Queue reaching
402 * it's maximum size.
403 */
404 private int _removeAlgorithm = -1;
405
406 /**
407 * This is the friendly identifier of the
408 * component this class is running in.
409 * eg, a Filter may be called "filter1",
410 * If this class does not have an owning
411 * component, a name from the configuration
412 * can be placed here. This name could also
413 * be changed to null for utility classes.
414 */
415 private String _name = null;
416
417 //---STATIC ATTRIBUTES---
418
419 }