ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/util/uk/org/iscream/cms/util/Queue.java
Revision: 1.27
Committed: Sun Aug 1 10:41:08 2004 UTC (19 years, 8 months ago) by tdb
Branch: MAIN
CVS Tags: HEAD
Changes since 1.26: +3 -3 lines
Log Message:
Catch a lot of old URLs and update them. Also remove a couple of old files
that aren't used.

File Contents

# Content
1 /*
2 * i-scream central monitoring system
3 * http://www.i-scream.org
4 * Copyright (C) 2000-2002 i-scream
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19 */
20
21 //---PACKAGE DECLARATION---
22 package uk.org.iscream.cms.util;
23
24 //---IMPORTS---
25 import java.util.LinkedList;
26 import java.util.NoSuchElementException;
27 import java.util.Random;
28 //import uk.org.iscream.cms.util.*;
29
30 /**
31 * A Queue class designed to operate in a multi-threaded environment, with
32 * added support for multiple "consumer" threads. Also offers blocking on
33 * the get() methods, which ensures the consumer waits until the queue
34 * actually contains some elements.
35 *
36 * @author $Author: tdb $
37 * @version $Id: Queue.java,v 1.26 2003/02/05 14:27:59 tdb Exp $
38 */
39 public class Queue {
40
41 //---FINAL ATTRIBUTES---
42
43 /**
44 * The current CVS revision of this class
45 */
46 public static final String REVISION = "$Revision: 1.26 $";
47
48 /**
49 * Pass to constructor to remove a RANDOM item from
50 * the Queue upon reaching the maximum limit.
51 */
52 public static final int RANDOM = 0;
53
54 /**
55 * Pass to constructor to remove the FIRST item from
56 * the Queue upon reaching the maximum limit.
57 */
58 public static final int FIRST = 1;
59
60 /**
61 * Pass to constructor to remove the LAST item from
62 * the Queue upon reaching the maximum limit.
63 */
64 public static final int LAST = 2;
65
66 /**
67 * Pass to constructor to drop the new item upon reaching
68 * the maximum Queue limit.
69 */
70 public static final int DROP = 3;
71
72 /**
73 * To allow opposite lookups.
74 */
75 public static final String[] algorithms = {"RANDOM", "FIRST", "LAST", "DROP"};
76
77 //---STATIC METHODS---
78
79 //---CONSTRUCTORS---
80
81 /**
82 * Constructs a new Queue with a maximum size limit on
83 * any individual queue. This should be used to stop
84 * conditions where the Queue cannot be guaranteed to
85 * be emptied as quick as it's filled.
86 *
87 * An algorithm will be used to remove data when new data
88 * arrives. There may be choices of algorithms later on.
89 *
90 * @param maxSize the upper limit for a queue
91 * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize
92 */
93 public Queue(int maxSize, int removeAlgorithm) {
94 _maxSize = maxSize;
95 _removeAlgorithm = removeAlgorithm;
96 }
97
98 /**
99 * Constructs a Queue with no maximum size.
100 */
101 public Queue() {
102 _maxSize = -1;
103 }
104
105 //---PUBLIC METHODS---
106
107 /**
108 * This method adds a given object to every queue. It will notify
109 * any waiting consumers (on an empty queue) during this process.
110 *
111 * @param o An Object to be added to the queues.
112 */
113 public void add(Object o) {
114 for(int i=0; i < _lists.size(); i++) {
115 // skip over any gaps left in the list
116 if(_lists.get(i) != null) {
117 synchronized(((LinkedList) _lists.get(i))) {
118 // get size before adding to the Queue
119 int s = ((LinkedList) _lists.get(i)).size();
120 // check whether we need to remove an item from the current Queue
121 if(_maxSize!=-1 && s>=_maxSize && _removeAlgorithm!=DROP) {
122 // we need to remove an item
123 removeQueueItem((LinkedList) _lists.get(i));
124 }
125 // check if we should add (not if Queue full, and using DROP algorithm)
126 if(!(s>=_maxSize && _removeAlgorithm==DROP)) {
127 // add the next item, ensuring we lock
128 synchronized(this) {
129 // LinkedList.add() does the same thing, but this ensures behaviour
130 ((LinkedList) _lists.get(i)).addLast(o);
131 }
132 // if the queue was empty before the add it is possible
133 // that a consumer is waiting... so we notify them
134 //synchronized(((LinkedList) _lists.get(i))) {
135 ((LinkedList) _lists.get(i)).notifyAll();
136 //}
137 }
138 }
139 }
140 }
141 // we keep track of the total additions for the status() method
142 _count++;
143 }
144
145 /**
146 * This method returns an object from the front of a given queue.
147 * It will block until data exists in the queue if required.
148 *
149 * @param The queue to retrieve data from.
150 * @return The object from the front of the queue.
151 * @throws InvalidQueueException if the queue does not exist.
152 */
153 public Object get(int queue) throws InvalidQueueException {
154 // make sure queue exists
155 if (queue >= _lists.size() || _lists.get(queue) == null) {
156 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
157 }
158 // block if the queue is empty
159 synchronized(((LinkedList) _lists.get(queue))) {
160 if (((LinkedList) _lists.get(queue)).size() == 0) {
161 try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {}
162 }
163 }
164 // get an item, it should never be null due to the blocking above
165 Object o = null;
166 synchronized(this) {
167 try {
168 o = ((LinkedList) _lists.get(queue)).removeFirst();
169 }
170 catch (NoSuchElementException e) {
171 // This should never happen !
172 }
173 }
174 return o;
175 }
176
177 /**
178 * This method releases a get() method that's currently
179 * waiting on an empty queue. This was designed for
180 * shutdown() type methods that may have problems closing
181 * if the thread of control is waiting on a queue.
182 *
183 * @param queue the queue to release.
184 */
185 public void releaseQueue(int queue) {
186 synchronized(((LinkedList) _lists.get(queue))) {
187 ((LinkedList) _lists.get(queue)).notifyAll();
188 }
189 }
190
191 /**
192 * This method erases the contents of a given queue. This
193 * method should be used with care. It can only empty one
194 * internal queue, not all of them. This must be called
195 * multiple times to empty all queues.
196 *
197 * @param queue the queue to empty.
198 */
199 public void clearQueue(int queue) {
200 synchronized(this) {
201 ((LinkedList) _lists.get(queue)).clear();
202 }
203 }
204
205 /**
206 * This method returns an XML textual status of the queues.
207 * It is merely for observation, and would most likely be
208 * used by a larger "monitoring" component. Information
209 * returned includes the current size of each queue, and
210 * the total items passed through.
211 *
212 * @return A String message containing status information in XML format
213 */
214 public String xmlStatus() {
215 String status = "<queue ";
216 for(int i=0; i < _lists.size(); i++) {
217 // check for null entries
218 if(_lists.get(i) != null) {
219 status += "queue"+i+"=\""+((LinkedList) _lists.get(i)).size()+"\" ";
220 }
221 else {
222 status += "queue"+i+"=\"[deleted]\" ";
223 }
224 }
225 status += "total=\""+_count+"\"";
226 if(_maxSize != -1) {
227 status += " maxSize=\""+_maxSize+"\"";
228 }
229 status += "></queue>";
230 return status;
231 }
232
233 /**
234 * Returns the size of a given queue. A consumer can use
235 * this to see how big their queue is at any given time.
236 * they should use their queue number as the parameter.
237 *
238 * @param queue The queue number to query.
239 * @return the current size of the queue.
240 * @throws InvalidQueueException if the queue does not exist.
241 */
242 public int queueSize(int queue) throws InvalidQueueException {
243 if (queue >= _lists.size() || _lists.get(queue) == null) {
244 throw new InvalidQueueException("Requested queue "+queue+" does not exist");
245 }
246 return ((LinkedList) _lists.get(queue)).size();
247 }
248
249 /**
250 * Returns the total numer of elements to have passed
251 * through this queue. This is essentially a counter
252 * on the add method.
253 *
254 * @return the element-ometer.
255 */
256 public int elementCount() {
257 return _count;
258 }
259
260 /**
261 * This method assigns a queue to a consumer. The idea behind
262 * this is to ensure that only 1 consumer can be associated with
263 * a given queue, otherwise the whole "blocking" thing fails
264 * miserably. Queues are created upon requested.
265 *
266 * It is IMPORTANT that removeQueue() is used when the queue is
267 * no longer required.
268 *
269 * @return An integer to be passed to the get() method.
270 */
271 public synchronized int getQueue() {
272 int pos = -1;
273 for(int i=0; i < _lists.size(); i++) {
274 if(_lists.get(i) == null) {
275 // found a gap, re-use it
276 pos = i;
277 _lists.set(i, new LinkedList());
278 }
279 }
280 if(pos == -1) {
281 //we didn't find a gap, add at end
282 pos = _lists.size();
283 _lists.add(pos, new LinkedList());
284 }
285 return pos;
286 }
287
288 /**
289 * This method sets a entry to null in the list. This ensures
290 * that it will no longer be added to after it is no longer
291 * required be a consumer.
292 *
293 * @param queue The integer identifier for the queue, given by getQueue().
294 */
295 public synchronized void removeQueue(int queue) {
296 _lists.set(queue, null);
297 }
298
299 /**
300 * Start a monitor on our own Queue. This will log XML
301 * statistics about our Queue to a given Queue (could be
302 * the one being monitored).
303 *
304 * @param interval The long interval, in milliseconds, at which to take samples
305 * @param destQueue The queue to monitor to
306 * @param name A name to identify this Queue with
307 * @return whether we succeeded
308 */
309 public boolean startMonitor(long interval, Queue destQueue, String name) {
310 if(_queueMon == null) {
311 // start a monitor
312 _queueMon = new QueueMonitor(this, destQueue, interval, name);
313 _queueMon.start();
314 return true;
315 }
316 else {
317 // already have a monitor running
318 return false;
319 }
320 }
321
322 /**
323 * Start a monitor on our own Queue. This will log XML
324 * statistics about our Queue to this Queue.
325 *
326 * @param interval The long interval, in milliseconds, at which to take samples
327 * @param name A name to identify this Queue with
328 * @return whether we succeeded
329 */
330 public boolean startMonitor(long interval, String name) {
331 return startMonitor(interval, this, name);
332 }
333
334 /**
335 * Stop a monitor on our Queue if we have on running.
336 *
337 * @return whether we succeeded
338 */
339 public boolean stopMonitor() {
340 if(_queueMon != null) {
341 // stop a monitor
342 _queueMon.shutdown();
343 _queueMon = null;
344 return true;
345 }
346 else {
347 // no monitor running
348 return false;
349 }
350 }
351
352 /**
353 * Overrides the {@link java.lang.Object#toString() Object.toString()}
354 * method to provide clean logging (every class should have this).
355 *
356 * This uses the uk.org.iscream.cms.server.util.FormatName class
357 * to format the toString()
358 *
359 * @return the name of this class and its CVS revision.
360 */
361 public String toString() {
362 return FormatName.getName(
363 _name,
364 getClass().getName(),
365 REVISION);
366 }
367
368 //---PRIVATE METHODS---
369
370 /**
371 * This method removes an item from a Queue, using a method
372 * requested at construction.
373 *
374 * @param list The LinkedList from which to remove an item.
375 */
376 private void removeQueueItem(LinkedList list) {
377 // look at our algorithm
378 // remove a random item from the list
379 if(_removeAlgorithm==RANDOM) {
380 // new Random, with a good seed
381 Random rand = new Random(System.currentTimeMillis());
382 int i = rand.nextInt(_maxSize);
383 synchronized(this) {
384 list.remove(i);
385 }
386 }
387 // remove the first item from the list
388 else if(_removeAlgorithm==FIRST) {
389 synchronized(this) {
390 list.removeFirst();
391 }
392 }
393 // remove the last item from the list
394 else if(_removeAlgorithm==LAST) {
395 synchronized(this) {
396 list.removeLast();
397 }
398 }
399 }
400
401 //---ACCESSOR/MUTATOR METHODS---
402
403 //---ATTRIBUTES---
404
405 /**
406 * The LinkedLists of queues.
407 */
408 private LinkedList _lists = new LinkedList();
409
410 /**
411 * A counter so we know how many data items have been
412 * passed through, for statistical purposes.
413 */
414 private int _count = 0;
415
416 /**
417 * A reference to our QueueMonitor, if we have one.
418 */
419 private QueueMonitor _queueMon = null;
420
421 /**
422 * The maximum size of any Queue.
423 */
424 private int _maxSize = -1;
425
426 /**
427 * The remove algorithm to use upon a Queue reaching
428 * it's maximum size.
429 */
430 private int _removeAlgorithm = -1;
431
432 /**
433 * This is the friendly identifier of the
434 * component this class is running in.
435 * eg, a Filter may be called "filter1",
436 * If this class does not have an owning
437 * component, a name from the configuration
438 * can be placed here. This name could also
439 * be changed to null for utility classes.
440 */
441 private String _name = null;
442
443 //---STATIC ATTRIBUTES---
444
445 }