ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.20
Committed: Mon Mar 26 17:59:47 2001 UTC (23 years, 1 month ago) by tdb
Branch: MAIN
CVS Tags: PROJECT_COMPLETION
Changes since 1.19: +6 -8 lines
Log Message:
Some further checking to solve deadlock issues.
Firstly when checking if the queue is "full", we now do a check for "greater
than or equal" to the limit, rather than just "equal". If for any reason it
does get over the limit, it'll still get caught.
Secondly we no longer check if the queue is of size 0 to do the notify, we just
do it anyway. We figure this is probably more efficient than trying to do
a massive synchronization block.

File Contents

# User Rev Content
1 tdb 1.1 //---PACKAGE DECLARATION---
2 tdb 1.16 package uk.org.iscream.util;
3 tdb 1.1
4     //---IMPORTS---
5     import java.util.LinkedList;
6     import java.util.NoSuchElementException;
7 tdb 1.14 import java.util.Random;
8 tdb 1.16 import uk.org.iscream.util.*;
9 tdb 1.1
10     /**
11     * A Queue class designed to operate in a multi-threaded environment, with
12     * added support for multiple "consumer" threads. Also offers blocking on
13     * the get() methods, which ensures the consumer waits until the queue
14     * actually contains some elements.
15     *
16     * @author $Author: tdb1 $
17 tdb 1.20 * @version $Id: Queue.java,v 1.19 2001/03/22 18:23:27 tdb1 Exp $
18 tdb 1.1 */
19 tdb 1.2 public class Queue {
20 tdb 1.1
21     //---FINAL ATTRIBUTES---
22    
23     /**
24     * The current CVS revision of this class
25     */
26 tdb 1.20 public static final String REVISION = "$Revision: 1.19 $";
27 tdb 1.14
28     /**
29     * Pass to constructor to remove a RANDOM item from
30     * the Queue upon reaching the maximum limit.
31     */
32     public static final int RANDOM = 0;
33    
34     /**
35     * Pass to constructor to remove the FIRST item from
36     * the Queue upon reaching the maximum limit.
37     */
38     public static final int FIRST = 1;
39    
40     /**
41     * Pass to constructor to remove the LAST item from
42     * the Queue upon reaching the maximum limit.
43     */
44     public static final int LAST = 2;
45    
46     /**
47     * Pass to constructor to drop the new item upon reaching
48     * the maximum Queue limit.
49     */
50     public static final int DROP = 3;
51 tdb 1.17
52     /**
53     * To allow opposite lookups.
54     */
55     public static final String[] algorithms = {"RANDOM", "FIRST", "LAST", "DROP"};
56 tdb 1.1
57     //---STATIC METHODS---
58    
59     //---CONSTRUCTORS---
60 tdb 1.14
61     /**
62     * Constructs a new Queue with a maximum size limit on
63     * any individual queue. This should be used to stop
64     * conditions where the Queue cannot be guaranteed to
65     * be emptied as quick as it's filled.
66     *
67     * An algorithm will be used to remove data when new data
68     * arrives. There may be choices of algorithms later on.
69     *
70     * @param maxSize the upper limit for a queue
71     * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
72     */
73     public Queue(int maxSize, int removeAlgorithm) {
74     _maxSize = maxSize;
75     _removeAlgorithm = removeAlgorithm;
76     }
77    
78     /**
79     * Constructs a Queue with no maximum size.
80     */
81     public Queue() {
82     _maxSize = -1;
83     }
84 tdb 1.1
85     //---PUBLIC METHODS---
86    
87     /**
88     * This method adds a given object to every queue. It will notify
89     * any waiting consumers (on an empty queue) during this process.
90     *
91     * @param o An Object to be added to the queues.
92     */
93     public void add(Object o) {
94     for(int i=0; i < _lists.size(); i++) {
95     // skip over any gaps left in the list
96     if(_lists.get(i) != null) {
97 tdb 1.14 // get size before adding to the Queue
98 tdb 1.1 int s = ((LinkedList) _lists.get(i)).size();
99 tdb 1.14 // check whether we need to remove an item from the current Queue
100 tdb 1.20 if(_maxSize!=-1 && s>=_maxSize && _removeAlgorithm!=DROP) {
101 tdb 1.14 // we need to remove an item
102     removeQueueItem((LinkedList) _lists.get(i));
103     }
104     // check if we should add (not if Queue full, and using DROP algorithm)
105 tdb 1.20 if(!(s>=_maxSize && _removeAlgorithm==DROP)) {
106 tdb 1.14 // add the next item, ensuring we lock
107     synchronized(this) {
108     // LinkedList.add() does the same thing, but this ensures behaviour
109     ((LinkedList) _lists.get(i)).addLast(o);
110     }
111 tdb 1.1 }
112     // if the queue was empty before the add it is possible
113     // that a consumer is waiting... so we notify them
114 tdb 1.20 synchronized(((LinkedList) _lists.get(i))) {
115     ((LinkedList) _lists.get(i)).notifyAll();
116 tdb 1.1 }
117     }
118     }
119     // we keep track of the total additions for the status() method
120     _count++;
121     }
122    
123     /**
124     * This method returns an object from the front of a given queue.
125     * It will block until data exists in the queue if required.
126     *
127 tdb 1.8 * @param The queue to retrieve data from.
128 tdb 1.1 * @return The object from the front of the queue.
129     * @throws InvalidQueueException if the queue does not exist.
130     */
131     public Object get(int queue) throws InvalidQueueException {
132     // make sure queue exists
133     if (queue >= _lists.size() || _lists.get(queue) == null) {
134     throw new InvalidQueueException("Requested queue "+queue+" does not exist");
135     }
136     // block if the queue is empty
137 tdb 1.18 synchronized(((LinkedList) _lists.get(queue))) {
138     if (((LinkedList) _lists.get(queue)).size() == 0) {
139 tdb 1.1 try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
140     }
141     }
142     // get an item, it should never be null due to the blocking above
143     Object o = null;
144     synchronized(this) {
145     try {
146     o = ((LinkedList) _lists.get(queue)).removeFirst();
147     }
148     catch (NoSuchElementException e) {
149     // This should never happen !
150     }
151     }
152     return o;
153     }
154 tdb 1.6
155     /**
156     * This method releases a get() method that's currently
157     * waiting on an empty queue. This was designed for
158     * shutdown() type methods that may have problems closing
159     * if the thread of control is waiting on a queue.
160     *
161 tdb 1.8 * @param queue the queue to release.
162 tdb 1.6 */
163     public void releaseQueue(int queue) {
164     synchronized(((LinkedList) _lists.get(queue))) {
165     ((LinkedList) _lists.get(queue)).notifyAll();
166 tdb 1.7 }
167     }
168    
169     /**
170     * This method erases the contents of a given queue. This
171     * method should be used with care. It can only empty one
172     * internal queue, not all of them. This must be called
173     * multiple times to empty all queues.
174     *
175     * @param queue the queue to empty.
176     */
177     public void clearQueue(int queue) {
178     synchronized(this) {
179     ((LinkedList) _lists.get(queue)).clear();
180 tdb 1.6 }
181     }
182    
183 tdb 1.1 /**
184 tdb 1.9 * This method returns an XML textual status of the queues.
185     * It is merely for observation, and would most likely be
186     * used by a larger "monitoring" component. Information
187     * returned includes the current size of each queue, and
188     * the total items passed through.
189 tdb 1.1 *
190 tdb 1.9 * @return A String message containing status information in XML format
191 tdb 1.1 */
192 tdb 1.9 public String xmlStatus() {
193     String status = "<queue ";
194 tdb 1.1 for(int i=0; i < _lists.size(); i++) {
195     // check for null entries
196     if(_lists.get(i) != null) {
197 tdb 1.9 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
198 tdb 1.1 }
199     else {
200 tdb 1.13 status += "queue"+i+"=\"[deleted]\" ";
201 tdb 1.1 }
202     }
203 tdb 1.14 status += "total=\""+_count+"\"";
204     if(_maxSize != -1) {
205     status += " maxSize=\""+_maxSize+"\"";
206     }
207 tdb 1.15 status += "></queue>";
208 tdb 1.1 return status;
209 tdb 1.4 }
210    
211     /**
212     * Returns the size of a given queue. A consumer can use
213     * this to see how big their queue is at any given time.
214     * they should use their queue number as the parameter.
215     *
216     * @param queue The queue number to query.
217     * @return the current size of the queue.
218 tdb 1.8 * @throws InvalidQueueException if the queue does not exist.
219 tdb 1.4 */
220     public int queueSize(int queue) throws InvalidQueueException {
221 tdb 1.5 if (queue >= _lists.size() || _lists.get(queue) == null) {
222 tdb 1.4 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
223     }
224 tdb 1.5 return ((LinkedList) _lists.get(queue)).size();
225 tdb 1.4 }
226    
227     /**
228     * Returns the total numer of elements to have passed
229     * through this queue (ie. a counter on the add method).
230     *
231 tdb 1.8 * @return the element-ometer.
232 tdb 1.4 */
233     public int elementCount() {
234 tdb 1.5 return _count;
235 tdb 1.1 }
236    
237     /**
238     * This method assigns a queue to a consumer. The idea behind
239     * this is to ensure that only 1 consumer can be associated with
240     * a given queue, otherwise the whole "blocking" thing fails
241     * miserably. Queues are created upon requested.
242     *
243     * It is IMPORTANT that removeQueue() is used when the queue is
244     * no longer required.
245     *
246     * @return An integer to be passed to the get() method.
247     */
248 tdb 1.19 public synchronized int getQueue() {
249 tdb 1.1 int pos = -1;
250     for(int i=0; i < _lists.size(); i++) {
251     if(_lists.get(i) == null) {
252     // found a gap, re-use it
253     pos = i;
254     _lists.set(i, new LinkedList());
255     }
256     }
257     if(pos == -1) {
258     //we didn't find a gap, add at end
259     pos = _lists.size();
260     _lists.add(pos, new LinkedList());
261     }
262     return pos;
263     }
264    
265     /**
266     * This method sets a entry to null in the list. This ensures
267     * that it will no longer be added to after it is no longer
268     * required be a consumer.
269     *
270     * @param queue The integer identifier for the queue, given by getQueue().
271     */
272 tdb 1.19 public synchronized void removeQueue(int queue) {
273 tdb 1.1 _lists.set(queue, null);
274     }
275    
276     /**
277 tdb 1.9 * Start a monitor on our own Queue. This will log XML
278     * statistics about our Queue to a given Queue (could be
279     * the one being monitored).
280     *
281     * @param interval The long interval, in milliseconds, at which to take samples
282     * @param destQueue The queue to monitor to
283     * @param name A name to identify this Queue with
284     * @return whether we succeeded
285     */
286     public boolean startMonitor(long interval, Queue destQueue, String name) {
287     if(_queueMon == null) {
288     // start a monitor
289     _queueMon = new QueueMonitor(this, destQueue, interval, name);
290     _queueMon.start();
291     return true;
292     }
293     else {
294     // already have a monitor running
295     return false;
296     }
297     }
298    
299     /**
300     * Start a monitor on our own Queue. This will log XML
301     * statistics about our Queue to this Queue.
302     *
303     * @param interval The long interval, in milliseconds, at which to take samples
304     * @param name A name to identify this Queue with
305     * @return whether we succeeded
306     */
307     public boolean startMonitor(long interval, String name) {
308     return startMonitor(interval, this, name);
309     }
310    
311     /**
312     * Stop a monitor on our Queue if we have on running.
313     *
314     * @return whether we succeeded
315     */
316     public boolean stopMonitor() {
317     if(_queueMon != null) {
318     // stop a monitor
319     _queueMon.shutdown();
320     _queueMon = null;
321     return true;
322     }
323     else {
324     // no monitor running
325     return false;
326     }
327     }
328    
329     /**
330 tdb 1.1 * Overrides the {@link java.lang.Object#toString() Object.toString()}
331     * method to provide clean logging (every class should have this).
332     *
333 tdb 1.16 * This uses the uk.org.iscream.util.FormatName class
334 tdb 1.1 * to format the toString()
335     *
336 tdb 1.8 * @return the name of this class and its CVS revision.
337 tdb 1.1 */
338 tdb 1.3 public String toString() {
339 tdb 1.1 return FormatName.getName(
340     _name,
341     getClass().getName(),
342     REVISION);
343 tdb 1.3 }
344 tdb 1.1
345     //---PRIVATE METHODS---
346 tdb 1.14
347     /**
348     * This method removes an item from a Queue, using a method
349     * requested at construction.
350     *
351     * @param list The LinkedList from which to remove an item.
352     */
353     private void removeQueueItem(LinkedList list) {
354     // look at our algorithm
355     // remove a random item from the list
356     if(_removeAlgorithm==RANDOM) {
357     // new Random, with a good seed
358     Random rand = new Random(System.currentTimeMillis());
359     int i = rand.nextInt(_maxSize);
360     synchronized(this) {
361     list.remove(i);
362     }
363     }
364     // remove the first item from the list
365     else if(_removeAlgorithm==FIRST) {
366     synchronized(this) {
367     list.removeFirst();
368     }
369     }
370     // remove the last item from the list
371     else if(_removeAlgorithm==LAST) {
372     synchronized(this) {
373     list.removeLast();
374     }
375     }
376     }
377    
378 tdb 1.1 //---ACCESSOR/MUTATOR METHODS---
379    
380     //---ATTRIBUTES---
381    
382     /**
383     * The LinkedLists of queues.
384     */
385     private LinkedList _lists = new LinkedList();
386    
387     /**
388     * A counter so we know how many data items have been
389     * passed through, for statistical purposes.
390     */
391     private int _count = 0;
392 tdb 1.9
393     /**
394     * A reference to our QueueMonitor, if we have one.
395     */
396     private QueueMonitor _queueMon = null;
397 tdb 1.14
398     /**
399     * The maximum size of any Queue.
400     */
401     private int _maxSize = -1;
402    
403     /**
404     * The remove algorithm to use upon a Queue reaching
405     * it's maximum size.
406     */
407     private int _removeAlgorithm = -1;
408 tdb 1.1
409     /**
410     * This is the friendly identifier of the
411     * component this class is running in.
412     * eg, a Filter may be called "filter1",
413     * If this class does not have an owning
414     * component, a name from the configuration
415     * can be placed here. This name could also
416     * be changed to null for utility classes.
417     */
418 tdb 1.3 private String _name = null;
419 tdb 1.1
420     //---STATIC ATTRIBUTES---
421    
422     }