ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.26
Committed: Wed Feb 5 14:27:59 2003 UTC (21 years, 2 months ago) by tdb
Branch: MAIN
Changes since 1.25: +4 -4 lines
Log Message:
Util package has been pulled out of the server. Next step will be to modify
the server and conient (and anything else?) to use this instead. New
package name is uk.org.iscream.cms.util. All the java files were moved with
a repo copy, so they retain their history.

File Contents

# User Rev Content
1 tdb 1.24 /*
2     * i-scream central monitoring system
3 tdb 1.25 * http://www.i-scream.org.uk
4 tdb 1.24 * Copyright (C) 2000-2002 i-scream
5     *
6     * This program is free software; you can redistribute it and/or
7     * modify it under the terms of the GNU General Public License
8     * as published by the Free Software Foundation; either version 2
9     * of the License, or (at your option) any later version.
10     *
11     * This program is distributed in the hope that it will be useful,
12     * but WITHOUT ANY WARRANTY; without even the implied warranty of
13     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14     * GNU General Public License for more details.
15     *
16     * You should have received a copy of the GNU General Public License
17     * along with this program; if not, write to the Free Software
18     * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19     */
20    
21 tdb 1.1 //---PACKAGE DECLARATION---
22 tdb 1.26 package uk.org.iscream.cms.util;
23 tdb 1.1
24     //---IMPORTS---
25     import java.util.LinkedList;
26     import java.util.NoSuchElementException;
27 tdb 1.14 import java.util.Random;
28 tdb 1.26 //import uk.org.iscream.cms.util.*;
29 tdb 1.1
30     /**
31     * A Queue class designed to operate in a multi-threaded environment, with
32     * added support for multiple "consumer" threads. Also offers blocking on
33     * the get() methods, which ensures the consumer waits until the queue
34     * actually contains some elements.
35     *
36 tdb 1.24 * @author $Author: tdb $
37 tdb 1.26 * @version $Id: Queue.java,v 1.25 2002/05/21 16:47:19 tdb Exp $
38 tdb 1.1 */
39 tdb 1.2 public class Queue {
40 tdb 1.1
41     //---FINAL ATTRIBUTES---
42    
43     /**
44     * The current CVS revision of this class
45     */
46 tdb 1.26 public static final String REVISION = "$Revision: 1.25 $";
47 tdb 1.14
48     /**
49     * Pass to constructor to remove a RANDOM item from
50     * the Queue upon reaching the maximum limit.
51     */
52     public static final int RANDOM = 0;
53    
54     /**
55     * Pass to constructor to remove the FIRST item from
56     * the Queue upon reaching the maximum limit.
57     */
58     public static final int FIRST = 1;
59    
60     /**
61     * Pass to constructor to remove the LAST item from
62     * the Queue upon reaching the maximum limit.
63     */
64     public static final int LAST = 2;
65    
66     /**
67     * Pass to constructor to drop the new item upon reaching
68     * the maximum Queue limit.
69     */
70     public static final int DROP = 3;
71 tdb 1.17
72     /**
73     * To allow opposite lookups.
74     */
75     public static final String[] algorithms = {"RANDOM", "FIRST", "LAST", "DROP"};
76 tdb 1.1
77     //---STATIC METHODS---
78    
79     //---CONSTRUCTORS---
80 tdb 1.14
81     /**
82     * Constructs a new Queue with a maximum size limit on
83     * any individual queue. This should be used to stop
84     * conditions where the Queue cannot be guaranteed to
85     * be emptied as quick as it's filled.
86     *
87     * An algorithm will be used to remove data when new data
88     * arrives. There may be choices of algorithms later on.
89     *
90     * @param maxSize the upper limit for a queue
91     * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
92     */
93     public Queue(int maxSize, int removeAlgorithm) {
94     _maxSize = maxSize;
95     _removeAlgorithm = removeAlgorithm;
96     }
97    
98     /**
99     * Constructs a Queue with no maximum size.
100     */
101     public Queue() {
102     _maxSize = -1;
103     }
104 tdb 1.1
105     //---PUBLIC METHODS---
106    
107     /**
108     * This method adds a given object to every queue. It will notify
109     * any waiting consumers (on an empty queue) during this process.
110     *
111     * @param o An Object to be added to the queues.
112     */
113     public void add(Object o) {
114     for(int i=0; i < _lists.size(); i++) {
115     // skip over any gaps left in the list
116     if(_lists.get(i) != null) {
117 tdb 1.21 synchronized(((LinkedList) _lists.get(i))) {
118     // get size before adding to the Queue
119     int s = ((LinkedList) _lists.get(i)).size();
120     // check whether we need to remove an item from the current Queue
121     if(_maxSize!=-1 && s>=_maxSize && _removeAlgorithm!=DROP) {
122     // we need to remove an item
123     removeQueueItem((LinkedList) _lists.get(i));
124     }
125     // check if we should add (not if Queue full, and using DROP algorithm)
126     if(!(s>=_maxSize && _removeAlgorithm==DROP)) {
127     // add the next item, ensuring we lock
128     synchronized(this) {
129     // LinkedList.add() does the same thing, but this ensures behaviour
130     ((LinkedList) _lists.get(i)).addLast(o);
131     }
132     // if the queue was empty before the add it is possible
133     // that a consumer is waiting... so we notify them
134     //synchronized(((LinkedList) _lists.get(i))) {
135     ((LinkedList) _lists.get(i)).notifyAll();
136     //}
137 tdb 1.14 }
138 tdb 1.1 }
139     }
140     }
141     // we keep track of the total additions for the status() method
142     _count++;
143     }
144    
145     /**
146     * This method returns an object from the front of a given queue.
147     * It will block until data exists in the queue if required.
148     *
149 tdb 1.8 * @param The queue to retrieve data from.
150 tdb 1.1 * @return The object from the front of the queue.
151     * @throws InvalidQueueException if the queue does not exist.
152     */
153     public Object get(int queue) throws InvalidQueueException {
154     // make sure queue exists
155     if (queue >= _lists.size() || _lists.get(queue) == null) {
156     throw new InvalidQueueException("Requested queue "+queue+" does not exist");
157     }
158     // block if the queue is empty
159 tdb 1.18 synchronized(((LinkedList) _lists.get(queue))) {
160     if (((LinkedList) _lists.get(queue)).size() == 0) {
161 tdb 1.1 try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
162     }
163     }
164     // get an item, it should never be null due to the blocking above
165     Object o = null;
166     synchronized(this) {
167     try {
168     o = ((LinkedList) _lists.get(queue)).removeFirst();
169     }
170     catch (NoSuchElementException e) {
171     // This should never happen !
172     }
173     }
174     return o;
175     }
176 tdb 1.6
177     /**
178     * This method releases a get() method that's currently
179     * waiting on an empty queue. This was designed for
180     * shutdown() type methods that may have problems closing
181     * if the thread of control is waiting on a queue.
182     *
183 tdb 1.8 * @param queue the queue to release.
184 tdb 1.6 */
185     public void releaseQueue(int queue) {
186     synchronized(((LinkedList) _lists.get(queue))) {
187     ((LinkedList) _lists.get(queue)).notifyAll();
188 tdb 1.7 }
189     }
190    
191     /**
192     * This method erases the contents of a given queue. This
193     * method should be used with care. It can only empty one
194     * internal queue, not all of them. This must be called
195     * multiple times to empty all queues.
196     *
197     * @param queue the queue to empty.
198     */
199     public void clearQueue(int queue) {
200     synchronized(this) {
201     ((LinkedList) _lists.get(queue)).clear();
202 tdb 1.6 }
203     }
204    
205 tdb 1.1 /**
206 tdb 1.9 * This method returns an XML textual status of the queues.
207     * It is merely for observation, and would most likely be
208     * used by a larger "monitoring" component. Information
209     * returned includes the current size of each queue, and
210     * the total items passed through.
211 tdb 1.1 *
212 tdb 1.9 * @return A String message containing status information in XML format
213 tdb 1.1 */
214 tdb 1.9 public String xmlStatus() {
215     String status = "<queue ";
216 tdb 1.1 for(int i=0; i < _lists.size(); i++) {
217     // check for null entries
218     if(_lists.get(i) != null) {
219 tdb 1.9 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
220 tdb 1.1 }
221     else {
222 tdb 1.13 status += "queue"+i+"=\"[deleted]\" ";
223 tdb 1.1 }
224     }
225 tdb 1.14 status += "total=\""+_count+"\"";
226     if(_maxSize != -1) {
227     status += " maxSize=\""+_maxSize+"\"";
228     }
229 tdb 1.15 status += "></queue>";
230 tdb 1.1 return status;
231 tdb 1.4 }
232    
233     /**
234     * Returns the size of a given queue. A consumer can use
235     * this to see how big their queue is at any given time.
236     * they should use their queue number as the parameter.
237     *
238     * @param queue The queue number to query.
239     * @return the current size of the queue.
240 tdb 1.8 * @throws InvalidQueueException if the queue does not exist.
241 tdb 1.4 */
242     public int queueSize(int queue) throws InvalidQueueException {
243 tdb 1.5 if (queue >= _lists.size() || _lists.get(queue) == null) {
244 tdb 1.4 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
245     }
246 tdb 1.5 return ((LinkedList) _lists.get(queue)).size();
247 tdb 1.4 }
248    
249     /**
250     * Returns the total numer of elements to have passed
251 tdb 1.23 * through this queue. This is essentially a counter
252     * on the add method.
253 tdb 1.4 *
254 tdb 1.8 * @return the element-ometer.
255 tdb 1.4 */
256     public int elementCount() {
257 tdb 1.5 return _count;
258 tdb 1.1 }
259    
260     /**
261     * This method assigns a queue to a consumer. The idea behind
262     * this is to ensure that only 1 consumer can be associated with
263     * a given queue, otherwise the whole "blocking" thing fails
264     * miserably. Queues are created upon requested.
265     *
266     * It is IMPORTANT that removeQueue() is used when the queue is
267     * no longer required.
268     *
269     * @return An integer to be passed to the get() method.
270     */
271 tdb 1.19 public synchronized int getQueue() {
272 tdb 1.1 int pos = -1;
273     for(int i=0; i < _lists.size(); i++) {
274     if(_lists.get(i) == null) {
275     // found a gap, re-use it
276     pos = i;
277     _lists.set(i, new LinkedList());
278     }
279     }
280     if(pos == -1) {
281     //we didn't find a gap, add at end
282     pos = _lists.size();
283     _lists.add(pos, new LinkedList());
284     }
285     return pos;
286     }
287    
288     /**
289     * This method sets a entry to null in the list. This ensures
290     * that it will no longer be added to after it is no longer
291     * required be a consumer.
292     *
293     * @param queue The integer identifier for the queue, given by getQueue().
294     */
295 tdb 1.19 public synchronized void removeQueue(int queue) {
296 tdb 1.1 _lists.set(queue, null);
297     }
298    
299     /**
300 tdb 1.9 * Start a monitor on our own Queue. This will log XML
301     * statistics about our Queue to a given Queue (could be
302     * the one being monitored).
303     *
304     * @param interval The long interval, in milliseconds, at which to take samples
305     * @param destQueue The queue to monitor to
306     * @param name A name to identify this Queue with
307     * @return whether we succeeded
308     */
309     public boolean startMonitor(long interval, Queue destQueue, String name) {
310     if(_queueMon == null) {
311     // start a monitor
312     _queueMon = new QueueMonitor(this, destQueue, interval, name);
313     _queueMon.start();
314     return true;
315     }
316     else {
317     // already have a monitor running
318     return false;
319     }
320     }
321    
322     /**
323     * Start a monitor on our own Queue. This will log XML
324     * statistics about our Queue to this Queue.
325     *
326     * @param interval The long interval, in milliseconds, at which to take samples
327     * @param name A name to identify this Queue with
328     * @return whether we succeeded
329     */
330     public boolean startMonitor(long interval, String name) {
331     return startMonitor(interval, this, name);
332     }
333    
334     /**
335     * Stop a monitor on our Queue if we have on running.
336     *
337     * @return whether we succeeded
338     */
339     public boolean stopMonitor() {
340     if(_queueMon != null) {
341     // stop a monitor
342     _queueMon.shutdown();
343     _queueMon = null;
344     return true;
345     }
346     else {
347     // no monitor running
348     return false;
349     }
350     }
351    
352     /**
353 tdb 1.1 * Overrides the {@link java.lang.Object#toString() Object.toString()}
354     * method to provide clean logging (every class should have this).
355     *
356 tdb 1.22 * This uses the uk.org.iscream.cms.server.util.FormatName class
357 tdb 1.1 * to format the toString()
358     *
359 tdb 1.8 * @return the name of this class and its CVS revision.
360 tdb 1.1 */
361 tdb 1.3 public String toString() {
362 tdb 1.1 return FormatName.getName(
363     _name,
364     getClass().getName(),
365     REVISION);
366 tdb 1.3 }
367 tdb 1.1
368     //---PRIVATE METHODS---
369 tdb 1.14
370     /**
371     * This method removes an item from a Queue, using a method
372     * requested at construction.
373     *
374     * @param list The LinkedList from which to remove an item.
375     */
376     private void removeQueueItem(LinkedList list) {
377     // look at our algorithm
378     // remove a random item from the list
379     if(_removeAlgorithm==RANDOM) {
380     // new Random, with a good seed
381     Random rand = new Random(System.currentTimeMillis());
382     int i = rand.nextInt(_maxSize);
383     synchronized(this) {
384     list.remove(i);
385     }
386     }
387     // remove the first item from the list
388     else if(_removeAlgorithm==FIRST) {
389     synchronized(this) {
390     list.removeFirst();
391     }
392     }
393     // remove the last item from the list
394     else if(_removeAlgorithm==LAST) {
395     synchronized(this) {
396     list.removeLast();
397     }
398     }
399     }
400    
401 tdb 1.1 //---ACCESSOR/MUTATOR METHODS---
402    
403     //---ATTRIBUTES---
404    
405     /**
406     * The LinkedLists of queues.
407     */
408     private LinkedList _lists = new LinkedList();
409    
410     /**
411     * A counter so we know how many data items have been
412     * passed through, for statistical purposes.
413     */
414     private int _count = 0;
415 tdb 1.9
416     /**
417     * A reference to our QueueMonitor, if we have one.
418     */
419     private QueueMonitor _queueMon = null;
420 tdb 1.14
421     /**
422     * The maximum size of any Queue.
423     */
424     private int _maxSize = -1;
425    
426     /**
427     * The remove algorithm to use upon a Queue reaching
428     * it's maximum size.
429     */
430     private int _removeAlgorithm = -1;
431 tdb 1.1
432     /**
433     * This is the friendly identifier of the
434     * component this class is running in.
435     * eg, a Filter may be called "filter1",
436     * If this class does not have an owning
437     * component, a name from the configuration
438     * can be placed here. This name could also
439     * be changed to null for utility classes.
440     */
441 tdb 1.3 private String _name = null;
442 tdb 1.1
443     //---STATIC ATTRIBUTES---
444    
445     }