ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/experimental/server/Queue/Queue.java
Revision: 1.9
Committed: Thu Mar 1 03:04:28 2001 UTC (23 years, 8 months ago) by tdb
Branch: MAIN
CVS Tags: PROJECT_COMPLETION, HEAD
Changes since 1.8: +112 -7 lines
Log Message:
Brought inline with main source tree.

File Contents

# User Rev Content
1 tdb 1.2 //---PACKAGE DECLARATION---
2 tdb 1.7 //package uk.ac.ukc.iscream.util;
3 tdb 1.2
4     //---IMPORTS---
5 tdb 1.1 import java.util.LinkedList;
6     import java.util.NoSuchElementException;
7 tdb 1.9 import java.util.Random;
8 tdb 1.7 //import uk.ac.ukc.iscream.util.*;
9 tdb 1.1
10 tdb 1.2 /**
11     * A Queue class designed to operate in a multi-threaded environment, with
12     * added support for multiple "consumer" threads. Also offers blocking on
13     * the get() methods, which ensures the consumer waits until the queue
14     * actually contains some elements.
15     *
16 tdb 1.3 * @author $Author: tdb1 $
17 tdb 1.9 * @version $Id: Queue.java,v 1.15 2001/03/01 01:18:09 tdb1 Exp $
18 tdb 1.2 */
19 tdb 1.4 public class Queue {
20 tdb 1.2
21     //---FINAL ATTRIBUTES---
22    
23     /**
24     * The current CVS revision of this class
25     */
26 tdb 1.9 public static final String REVISION = "$Revision: 1.15 $";
27    
28     /**
29     * Pass to constructor to remove a RANDOM item from
30     * the Queue upon reaching the maximum limit.
31     */
32     public static final int RANDOM = 0;
33    
34     /**
35     * Pass to constructor to remove the FIRST item from
36     * the Queue upon reaching the maximum limit.
37     */
38     public static final int FIRST = 1;
39    
40     /**
41     * Pass to constructor to remove the LAST item from
42     * the Queue upon reaching the maximum limit.
43     */
44     public static final int LAST = 2;
45    
46     /**
47     * Pass to constructor to drop the new item upon reaching
48     * the maximum Queue limit.
49     */
50     public static final int DROP = 3;
51 tdb 1.2
52     //---STATIC METHODS---
53    
54 tdb 1.3 //---CONSTRUCTORS---
55 tdb 1.9
56     /**
57     * Constructs a new Queue with a maximum size limit on
58     * any individual queue. This should be used to stop
59     * conditions where the Queue cannot be guaranteed to
60     * be emptied as quick as it's filled.
61     *
62     * An algorithm will be used to remove data when new data
63     * arrives. There may be choices of algorithms later on.
64     *
65     * @param maxSize the upper limit for a queue
66     * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
67     */
68     public Queue(int maxSize, int removeAlgorithm) {
69     _maxSize = maxSize;
70     _removeAlgorithm = removeAlgorithm;
71     }
72    
73     /**
74     * Constructs a Queue with no maximum size.
75     */
76     public Queue() {
77     _maxSize = -1;
78     }
79 tdb 1.3
80 tdb 1.2 //---PUBLIC METHODS---
81    
82     /**
83     * This method adds a given object to every queue. It will notify
84     * any waiting consumers (on an empty queue) during this process.
85     *
86     * @param o An Object to be added to the queues.
87     */
88     public void add(Object o) {
89 tdb 1.3 for(int i=0; i < _lists.size(); i++) {
90     // skip over any gaps left in the list
91     if(_lists.get(i) != null) {
92 tdb 1.9 // get size before adding to the Queue
93 tdb 1.3 int s = ((LinkedList) _lists.get(i)).size();
94 tdb 1.9 // check whether we need to remove an item from the current Queue
95     if(_maxSize!=-1 && s==_maxSize && _removeAlgorithm!=DROP) {
96     // we need to remove an item
97     removeQueueItem((LinkedList) _lists.get(i));
98     }
99     // check if we should add (not if Queue full, and using DROP algorithm)
100     if(!(s==_maxSize && _removeAlgorithm==DROP)) {
101     // add the next item, ensuring we lock
102     synchronized(this) {
103     // LinkedList.add() does the same thing, but this ensures behaviour
104     ((LinkedList) _lists.get(i)).addLast(o);
105     }
106 tdb 1.3 }
107     // if the queue was empty before the add it is possible
108     // that a consumer is waiting... so we notify them
109     if (s == 0) {
110     synchronized(((LinkedList) _lists.get(i))) {
111     ((LinkedList) _lists.get(i)).notifyAll();
112     }
113 tdb 1.2 }
114     }
115 tdb 1.1 }
116 tdb 1.2 // we keep track of the total additions for the status() method
117 tdb 1.1 _count++;
118     }
119    
120 tdb 1.2 /**
121     * This method returns an object from the front of a given queue.
122     * It will block until data exists in the queue if required.
123     *
124 tdb 1.6 * @param The queue to retrieve data from.
125 tdb 1.2 * @return The object from the front of the queue.
126 tdb 1.3 * @throws InvalidQueueException if the queue does not exist.
127 tdb 1.2 */
128 tdb 1.3 public Object get(int queue) throws InvalidQueueException {
129     // make sure queue exists
130     if (queue >= _lists.size() || _lists.get(queue) == null) {
131     throw new InvalidQueueException("Requested queue "+queue+" does not exist");
132     }
133 tdb 1.2 // block if the queue is empty
134 tdb 1.3 if (((LinkedList) _lists.get(queue)).size() == 0) {
135     synchronized(((LinkedList) _lists.get(queue))) {
136     try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
137 tdb 1.2 }
138 tdb 1.1 }
139 tdb 1.2 // get an item, it should never be null due to the blocking above
140 tdb 1.1 Object o = null;
141 tdb 1.2 synchronized(this) {
142     try {
143 tdb 1.3 o = ((LinkedList) _lists.get(queue)).removeFirst();
144 tdb 1.2 }
145     catch (NoSuchElementException e) {
146     // This should never happen !
147     }
148 tdb 1.1 }
149     return o;
150     }
151 tdb 1.5
152     /**
153     * This method releases a get() method that's currently
154     * waiting on an empty queue. This was designed for
155     * shutdown() type methods that may have problems closing
156     * if the thread of control is waiting on a queue.
157     *
158 tdb 1.6 * @param queue the queue to release.
159 tdb 1.5 */
160     public void releaseQueue(int queue) {
161     synchronized(((LinkedList) _lists.get(queue))) {
162     ((LinkedList) _lists.get(queue)).notifyAll();
163     }
164     }
165 tdb 1.6
166     /**
167     * This method erases the contents of a given queue. This
168     * method should be used with care. It can only empty one
169     * internal queue, not all of them. This must be called
170     * multiple times to empty all queues.
171     *
172     * @param queue the queue to empty.
173     */
174     public void clearQueue(int queue) {
175     synchronized(this) {
176     ((LinkedList) _lists.get(queue)).clear();
177     }
178     }
179 tdb 1.5
180 tdb 1.2 /**
181 tdb 1.7 * This method returns an XML textual status of the queues.
182     * It is merely for observation, and would most likely be
183     * used by a larger "monitoring" component. Information
184     * returned includes the current size of each queue, and
185     * the total items passed through.
186 tdb 1.2 *
187 tdb 1.7 * @return A String message containing status information in XML format
188 tdb 1.2 */
189 tdb 1.7 public String xmlStatus() {
190     String status = "<queue ";
191 tdb 1.3 for(int i=0; i < _lists.size(); i++) {
192     // check for null entries
193     if(_lists.get(i) != null) {
194 tdb 1.7 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
195 tdb 1.3 }
196     else {
197 tdb 1.8 status += "queue"+i+"=\"[deleted]\" ";
198 tdb 1.3 }
199 tdb 1.2 }
200 tdb 1.9 status += "total=\""+_count+"\"";
201     if(_maxSize != -1) {
202     status += " maxSize=\""+_maxSize+"\"";
203     }
204     status += "></queue>";
205 tdb 1.1 return status;
206     }
207    
208 tdb 1.2 /**
209 tdb 1.4 * Returns the size of a given queue. A consumer can use
210     * this to see how big their queue is at any given time.
211     * they should use their queue number as the parameter.
212     *
213     * @param queue The queue number to query.
214     * @return the current size of the queue.
215 tdb 1.6 * @throws InvalidQueueException if the queue does not exist.
216 tdb 1.4 */
217     public int queueSize(int queue) throws InvalidQueueException {
218 tdb 1.5 if (queue >= _lists.size() || _lists.get(queue) == null) {
219 tdb 1.4 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
220     }
221 tdb 1.5 return ((LinkedList) _lists.get(queue)).size();
222 tdb 1.4 }
223    
224     /**
225     * Returns the total numer of elements to have passed
226     * through this queue (ie. a counter on the add method).
227     *
228 tdb 1.6 * @return the element-ometer.
229 tdb 1.4 */
230     public int elementCount() {
231 tdb 1.5 return _count;
232 tdb 1.4 }
233    
234     /**
235 tdb 1.2 * This method assigns a queue to a consumer. The idea behind
236     * this is to ensure that only 1 consumer can be associated with
237     * a given queue, otherwise the whole "blocking" thing fails
238 tdb 1.3 * miserably. Queues are created upon requested.
239     *
240     * It is IMPORTANT that removeQueue() is used when the queue is
241     * no longer required.
242 tdb 1.2 *
243     * @return An integer to be passed to the get() method.
244     */
245 tdb 1.3 public int getQueue() {
246     int pos = -1;
247     for(int i=0; i < _lists.size(); i++) {
248     if(_lists.get(i) == null) {
249     // found a gap, re-use it
250     pos = i;
251     _lists.set(i, new LinkedList());
252     }
253 tdb 1.2 }
254 tdb 1.3 if(pos == -1) {
255     //we didn't find a gap, add at end
256     pos = _lists.size();
257     _lists.add(pos, new LinkedList());
258 tdb 1.2 }
259 tdb 1.3 return pos;
260 tdb 1.2 }
261 tdb 1.3
262     /**
263     * This method sets a entry to null in the list. This ensures
264     * that it will no longer be added to after it is no longer
265     * required be a consumer.
266     *
267     * @param queue The integer identifier for the queue, given by getQueue().
268     */
269     public void removeQueue(int queue) {
270     _lists.set(queue, null);
271     }
272    
273 tdb 1.2 /**
274 tdb 1.7 * Start a monitor on our own Queue. This will log XML
275     * statistics about our Queue to a given Queue (could be
276     * the one being monitored).
277     *
278     * @param interval The long interval, in milliseconds, at which to take samples
279     * @param destQueue The queue to monitor to
280     * @param name A name to identify this Queue with
281     * @return whether we succeeded
282     */
283     public boolean startMonitor(long interval, Queue destQueue, String name) {
284     if(_queueMon == null) {
285     // start a monitor
286     _queueMon = new QueueMonitor(this, destQueue, interval, name);
287     _queueMon.start();
288     return true;
289     }
290     else {
291     // already have a monitor running
292     return false;
293     }
294     }
295    
296     /**
297     * Start a monitor on our own Queue. This will log XML
298     * statistics about our Queue to this Queue.
299     *
300     * @param interval The long interval, in milliseconds, at which to take samples
301     * @param name A name to identify this Queue with
302     * @return whether we succeeded
303     */
304     public boolean startMonitor(long interval, String name) {
305     return startMonitor(interval, this, name);
306     }
307    
308     /**
309     * Stop a monitor on our Queue if we have on running.
310     *
311     * @return whether we succeeded
312     */
313     public boolean stopMonitor() {
314     if(_queueMon != null) {
315     // stop a monitor
316     _queueMon.shutdown();
317     _queueMon = null;
318     return true;
319     }
320     else {
321     // no monitor running
322     return false;
323     }
324     }
325    
326     /**
327 tdb 1.2 * Overrides the {@link java.lang.Object#toString() Object.toString()}
328     * method to provide clean logging (every class should have this).
329     *
330     * This uses the uk.ac.ukc.iscream.util.FormatName class
331     * to format the toString()
332     *
333 tdb 1.6 * @return the name of this class and its CVS revision.
334 tdb 1.2 */
335 tdb 1.4 public String toString() {
336 tdb 1.2 return FormatName.getName(
337     _name,
338     getClass().getName(),
339     REVISION);
340 tdb 1.4 }
341 tdb 1.2
342     //---PRIVATE METHODS---
343 tdb 1.9
344     /**
345     * This method removes an item from a Queue, using a method
346     * requested at construction.
347     *
348     * @param list The LinkedList from which to remove an item.
349     */
350     private void removeQueueItem(LinkedList list) {
351     // look at our algorithm
352     // remove a random item from the list
353     if(_removeAlgorithm==RANDOM) {
354     // new Random, with a good seed
355     Random rand = new Random(System.currentTimeMillis());
356     int i = rand.nextInt(_maxSize);
357     synchronized(this) {
358     list.remove(i);
359     }
360     }
361     // remove the first item from the list
362     else if(_removeAlgorithm==FIRST) {
363     synchronized(this) {
364     list.removeFirst();
365     }
366     }
367     // remove the last item from the list
368     else if(_removeAlgorithm==LAST) {
369     synchronized(this) {
370     list.removeLast();
371     }
372     }
373     }
374    
375 tdb 1.2 //---ACCESSOR/MUTATOR METHODS---
376    
377     //---ATTRIBUTES---
378    
379     /**
380 tdb 1.3 * The LinkedLists of queues.
381 tdb 1.2 */
382 tdb 1.3 private LinkedList _lists = new LinkedList();
383 tdb 1.2
384     /**
385     * A counter so we know how many data items have been
386     * passed through, for statistical purposes.
387     */
388     private int _count = 0;
389 tdb 1.7
390     /**
391     * A reference to our QueueMonitor, if we have one.
392     */
393     private QueueMonitor _queueMon = null;
394 tdb 1.9
395     /**
396     * The maximum size of any Queue.
397     */
398     private int _maxSize = -1;
399    
400     /**
401     * The remove algorithm to use upon a Queue reaching
402     * it's maximum size.
403     */
404     private int _removeAlgorithm = -1;
405 tdb 1.2
406     /**
407     * This is the friendly identifier of the
408     * component this class is running in.
409     * eg, a Filter may be called "filter1",
410     * If this class does not have an owning
411     * component, a name from the configuration
412     * can be placed here. This name could also
413     * be changed to null for utility classes.
414     */
415 tdb 1.4 private String _name = null;
416 tdb 1.2
417     //---STATIC ATTRIBUTES---
418    
419     }