--- experimental/server/Queue/Queue.java 2000/12/28 03:49:15 1.2 +++ experimental/server/Queue/Queue.java 2001/02/12 02:27:56 1.7 @@ -1,4 +1,5 @@ //---PACKAGE DECLARATION--- +//package uk.ac.ukc.iscream.util; //---IMPORTS--- import java.util.LinkedList; @@ -12,47 +13,21 @@ import java.util.NoSuchElementException; * actually contains some elements. * * @author $Author: tdb $ - * @version $Id: Queue.java,v 1.2 2000/12/28 03:49:15 tdb Exp $ + * @version $Id: Queue.java,v 1.7 2001/02/12 02:27:56 tdb Exp $ */ -class Queue { +public class Queue { //---FINAL ATTRIBUTES--- /** * The current CVS revision of this class */ - public static final String REVISION = "$Revision: 1.2 $"; + public static final String REVISION = "$Revision: 1.7 $"; //---STATIC METHODS--- -//---CONSTRUCTORS--- - - /** - * This constructor sets up a given number of queues, all of which - * will be populated with data using the add() method. It is very - * important that the correct number is given, otherwise redundant - * queues will build up with large amounts of data in them. - * - * @param consumers The number of queues to be created. - */ - public Queue(int consumers) { - // constuct and initialise the queues - _lists = new LinkedList[consumers]; - for(int i=0; i < _lists.length; i++) { - _lists[i] = new LinkedList(); - } - } - - /** - * This constructor is intended for an environment with a single - * consumer. This should be used in conjunction with the no-args - * get() method. - */ - public Queue() { - // call the proper constructor - this(1); - } - +//---CONSTRUCTORS--- + //---PUBLIC METHODS--- /** @@ -62,18 +37,21 @@ class Queue { * @param o An Object to be added to the queues. */ public void add(Object o) { - for(int i=0; i < _lists.length; i++) { - int s = _lists[i].size(); - synchronized(this) { - // add() does the same thing, but this ensures behaviour - _lists[i].addLast(o); - } - // if the queue was empty before the add it is possible - // that a consumer is waiting... so we notify them - if (s == 0) { - synchronized(_lists[i]) { - _lists[i].notifyAll(); + for(int i=0; i < _lists.size(); i++) { + // skip over any gaps left in the list + if(_lists.get(i) != null) { + int s = ((LinkedList) _lists.get(i)).size(); + synchronized(this) { + // add() does the same thing, but this ensures behaviour + ((LinkedList) _lists.get(i)).addLast(o); } + // if the queue was empty before the add it is possible + // that a consumer is waiting... so we notify them + if (s == 0) { + synchronized(((LinkedList) _lists.get(i))) { + ((LinkedList) _lists.get(i)).notifyAll(); + } + } } } // we keep track of the total additions for the status() method @@ -84,20 +62,26 @@ class Queue { * This method returns an object from the front of a given queue. * It will block until data exists in the queue if required. * + * @param The queue to retrieve data from. * @return The object from the front of the queue. + * @throws InvalidQueueException if the queue does not exist. */ - public Object get(int queue) { + public Object get(int queue) throws InvalidQueueException { + // make sure queue exists + if (queue >= _lists.size() || _lists.get(queue) == null) { + throw new InvalidQueueException("Requested queue "+queue+" does not exist"); + } // block if the queue is empty - if (_lists[queue].size() == 0) { - synchronized(_lists[queue]) { - try { _lists[queue].wait(); } catch(Exception e) {} + if (((LinkedList) _lists.get(queue)).size() == 0) { + synchronized(((LinkedList) _lists.get(queue))) { + try { ((LinkedList) _lists.get(queue)).wait(); } catch(Exception e) {} } } // get an item, it should never be null due to the blocking above Object o = null; synchronized(this) { try { - o = _lists[queue].removeFirst(); + o = ((LinkedList) _lists.get(queue)).removeFirst(); } catch (NoSuchElementException e) { // This should never happen ! @@ -107,68 +91,190 @@ class Queue { } /** - * This method is intended for an environment where there is - * only a single consumer. It simply gets the item from the - * first (and presumably only) queue. + * This method releases a get() method that's currently + * waiting on an empty queue. This was designed for + * shutdown() type methods that may have problems closing + * if the thread of control is waiting on a queue. * - * @return The object from the front of the queue. + * @param queue the queue to release. */ - public Object get() { - return get(0); + public void releaseQueue(int queue) { + synchronized(((LinkedList) _lists.get(queue))) { + ((LinkedList) _lists.get(queue)).notifyAll(); + } } + + /** + * This method erases the contents of a given queue. This + * method should be used with care. It can only empty one + * internal queue, not all of them. This must be called + * multiple times to empty all queues. + * + * @param queue the queue to empty. + */ + public void clearQueue(int queue) { + synchronized(this) { + ((LinkedList) _lists.get(queue)).clear(); + } + } /** - * This method returns a textual status of the queues. It - * is merely for observation, and would most likely be used - * by a larger "monitoring" component. Information returned - * includes the current size of each queue, and the total - * items passed through. + * This method returns an XML textual status of the queues. + * It is merely for observation, and would most likely be + * used by a larger "monitoring" component. Information + * returned includes the current size of each queue, and + * the total items passed through. * - * @return A String message containing status information. + * @return A String message containing status information in XML format */ - public String status() { - String status = ""; - for(int i=0; i < _lists.length; i++) { - status += "Queue number "+i+" contains "+_lists[i].size()+" elements"; - status += "\n"; + public String xmlStatus() { + String status = ""; return status; } /** + * Returns the size of a given queue. A consumer can use + * this to see how big their queue is at any given time. + * they should use their queue number as the parameter. + * + * @param queue The queue number to query. + * @return the current size of the queue. + * @throws InvalidQueueException if the queue does not exist. + */ + public int queueSize(int queue) throws InvalidQueueException { + if (queue >= _lists.size() || _lists.get(queue) == null) { + throw new InvalidQueueException("Requested queue "+queue+" does not exist"); + } + return ((LinkedList) _lists.get(queue)).size(); + } + + /** + * Returns the total numer of elements to have passed + * through this queue (ie. a counter on the add method). + * + * @return the element-ometer. + */ + public int elementCount() { + return _count; + } + + /** * This method assigns a queue to a consumer. The idea behind * this is to ensure that only 1 consumer can be associated with * a given queue, otherwise the whole "blocking" thing fails - * miserably. + * miserably. Queues are created upon requested. * + * It is IMPORTANT that removeQueue() is used when the queue is + * no longer required. + * * @return An integer to be passed to the get() method. - * @throws NoQueueException if there are no un-assigned queue's. */ - public int getQueue() throws NoQueueException { - if(_index < _lists.length) { - return _index++; + public int getQueue() { + int pos = -1; + for(int i=0; i < _lists.size(); i++) { + if(_lists.get(i) == null) { + // found a gap, re-use it + pos = i; + _lists.set(i, new LinkedList()); + } } + if(pos == -1) { + //we didn't find a gap, add at end + pos = _lists.size(); + _lists.add(pos, new LinkedList()); + } + return pos; + } + + /** + * This method sets a entry to null in the list. This ensures + * that it will no longer be added to after it is no longer + * required be a consumer. + * + * @param queue The integer identifier for the queue, given by getQueue(). + */ + public void removeQueue(int queue) { + _lists.set(queue, null); + } + + /** + * Start a monitor on our own Queue. This will log XML + * statistics about our Queue to a given Queue (could be + * the one being monitored). + * + * @param interval The long interval, in milliseconds, at which to take samples + * @param destQueue The queue to monitor to + * @param name A name to identify this Queue with + * @return whether we succeeded + */ + public boolean startMonitor(long interval, Queue destQueue, String name) { + if(_queueMon == null) { + // start a monitor + _queueMon = new QueueMonitor(this, destQueue, interval, name); + _queueMon.start(); + return true; + } else { - throw new NoQueueException("Too many consumers, there are already "+_lists.length+" running"); + // already have a monitor running + return false; } } - + /** + * Start a monitor on our own Queue. This will log XML + * statistics about our Queue to this Queue. + * + * @param interval The long interval, in milliseconds, at which to take samples + * @param name A name to identify this Queue with + * @return whether we succeeded + */ + public boolean startMonitor(long interval, String name) { + return startMonitor(interval, this, name); + } + + /** + * Stop a monitor on our Queue if we have on running. + * + * @return whether we succeeded + */ + public boolean stopMonitor() { + if(_queueMon != null) { + // stop a monitor + _queueMon.shutdown(); + _queueMon = null; + return true; + } + else { + // no monitor running + return false; + } + } + + /** * Overrides the {@link java.lang.Object#toString() Object.toString()} * method to provide clean logging (every class should have this). * * This uses the uk.ac.ukc.iscream.util.FormatName class * to format the toString() * - * @return the name of this class and its CVS revision + * @return the name of this class and its CVS revision. */ - /*public String toString() { + public String toString() { return FormatName.getName( _name, getClass().getName(), REVISION); - }*/ + } //---PRIVATE METHODS--- @@ -177,10 +283,9 @@ class Queue { //---ATTRIBUTES--- /** - * The array of lists, which the underlying queue data - * is stored in. + * The LinkedLists of queues. */ - private LinkedList[] _lists; + private LinkedList _lists = new LinkedList(); /** * A counter so we know how many data items have been @@ -189,11 +294,10 @@ class Queue { private int _count = 0; /** - * An index of the next available queue. Used by the - * getQueue() method. + * A reference to our QueueMonitor, if we have one. */ - private int _index = 0; - + private QueueMonitor _queueMon = null; + /** * This is the friendly identifier of the * component this class is running in. @@ -203,13 +307,7 @@ class Queue { * can be placed here. This name could also * be changed to null for utility classes. */ - //private String _name = ; - - /** - * This holds a reference to the - * system logger that is being used. - */ - //private Logger _logger = ReferenceManager.getInstance().getLogger(); + private String _name = null; //---STATIC ATTRIBUTES---