ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.14
Committed: Thu Mar 1 01:05:49 2001 UTC (23 years, 2 months ago) by tdb
Branch: MAIN
Changes since 1.13: +112 -7 lines
Log Message:
Finally implemented a maximum size on Queues. This is an optional feature, and
is only used if the size-limiting constructor (shown below) is called.

For normal use (with no limit), just use the empty constructor;
    new Queue()

For using a limit, use the following constructor;
    new Queue(int maxSize, int removeAlgorithm)
Where;
    maxSize = the maximum size of a queue
    removeAlgorithm = Queue.RANDOM, Queue.FIRST, Queue.LAST, or Queue.DROP

Queue.RANDOM = Remove a RANDOM item from a full queue when adding.
Queue.FIRST  = Remove the FIRST item from a full queue when adding.
Queue.LAST   = Remove the LAST item from a full queue when adding.
Queue.DROP   = Don't remove, just drop the new item when adding.

File Contents

# User Rev Content
1 tdb 1.1 //---PACKAGE DECLARATION---
2     package uk.ac.ukc.iscream.util;
3    
4     //---IMPORTS---
5     import java.util.LinkedList;
6     import java.util.NoSuchElementException;
7 tdb 1.14 import java.util.Random;
8 tdb 1.1 import uk.ac.ukc.iscream.util.*;
9    
10     /**
11     * A Queue class designed to operate in a multi-threaded environment, with
12     * added support for multiple "consumer" threads. Also offers blocking on
13     * the get() methods, which ensures the consumer waits until the queue
14     * actually contains some elements.
15     *
16     * @author $Author: tdb1 $
17 tdb 1.14 * @version $Id: Queue.java,v 1.13 2001/02/25 23:29:06 tdb1 Exp $
18 tdb 1.1 */
19 tdb 1.2 public class Queue {
20 tdb 1.1
21     //---FINAL ATTRIBUTES---
22    
23     /**
24     * The current CVS revision of this class
25     */
26 tdb 1.14 public static final String REVISION = "$Revision: 1.13 $";
27    
28     /**
29     * Pass to constructor to remove a RANDOM item from
30     * the Queue upon reaching the maximum limit.
31     */
32     public static final int RANDOM = 0;
33    
34     /**
35     * Pass to constructor to remove the FIRST item from
36     * the Queue upon reaching the maximum limit.
37     */
38     public static final int FIRST = 1;
39    
40     /**
41     * Pass to constructor to remove the LAST item from
42     * the Queue upon reaching the maximum limit.
43     */
44     public static final int LAST = 2;
45    
46     /**
47     * Pass to constructor to drop the new item upon reaching
48     * the maximum Queue limit.
49     */
50     public static final int DROP = 3;
51 tdb 1.1
52     //---STATIC METHODS---
53    
54     //---CONSTRUCTORS---
55 tdb 1.14
/**
 * Creates a Queue in which every internal queue is limited to
 * maxSize items. Use this when consumers cannot be relied upon
 * to drain a queue as fast as producers fill it.
 *
 * Once an internal queue holds maxSize items, add() applies the
 * given remove algorithm before (or instead of) storing the new
 * item.
 *
 * @param maxSize the upper limit for each internal queue
 * @param removeAlgorithm one of RANDOM, FIRST, LAST or DROP
 */
public Queue(int maxSize, int removeAlgorithm) {
    this._removeAlgorithm = removeAlgorithm;
    this._maxSize = maxSize;
}
72    
/**
 * Creates a Queue without a size limit. The limit is stored
 * as -1, which add() treats as "no maximum".
 */
public Queue() {
    this._maxSize = -1;
}
79 tdb 1.1
80     //---PUBLIC METHODS---
81    
82     /**
83     * This method adds a given object to every queue. It will notify
84     * any waiting consumers (on an empty queue) during this process.
85     *
86     * @param o An Object to be added to the queues.
87     */
88     public void add(Object o) {
89     for(int i=0; i < _lists.size(); i++) {
90     // skip over any gaps left in the list
91     if(_lists.get(i) != null) {
92 tdb 1.14 // get size before adding to the Queue
93 tdb 1.1 int s = ((LinkedList) _lists.get(i)).size();
94 tdb 1.14 // check whether we need to remove an item from the current Queue
95     if(_maxSize!=-1 && s==_maxSize && _removeAlgorithm!=DROP) {
96     // we need to remove an item
97     removeQueueItem((LinkedList) _lists.get(i));
98     }
99     // check if we should add (not if Queue full, and using DROP algorithm)
100     if(!(s==_maxSize && _removeAlgorithm==DROP)) {
101     // add the next item, ensuring we lock
102     synchronized(this) {
103     // LinkedList.add() does the same thing, but this ensures behaviour
104     ((LinkedList) _lists.get(i)).addLast(o);
105     }
106 tdb 1.1 }
107     // if the queue was empty before the add it is possible
108     // that a consumer is waiting... so we notify them
109     if (s == 0) {
110     synchronized(((LinkedList) _lists.get(i))) {
111     ((LinkedList) _lists.get(i)).notifyAll();
112     }
113     }
114     }
115     }
116     // we keep track of the total additions for the status() method
117     _count++;
118     }
119    
120     /**
121     * This method returns an object from the front of a given queue.
122     * It will block until data exists in the queue if required.
123     *
124 tdb 1.8 * @param The queue to retrieve data from.
125 tdb 1.1 * @return The object from the front of the queue.
126     * @throws InvalidQueueException if the queue does not exist.
127     */
128     public Object get(int queue) throws InvalidQueueException {
129     // make sure queue exists
130     if (queue >= _lists.size() || _lists.get(queue) == null) {
131     throw new InvalidQueueException("Requested queue "+queue+" does not exist");
132     }
133     // block if the queue is empty
134     if (((LinkedList) _lists.get(queue)).size() == 0) {
135     synchronized(((LinkedList) _lists.get(queue))) {
136     try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
137     }
138     }
139     // get an item, it should never be null due to the blocking above
140     Object o = null;
141     synchronized(this) {
142     try {
143     o = ((LinkedList) _lists.get(queue)).removeFirst();
144     }
145     catch (NoSuchElementException e) {
146     // This should never happen !
147     }
148     }
149     return o;
150     }
151 tdb 1.6
152     /**
153     * This method releases a get() method that's currently
154     * waiting on an empty queue. This was designed for
155     * shutdown() type methods that may have problems closing
156     * if the thread of control is waiting on a queue.
157     *
158 tdb 1.8 * @param queue the queue to release.
159 tdb 1.6 */
160     public void releaseQueue(int queue) {
161     synchronized(((LinkedList) _lists.get(queue))) {
162     ((LinkedList) _lists.get(queue)).notifyAll();
163 tdb 1.7 }
164     }
165    
166     /**
167     * This method erases the contents of a given queue. This
168     * method should be used with care. It can only empty one
169     * internal queue, not all of them. This must be called
170     * multiple times to empty all queues.
171     *
172     * @param queue the queue to empty.
173     */
174     public void clearQueue(int queue) {
175     synchronized(this) {
176     ((LinkedList) _lists.get(queue)).clear();
177 tdb 1.6 }
178     }
179    
180 tdb 1.1 /**
181 tdb 1.9 * This method returns an XML textual status of the queues.
182     * It is merely for observation, and would most likely be
183     * used by a larger "monitoring" component. Information
184     * returned includes the current size of each queue, and
185     * the total items passed through.
186 tdb 1.1 *
187 tdb 1.9 * @return A String message containing status information in XML format
188 tdb 1.1 */
189 tdb 1.9 public String xmlStatus() {
190     String status = "<queue ";
191 tdb 1.1 for(int i=0; i < _lists.size(); i++) {
192     // check for null entries
193     if(_lists.get(i) != null) {
194 tdb 1.9 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
195 tdb 1.1 }
196     else {
197 tdb 1.13 status += "queue"+i+"=\"[deleted]\" ";
198 tdb 1.1 }
199     }
200 tdb 1.14 status += "total=\""+_count+"\"";
201     if(_maxSize != -1) {
202     status += " maxSize=\""+_maxSize+"\"";
203     }
204     status += "</queue>";
205 tdb 1.1 return status;
206 tdb 1.4 }
207    
208     /**
209     * Returns the size of a given queue. A consumer can use
210     * this to see how big their queue is at any given time.
211     * they should use their queue number as the parameter.
212     *
213     * @param queue The queue number to query.
214     * @return the current size of the queue.
215 tdb 1.8 * @throws InvalidQueueException if the queue does not exist.
216 tdb 1.4 */
217     public int queueSize(int queue) throws InvalidQueueException {
218 tdb 1.5 if (queue >= _lists.size() || _lists.get(queue) == null) {
219 tdb 1.4 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
220     }
221 tdb 1.5 return ((LinkedList) _lists.get(queue)).size();
222 tdb 1.4 }
223    
224     /**
225     * Returns the total numer of elements to have passed
226     * through this queue (ie. a counter on the add method).
227     *
228 tdb 1.8 * @return the element-ometer.
229 tdb 1.4 */
230     public int elementCount() {
231 tdb 1.5 return _count;
232 tdb 1.1 }
233    
234     /**
235     * This method assigns a queue to a consumer. The idea behind
236     * this is to ensure that only 1 consumer can be associated with
237     * a given queue, otherwise the whole "blocking" thing fails
238     * miserably. Queues are created upon requested.
239     *
240     * It is IMPORTANT that removeQueue() is used when the queue is
241     * no longer required.
242     *
243     * @return An integer to be passed to the get() method.
244     */
245     public int getQueue() {
246     int pos = -1;
247     for(int i=0; i < _lists.size(); i++) {
248     if(_lists.get(i) == null) {
249     // found a gap, re-use it
250     pos = i;
251     _lists.set(i, new LinkedList());
252     }
253     }
254     if(pos == -1) {
255     //we didn't find a gap, add at end
256     pos = _lists.size();
257     _lists.add(pos, new LinkedList());
258     }
259     return pos;
260     }
261    
262     /**
263     * This method sets a entry to null in the list. This ensures
264     * that it will no longer be added to after it is no longer
265     * required be a consumer.
266     *
267     * @param queue The integer identifier for the queue, given by getQueue().
268     */
269     public void removeQueue(int queue) {
270     _lists.set(queue, null);
271     }
272    
273     /**
274 tdb 1.9 * Start a monitor on our own Queue. This will log XML
275     * statistics about our Queue to a given Queue (could be
276     * the one being monitored).
277     *
278     * @param interval The long interval, in milliseconds, at which to take samples
279     * @param destQueue The queue to monitor to
280     * @param name A name to identify this Queue with
281     * @return whether we succeeded
282     */
283     public boolean startMonitor(long interval, Queue destQueue, String name) {
284     if(_queueMon == null) {
285     // start a monitor
286     _queueMon = new QueueMonitor(this, destQueue, interval, name);
287     _queueMon.start();
288     return true;
289     }
290     else {
291     // already have a monitor running
292     return false;
293     }
294     }
295    
296     /**
297     * Start a monitor on our own Queue. This will log XML
298     * statistics about our Queue to this Queue.
299     *
300     * @param interval The long interval, in milliseconds, at which to take samples
301     * @param name A name to identify this Queue with
302     * @return whether we succeeded
303     */
304     public boolean startMonitor(long interval, String name) {
305     return startMonitor(interval, this, name);
306     }
307    
308     /**
309     * Stop a monitor on our Queue if we have on running.
310     *
311     * @return whether we succeeded
312     */
313     public boolean stopMonitor() {
314     if(_queueMon != null) {
315     // stop a monitor
316     _queueMon.shutdown();
317     _queueMon = null;
318     return true;
319     }
320     else {
321     // no monitor running
322     return false;
323     }
324     }
325    
326     /**
327 tdb 1.1 * Overrides the {@link java.lang.Object#toString() Object.toString()}
328     * method to provide clean logging (every class should have this).
329     *
330     * This uses the uk.ac.ukc.iscream.util.FormatName class
331     * to format the toString()
332     *
333 tdb 1.8 * @return the name of this class and its CVS revision.
334 tdb 1.1 */
335 tdb 1.3 public String toString() {
336 tdb 1.1 return FormatName.getName(
337     _name,
338     getClass().getName(),
339     REVISION);
340 tdb 1.3 }
341 tdb 1.1
342     //---PRIVATE METHODS---
343 tdb 1.14
344     /**
345     * This method removes an item from a Queue, using a method
346     * requested at construction.
347     *
348     * @param list The LinkedList from which to remove an item.
349     */
350     private void removeQueueItem(LinkedList list) {
351     // look at our algorithm
352     // remove a random item from the list
353     if(_removeAlgorithm==RANDOM) {
354     // new Random, with a good seed
355     Random rand = new Random(System.currentTimeMillis());
356     int i = rand.nextInt(_maxSize);
357     synchronized(this) {
358     list.remove(i);
359     }
360     }
361     // remove the first item from the list
362     else if(_removeAlgorithm==FIRST) {
363     synchronized(this) {
364     list.removeFirst();
365     }
366     }
367     // remove the last item from the list
368     else if(_removeAlgorithm==LAST) {
369     synchronized(this) {
370     list.removeLast();
371     }
372     }
373     }
374    
375 tdb 1.1 //---ACCESSOR/MUTATOR METHODS---
376    
377     //---ATTRIBUTES---
378    
379     /**
380     * The LinkedLists of queues.
381     */
382     private LinkedList _lists = new LinkedList();
383    
384     /**
385     * A counter so we know how many data items have been
386     * passed through, for statistical purposes.
387     */
388     private int _count = 0;
389 tdb 1.9
390     /**
391     * A reference to our QueueMonitor, if we have one.
392     */
393     private QueueMonitor _queueMon = null;
394 tdb 1.14
395     /**
396     * The maximum size of any Queue.
397     */
398     private int _maxSize = -1;
399    
400     /**
401     * The remove algorithm to use upon a Queue reaching
402     * it's maximum size.
403     */
404     private int _removeAlgorithm = -1;
405 tdb 1.1
406     /**
407     * This is the friendly identifier of the
408     * component this class is running in.
409     * eg, a Filter may be called "filter1",
410     * If this class does not have an owning
411     * component, a name from the configuration
412     * can be placed here. This name could also
413     * be changed to null for utility classes.
414     */
415 tdb 1.3 private String _name = null;
416 tdb 1.1
417     //---STATIC ATTRIBUTES---
418    
419     }