--- experimental/server/Queue/Queue.java 2000/12/28 00:58:43 1.1 +++ experimental/server/Queue/Queue.java 2001/03/01 03:04:28 1.9 @@ -1,46 +1,419 @@ +//---PACKAGE DECLARATION--- +//package uk.ac.ukc.iscream.util; + +//---IMPORTS--- import java.util.LinkedList; import java.util.NoSuchElementException; +import java.util.Random; +//import uk.ac.ukc.iscream.util.*; -class Queue { +/** + * A Queue class designed to operate in a multi-threaded environment, with + * added support for multiple "consumer" threads. Also offers blocking on + * the get() methods, which ensures the consumer waits until the queue + * actually contains some elements. + * + * @author $Author: tdb $ + * @version $Id: Queue.java,v 1.9 2001/03/01 03:04:28 tdb Exp $ + */ +public class Queue { + +//---FINAL ATTRIBUTES--- + + /** + * The current CVS revision of this class + */ + public static final String REVISION = "$Revision: 1.9 $"; + /** + * Pass to constructor to remove a RANDOM item from + * the Queue upon reaching the maximum limit. + */ + public static final int RANDOM = 0; + + /** + * Pass to constructor to remove the FIRST item from + * the Queue upon reaching the maximum limit. + */ + public static final int FIRST = 1; + + /** + * Pass to constructor to remove the LAST item from + * the Queue upon reaching the maximum limit. + */ + public static final int LAST = 2; + + /** + * Pass to constructor to drop the new item upon reaching + * the maximum Queue limit. + */ + public static final int DROP = 3; + +//---STATIC METHODS--- + +//---CONSTRUCTORS--- + + /** + * Constructs a new Queue with a maximum size limit on + * any individual queue. This should be used to stop + * conditions where the Queue cannot be guaranteed to + * be emptied as quick as it's filled. + * + * An algorithm will be used to remove data when new data + * arrives. There may be choices of algorithms later on. + * + * @param maxSize the upper limit for a queue + * @param removeAlgorithm the remove algorithm to use upon reaching the maxSize + */ + public Queue(int maxSize, int removeAlgorithm) { + _maxSize = maxSize; + _removeAlgorithm = removeAlgorithm; + } + + /** + * Constructs a Queue with no maximum size. + */ public Queue() { - // Possible use this method instead ? - //_list = Collections.synchronizedList(new LinkedList(...)); - _list = new LinkedList(); + _maxSize = -1; } + +//---PUBLIC METHODS--- - public synchronized void add(Object o) { - int s = _list.size(); - // add() does the same thing, but this ensures behaviour - _list.addLast(o); - if (s == 0) { - notifyAll(); + /** + * This method adds a given object to every queue. It will notify + * any waiting consumers (on an empty queue) during this process. + * + * @param o An Object to be added to the queues. + */ + public void add(Object o) { + for(int i=0; i < _lists.size(); i++) { + // skip over any gaps left in the list + if(_lists.get(i) != null) { + // get size before adding to the Queue + int s = ((LinkedList) _lists.get(i)).size(); + // check whether we need to remove an item from the current Queue + if(_maxSize!=-1 && s==_maxSize && _removeAlgorithm!=DROP) { + // we need to remove an item + removeQueueItem((LinkedList) _lists.get(i)); + } + // check if we should add (not if Queue full, and using DROP algorithm) + if(!(s==_maxSize && _removeAlgorithm==DROP)) { + // add the next item, ensuring we lock + synchronized(this) { + // LinkedList.add() does the same thing, but this ensures behaviour + ((LinkedList) _lists.get(i)).addLast(o); + } + } + // if the queue was empty before the add it is possible + // that a consumer is waiting... so we notify them + if (s == 0) { + synchronized(((LinkedList) _lists.get(i))) { + ((LinkedList) _lists.get(i)).notifyAll(); + } + } + } } + // we keep track of the total additions for the status() method _count++; } - public synchronized Object get() { - if (_list.size() == 0) { - try { wait(); } catch(Exception e) {} + /** + * This method returns an object from the front of a given queue. + * It will block until data exists in the queue if required. + * + * @param The queue to retrieve data from. + * @return The object from the front of the queue. + * @throws InvalidQueueException if the queue does not exist. + */ + public Object get(int queue) throws InvalidQueueException { + // make sure queue exists + if (queue >= _lists.size() || _lists.get(queue) == null) { + throw new InvalidQueueException("Requested queue "+queue+" does not exist"); } + // block if the queue is empty + if (((LinkedList) _lists.get(queue)).size() == 0) { + synchronized(((LinkedList) _lists.get(queue))) { + try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {} + } + } + // get an item, it should never be null due to the blocking above Object o = null; - try { - o = _list.removeFirst(); + synchronized(this) { + try { + o = ((LinkedList) _lists.get(queue)).removeFirst(); + } + catch (NoSuchElementException e) { + // This should never happen ! + } } - catch (NoSuchElementException e) { - // no element... null already... so just leave - } return o; } - public String status() { - String status = ""; - status += "Current queue size = "+_list.size(); - status += "\n"; - status += "Queue-ometer = "+_count; + /** + * This method releases a get() method that's currently + * waiting on an empty queue. This was designed for + * shutdown() type methods that may have problems closing + * if the thread of control is waiting on a queue. + * + * @param queue the queue to release. + */ + public void releaseQueue(int queue) { + synchronized(((LinkedList) _lists.get(queue))) { + ((LinkedList) _lists.get(queue)).notifyAll(); + } + } + + /** + * This method erases the contents of a given queue. This + * method should be used with care. It can only empty one + * internal queue, not all of them. This must be called + * multiple times to empty all queues. + * + * @param queue the queue to empty. + */ + public void clearQueue(int queue) { + synchronized(this) { + ((LinkedList) _lists.get(queue)).clear(); + } + } + + /** + * This method returns an XML textual status of the queues. + * It is merely for observation, and would most likely be + * used by a larger "monitoring" component. Information + * returned includes the current size of each queue, and + * the total items passed through. + * + * @return A String message containing status information in XML format + */ + public String xmlStatus() { + String status = "= _lists.size() || _lists.get(queue) == null) { + throw new InvalidQueueException("Requested queue "+queue+" does not exist"); + } + return ((LinkedList) _lists.get(queue)).size(); + } + + /** + * Returns the total numer of elements to have passed + * through this queue (ie. a counter on the add method). + * + * @return the element-ometer. + */ + public int elementCount() { + return _count; + } + + /** + * This method assigns a queue to a consumer. The idea behind + * this is to ensure that only 1 consumer can be associated with + * a given queue, otherwise the whole "blocking" thing fails + * miserably. Queues are created upon requested. + * + * It is IMPORTANT that removeQueue() is used when the queue is + * no longer required. + * + * @return An integer to be passed to the get() method. + */ + public int getQueue() { + int pos = -1; + for(int i=0; i < _lists.size(); i++) { + if(_lists.get(i) == null) { + // found a gap, re-use it + pos = i; + _lists.set(i, new LinkedList()); + } + } + if(pos == -1) { + //we didn't find a gap, add at end + pos = _lists.size(); + _lists.add(pos, new LinkedList()); + } + return pos; + } + + /** + * This method sets a entry to null in the list. This ensures + * that it will no longer be added to after it is no longer + * required be a consumer. + * + * @param queue The integer identifier for the queue, given by getQueue(). + */ + public void removeQueue(int queue) { + _lists.set(queue, null); + } + + /** + * Start a monitor on our own Queue. This will log XML + * statistics about our Queue to a given Queue (could be + * the one being monitored). + * + * @param interval The long interval, in milliseconds, at which to take samples + * @param destQueue The queue to monitor to + * @param name A name to identify this Queue with + * @return whether we succeeded + */ + public boolean startMonitor(long interval, Queue destQueue, String name) { + if(_queueMon == null) { + // start a monitor + _queueMon = new QueueMonitor(this, destQueue, interval, name); + _queueMon.start(); + return true; + } + else { + // already have a monitor running + return false; + } + } + + /** + * Start a monitor on our own Queue. This will log XML + * statistics about our Queue to this Queue. + * + * @param interval The long interval, in milliseconds, at which to take samples + * @param name A name to identify this Queue with + * @return whether we succeeded + */ + public boolean startMonitor(long interval, String name) { + return startMonitor(interval, this, name); + } + + /** + * Stop a monitor on our Queue if we have on running. + * + * @return whether we succeeded + */ + public boolean stopMonitor() { + if(_queueMon != null) { + // stop a monitor + _queueMon.shutdown(); + _queueMon = null; + return true; + } + else { + // no monitor running + return false; + } + } + + /** + * Overrides the {@link java.lang.Object#toString() Object.toString()} + * method to provide clean logging (every class should have this). + * + * This uses the uk.ac.ukc.iscream.util.FormatName class + * to format the toString() + * + * @return the name of this class and its CVS revision. + */ + public String toString() { + return FormatName.getName( + _name, + getClass().getName(), + REVISION); + } + +//---PRIVATE METHODS--- + + /** + * This method removes an item from a Queue, using a method + * requested at construction. + * + * @param list The LinkedList from which to remove an item. + */ + private void removeQueueItem(LinkedList list) { + // look at our algorithm + // remove a random item from the list + if(_removeAlgorithm==RANDOM) { + // new Random, with a good seed + Random rand = new Random(System.currentTimeMillis()); + int i = rand.nextInt(_maxSize); + synchronized(this) { + list.remove(i); + } + } + // remove the first item from the list + else if(_removeAlgorithm==FIRST) { + synchronized(this) { + list.removeFirst(); + } + } + // remove the last item from the list + else if(_removeAlgorithm==LAST) { + synchronized(this) { + list.removeLast(); + } + } + } + +//---ACCESSOR/MUTATOR METHODS--- + +//---ATTRIBUTES--- + + /** + * The LinkedLists of queues. + */ + private LinkedList _lists = new LinkedList(); + + /** + * A counter so we know how many data items have been + * passed through, for statistical purposes. + */ + private int _count = 0; + + /** + * A reference to our QueueMonitor, if we have one. + */ + private QueueMonitor _queueMon = null; + + /** + * The maximum size of any Queue. + */ + private int _maxSize = -1; + + /** + * The remove algorithm to use upon a Queue reaching + * it's maximum size. + */ + private int _removeAlgorithm = -1; + + /** + * This is the friendly identifier of the + * component this class is running in. + * eg, a Filter may be called "filter1", + * If this class does not have an owning + * component, a name from the configuration + * can be placed here. This name could also + * be changed to null for utility classes. + */ + private String _name = null; + +//---STATIC ATTRIBUTES--- + +}