ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.25
Committed: Tue May 21 16:47:19 2002 UTC (21 years, 11 months ago) by tdb
Branch: MAIN
Changes since 1.24: +3 -2 lines
Log Message:
Added URL to GPL headers.

File Contents

# User Rev Content
1 tdb 1.24 /*
2     * i-scream central monitoring system
3 tdb 1.25 * http://www.i-scream.org.uk
4 tdb 1.24 * Copyright (C) 2000-2002 i-scream
5     *
6     * This program is free software; you can redistribute it and/or
7     * modify it under the terms of the GNU General Public License
8     * as published by the Free Software Foundation; either version 2
9     * of the License, or (at your option) any later version.
10     *
11     * This program is distributed in the hope that it will be useful,
12     * but WITHOUT ANY WARRANTY; without even the implied warranty of
13     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14     * GNU General Public License for more details.
15     *
16     * You should have received a copy of the GNU General Public License
17     * along with this program; if not, write to the Free Software
18     * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19     */
20    
21 tdb 1.1 //---PACKAGE DECLARATION---
22 tdb 1.22 package uk.org.iscream.cms.server.util;
23 tdb 1.1
24     //---IMPORTS---
25     import java.util.LinkedList;
26     import java.util.NoSuchElementException;
27 tdb 1.14 import java.util.Random;
28 tdb 1.22 import uk.org.iscream.cms.server.util.*;
29 tdb 1.1
30     /**
31     * A Queue class designed to operate in a multi-threaded environment, with
32     * added support for multiple "consumer" threads. Also offers blocking on
33     * the get() methods, which ensures the consumer waits until the queue
34     * actually contains some elements.
35     *
36 tdb 1.24 * @author $Author: tdb $
37 tdb 1.25 * @version $Id: Queue.java,v 1.24 2002/05/18 18:16:03 tdb Exp $
38 tdb 1.1 */
39 tdb 1.2 public class Queue {
40 tdb 1.1
41     //---FINAL ATTRIBUTES---
42    
43     /**
44     * The current CVS revision of this class
45     */
46 tdb 1.25 public static final String REVISION = "$Revision: 1.24 $";
47 tdb 1.14
48     /**
49     * Pass to constructor to remove a RANDOM item from
50     * the Queue upon reaching the maximum limit.
51     */
52     public static final int RANDOM = 0;
53    
54     /**
55     * Pass to constructor to remove the FIRST item from
56     * the Queue upon reaching the maximum limit.
57     */
58     public static final int FIRST = 1;
59    
60     /**
61     * Pass to constructor to remove the LAST item from
62     * the Queue upon reaching the maximum limit.
63     */
64     public static final int LAST = 2;
65    
66     /**
67     * Pass to constructor to drop the new item upon reaching
68     * the maximum Queue limit.
69     */
70     public static final int DROP = 3;
71 tdb 1.17
72     /**
73     * To allow opposite lookups.
74     */
75     public static final String[] algorithms = {"RANDOM", "FIRST", "LAST", "DROP"};
76 tdb 1.1
77     //---STATIC METHODS---
78    
79     //---CONSTRUCTORS---
80 tdb 1.14
81     /**
82     * Constructs a new Queue with a maximum size limit on
83     * any individual queue. This should be used to stop
84     * conditions where the Queue cannot be guaranteed to
85     * be emptied as quick as it's filled.
86     *
87     * An algorithm will be used to remove data when new data
88     * arrives. There may be choices of algorithms later on.
89     *
90     * @param maxSize the upper limit for a queue
91     * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
92     */
93     public Queue(int maxSize, int removeAlgorithm) {
94     _maxSize = maxSize;
95     _removeAlgorithm = removeAlgorithm;
96     }
97    
98     /**
99     * Constructs a Queue with no maximum size.
100     */
101     public Queue() {
102     _maxSize = -1;
103     }
104 tdb 1.1
105     //---PUBLIC METHODS---
106    
107     /**
108     * This method adds a given object to every queue. It will notify
109     * any waiting consumers (on an empty queue) during this process.
110     *
111     * @param o An Object to be added to the queues.
112     */
113     public void add(Object o) {
114     for(int i=0; i < _lists.size(); i++) {
115     // skip over any gaps left in the list
116     if(_lists.get(i) != null) {
117 tdb 1.21 synchronized(((LinkedList) _lists.get(i))) {
118     // get size before adding to the Queue
119     int s = ((LinkedList) _lists.get(i)).size();
120     // check whether we need to remove an item from the current Queue
121     if(_maxSize!=-1 && s>=_maxSize && _removeAlgorithm!=DROP) {
122     // we need to remove an item
123     removeQueueItem((LinkedList) _lists.get(i));
124     }
125     // check if we should add (not if Queue full, and using DROP algorithm)
126     if(!(s>=_maxSize && _removeAlgorithm==DROP)) {
127     // add the next item, ensuring we lock
128     synchronized(this) {
129     // LinkedList.add() does the same thing, but this ensures behaviour
130     ((LinkedList) _lists.get(i)).addLast(o);
131     }
132     // if the queue was empty before the add it is possible
133     // that a consumer is waiting... so we notify them
134     //synchronized(((LinkedList) _lists.get(i))) {
135     ((LinkedList) _lists.get(i)).notifyAll();
136     //}
137 tdb 1.14 }
138 tdb 1.1 }
139     }
140     }
141     // we keep track of the total additions for the status() method
142     _count++;
143     }
144    
145     /**
146     * This method returns an object from the front of a given queue.
147     * It will block until data exists in the queue if required.
148     *
149 tdb 1.8 * @param The queue to retrieve data from.
150 tdb 1.1 * @return The object from the front of the queue.
151     * @throws InvalidQueueException if the queue does not exist.
152     */
153     public Object get(int queue) throws InvalidQueueException {
154     // make sure queue exists
155     if (queue >= _lists.size() || _lists.get(queue) == null) {
156     throw new InvalidQueueException("Requested queue "+queue+" does not exist");
157     }
158     // block if the queue is empty
159 tdb 1.18 synchronized(((LinkedList) _lists.get(queue))) {
160     if (((LinkedList) _lists.get(queue)).size() == 0) {
161 tdb 1.1 try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
162     }
163     }
164     // get an item, it should never be null due to the blocking above
165     Object o = null;
166     synchronized(this) {
167     try {
168     o = ((LinkedList) _lists.get(queue)).removeFirst();
169     }
170     catch (NoSuchElementException e) {
171     // This should never happen !
172     }
173     }
174     return o;
175     }
176 tdb 1.6
177     /**
178     * This method releases a get() method that's currently
179     * waiting on an empty queue. This was designed for
180     * shutdown() type methods that may have problems closing
181     * if the thread of control is waiting on a queue.
182     *
183 tdb 1.8 * @param queue the queue to release.
184 tdb 1.6 */
185     public void releaseQueue(int queue) {
186     synchronized(((LinkedList) _lists.get(queue))) {
187     ((LinkedList) _lists.get(queue)).notifyAll();
188 tdb 1.7 }
189     }
190    
191     /**
192     * This method erases the contents of a given queue. This
193     * method should be used with care. It can only empty one
194     * internal queue, not all of them. This must be called
195     * multiple times to empty all queues.
196     *
197     * @param queue the queue to empty.
198     */
199     public void clearQueue(int queue) {
200     synchronized(this) {
201     ((LinkedList) _lists.get(queue)).clear();
202 tdb 1.6 }
203     }
204    
205 tdb 1.1 /**
206 tdb 1.9 * This method returns an XML textual status of the queues.
207     * It is merely for observation, and would most likely be
208     * used by a larger "monitoring" component. Information
209     * returned includes the current size of each queue, and
210     * the total items passed through.
211 tdb 1.1 *
212 tdb 1.9 * @return A String message containing status information in XML format
213 tdb 1.1 */
214 tdb 1.9 public String xmlStatus() {
215     String status = "<queue ";
216 tdb 1.1 for(int i=0; i < _lists.size(); i++) {
217     // check for null entries
218     if(_lists.get(i) != null) {
219 tdb 1.9 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
220 tdb 1.1 }
221     else {
222 tdb 1.13 status += "queue"+i+"=\"[deleted]\" ";
223 tdb 1.1 }
224     }
225 tdb 1.14 status += "total=\""+_count+"\"";
226     if(_maxSize != -1) {
227     status += " maxSize=\""+_maxSize+"\"";
228     }
229 tdb 1.15 status += "></queue>";
230 tdb 1.1 return status;
231 tdb 1.4 }
232    
233     /**
234     * Returns the size of a given queue. A consumer can use
235     * this to see how big their queue is at any given time.
236     * they should use their queue number as the parameter.
237     *
238     * @param queue The queue number to query.
239     * @return the current size of the queue.
240 tdb 1.8 * @throws InvalidQueueException if the queue does not exist.
241 tdb 1.4 */
242     public int queueSize(int queue) throws InvalidQueueException {
243 tdb 1.5 if (queue >= _lists.size() || _lists.get(queue) == null) {
244 tdb 1.4 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
245     }
246 tdb 1.5 return ((LinkedList) _lists.get(queue)).size();
247 tdb 1.4 }
248    
249     /**
250     * Returns the total numer of elements to have passed
251 tdb 1.23 * through this queue. This is essentially a counter
252     * on the add method.
253 tdb 1.4 *
254 tdb 1.8 * @return the element-ometer.
255 tdb 1.4 */
256     public int elementCount() {
257 tdb 1.5 return _count;
258 tdb 1.1 }
259    
260     /**
261     * This method assigns a queue to a consumer. The idea behind
262     * this is to ensure that only 1 consumer can be associated with
263     * a given queue, otherwise the whole "blocking" thing fails
264     * miserably. Queues are created upon requested.
265     *
266     * It is IMPORTANT that removeQueue() is used when the queue is
267     * no longer required.
268     *
269     * @return An integer to be passed to the get() method.
270     */
271 tdb 1.19 public synchronized int getQueue() {
272 tdb 1.1 int pos = -1;
273     for(int i=0; i < _lists.size(); i++) {
274     if(_lists.get(i) == null) {
275     // found a gap, re-use it
276     pos = i;
277     _lists.set(i, new LinkedList());
278     }
279     }
280     if(pos == -1) {
281     //we didn't find a gap, add at end
282     pos = _lists.size();
283     _lists.add(pos, new LinkedList());
284     }
285     return pos;
286     }
287    
288     /**
289     * This method sets a entry to null in the list. This ensures
290     * that it will no longer be added to after it is no longer
291     * required be a consumer.
292     *
293     * @param queue The integer identifier for the queue, given by getQueue().
294     */
295 tdb 1.19 public synchronized void removeQueue(int queue) {
296 tdb 1.1 _lists.set(queue, null);
297     }
298    
299     /**
300 tdb 1.9 * Start a monitor on our own Queue. This will log XML
301     * statistics about our Queue to a given Queue (could be
302     * the one being monitored).
303     *
304     * @param interval The long interval, in milliseconds, at which to take samples
305     * @param destQueue The queue to monitor to
306     * @param name A name to identify this Queue with
307     * @return whether we succeeded
308     */
309     public boolean startMonitor(long interval, Queue destQueue, String name) {
310     if(_queueMon == null) {
311     // start a monitor
312     _queueMon = new QueueMonitor(this, destQueue, interval, name);
313     _queueMon.start();
314     return true;
315     }
316     else {
317     // already have a monitor running
318     return false;
319     }
320     }
321    
322     /**
323     * Start a monitor on our own Queue. This will log XML
324     * statistics about our Queue to this Queue.
325     *
326     * @param interval The long interval, in milliseconds, at which to take samples
327     * @param name A name to identify this Queue with
328     * @return whether we succeeded
329     */
330     public boolean startMonitor(long interval, String name) {
331     return startMonitor(interval, this, name);
332     }
333    
334     /**
335     * Stop a monitor on our Queue if we have on running.
336     *
337     * @return whether we succeeded
338     */
339     public boolean stopMonitor() {
340     if(_queueMon != null) {
341     // stop a monitor
342     _queueMon.shutdown();
343     _queueMon = null;
344     return true;
345     }
346     else {
347     // no monitor running
348     return false;
349     }
350     }
351    
352     /**
353 tdb 1.1 * Overrides the {@link java.lang.Object#toString() Object.toString()}
354     * method to provide clean logging (every class should have this).
355     *
356 tdb 1.22 * This uses the uk.org.iscream.cms.server.util.FormatName class
357 tdb 1.1 * to format the toString()
358     *
359 tdb 1.8 * @return the name of this class and its CVS revision.
360 tdb 1.1 */
361 tdb 1.3 public String toString() {
362 tdb 1.1 return FormatName.getName(
363     _name,
364     getClass().getName(),
365     REVISION);
366 tdb 1.3 }
367 tdb 1.1
368     //---PRIVATE METHODS---
369 tdb 1.14
370     /**
371     * This method removes an item from a Queue, using a method
372     * requested at construction.
373     *
374     * @param list The LinkedList from which to remove an item.
375     */
376     private void removeQueueItem(LinkedList list) {
377     // look at our algorithm
378     // remove a random item from the list
379     if(_removeAlgorithm==RANDOM) {
380     // new Random, with a good seed
381     Random rand = new Random(System.currentTimeMillis());
382     int i = rand.nextInt(_maxSize);
383     synchronized(this) {
384     list.remove(i);
385     }
386     }
387     // remove the first item from the list
388     else if(_removeAlgorithm==FIRST) {
389     synchronized(this) {
390     list.removeFirst();
391     }
392     }
393     // remove the last item from the list
394     else if(_removeAlgorithm==LAST) {
395     synchronized(this) {
396     list.removeLast();
397     }
398     }
399     }
400    
401 tdb 1.1 //---ACCESSOR/MUTATOR METHODS---
402    
403     //---ATTRIBUTES---
404    
405     /**
406     * The LinkedLists of queues.
407     */
408     private LinkedList _lists = new LinkedList();
409    
410     /**
411     * A counter so we know how many data items have been
412     * passed through, for statistical purposes.
413     */
414     private int _count = 0;
415 tdb 1.9
416     /**
417     * A reference to our QueueMonitor, if we have one.
418     */
419     private QueueMonitor _queueMon = null;
420 tdb 1.14
421     /**
422     * The maximum size of any Queue.
423     */
424     private int _maxSize = -1;
425    
426     /**
427     * The remove algorithm to use upon a Queue reaching
428     * it's maximum size.
429     */
430     private int _removeAlgorithm = -1;
431 tdb 1.1
432     /**
433     * This is the friendly identifier of the
434     * component this class is running in.
435     * eg, a Filter may be called "filter1",
436     * If this class does not have an owning
437     * component, a name from the configuration
438     * can be placed here. This name could also
439     * be changed to null for utility classes.
440     */
441 tdb 1.3 private String _name = null;
442 tdb 1.1
443     //---STATIC ATTRIBUTES---
444    
445     }