ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/experimental/server/Queue/Queue.java
Revision: 1.2
Committed: Thu Dec 28 03:49:15 2000 UTC (23 years, 4 months ago) by tdb
Branch: MAIN
Changes since 1.1: +193 -23 lines
Log Message:
Added support for multiple consumers.

File Contents

# User Rev Content
1 tdb 1.2 //---PACKAGE DECLARATION---
2    
3     //---IMPORTS---
4 tdb 1.1 import java.util.LinkedList;
5     import java.util.NoSuchElementException;
6 tdb 1.2 //import uk.ac.ukc.iscream.util.*;
7 tdb 1.1
8 tdb 1.2 /**
9     * A Queue class designed to operate in a multi-threaded environment, with
10     * added support for multiple "consumer" threads. Also offers blocking on
11     * the get() methods, which ensures the consumer waits until the queue
12     * actually contains some elements.
13     *
14     * @author $Author$
15     * @version $Id$
16     */
17 tdb 1.1 class Queue {
18 tdb 1.2
19     //---FINAL ATTRIBUTES---
20    
21     /**
22     * The current CVS revision of this class
23     */
24     public static final String REVISION = "$Revision: 1.1 $";
25    
26     //---STATIC METHODS---
27    
28     //---CONSTRUCTORS---
29    
30     /**
31     * This constructor sets up a given number of queues, all of which
32     * will be populated with data using the add() method. It is very
33     * important that the correct number is given, otherwise redundant
34     * queues will build up with large amounts of data in them.
35     *
36     * @param consumers The number of queues to be created.
37     */
38     public Queue(int consumers) {
39     // constuct and initialise the queues
40     _lists = new LinkedList[consumers];
41     for(int i=0; i < _lists.length; i++) {
42     _lists[i] = new LinkedList();
43     }
44     }
45 tdb 1.1
46 tdb 1.2 /**
47     * This constructor is intended for an environment with a single
48     * consumer. This should be used in conjunction with the no-args
49     * get() method.
50     */
51 tdb 1.1 public Queue() {
52 tdb 1.2 // call the proper constructor
53     this(1);
54 tdb 1.1 }
55    
56 tdb 1.2 //---PUBLIC METHODS---
57    
58     /**
59     * This method adds a given object to every queue. It will notify
60     * any waiting consumers (on an empty queue) during this process.
61     *
62     * @param o An Object to be added to the queues.
63     */
64     public void add(Object o) {
65     for(int i=0; i < _lists.length; i++) {
66     int s = _lists[i].size();
67     synchronized(this) {
68     // add() does the same thing, but this ensures behaviour
69     _lists[i].addLast(o);
70     }
71     // if the queue was empty before the add it is possible
72     // that a consumer is waiting... so we notify them
73     if (s == 0) {
74     synchronized(_lists[i]) {
75     _lists[i].notifyAll();
76     }
77     }
78 tdb 1.1 }
79 tdb 1.2 // we keep track of the total additions for the status() method
80 tdb 1.1 _count++;
81     }
82    
83 tdb 1.2 /**
84     * This method returns an object from the front of a given queue.
85     * It will block until data exists in the queue if required.
86     *
87     * @return The object from the front of the queue.
88     */
89     public Object get(int queue) {
90     // block if the queue is empty
91     if (_lists[queue].size() == 0) {
92     synchronized(_lists[queue]) {
93     try { _lists[queue].wait(); } catch(Exception e) {}
94     }
95 tdb 1.1 }
96 tdb 1.2 // get an item, it should never be null due to the blocking above
97 tdb 1.1 Object o = null;
98 tdb 1.2 synchronized(this) {
99     try {
100     o = _lists[queue].removeFirst();
101     }
102     catch (NoSuchElementException e) {
103     // This should never happen !
104     }
105 tdb 1.1 }
106     return o;
107     }
108    
109 tdb 1.2 /**
110     * This method is intended for an environment where there is
111     * only a single consumer. It simply gets the item from the
112     * first (and presumably only) queue.
113     *
114     * @return The object from the front of the queue.
115     */
116     public Object get() {
117     return get(0);
118     }
119    
120     /**
121     * This method returns a textual status of the queues. It
122     * is merely for observation, and would most likely be used
123     * by a larger "monitoring" component. Information returned
124     * includes the current size of each queue, and the total
125     * items passed through.
126     *
127     * @return A String message containing status information.
128     */
129 tdb 1.1 public String status() {
130     String status = "";
131 tdb 1.2 for(int i=0; i < _lists.length; i++) {
132     status += "Queue number "+i+" contains "+_lists[i].size()+" elements";
133     status += "\n";
134     }
135     status += "A total of "+_count+" elements have been added to the queues";
136 tdb 1.1 return status;
137     }
138    
139 tdb 1.2 /**
140     * This method assigns a queue to a consumer. The idea behind
141     * this is to ensure that only 1 consumer can be associated with
142     * a given queue, otherwise the whole "blocking" thing fails
143     * miserably.
144     *
145     * @return An integer to be passed to the get() method.
146     * @throws NoQueueException if there are no un-assigned queue's.
147     */
148     public int getQueue() throws NoQueueException {
149     if(_index < _lists.length) {
150     return _index++;
151     }
152     else {
153     throw new NoQueueException("Too many consumers, there are already "+_lists.length+" running");
154     }
155     }
156    
157     /**
158     * Overrides the {@link java.lang.Object#toString() Object.toString()}
159     * method to provide clean logging (every class should have this).
160     *
161     * This uses the uk.ac.ukc.iscream.util.FormatName class
162     * to format the toString()
163     *
164     * @return the name of this class and its CVS revision
165     */
166     /*public String toString() {
167     return FormatName.getName(
168     _name,
169     getClass().getName(),
170     REVISION);
171     }*/
172    
173     //---PRIVATE METHODS---
174    
175     //---ACCESSOR/MUTATOR METHODS---
176    
177     //---ATTRIBUTES---
178    
179     /**
180     * The array of lists, which the underlying queue data
181     * is stored in.
182     */
183     private LinkedList[] _lists;
184    
185     /**
186     * A counter so we know how many data items have been
187     * passed through, for statistical purposes.
188     */
189     private int _count = 0;
190    
191     /**
192     * An index of the next available queue. Used by the
193     * getQueue() method.
194     */
195     private int _index = 0;
196    
197     /**
198     * This is the friendly identifier of the
199     * component this class is running in.
200     * eg, a Filter may be called "filter1",
201     * If this class does not have an owning
202     * component, a name from the configuration
203     * can be placed here. This name could also
204     * be changed to null for utility classes.
205     */
206     //private String _name = <!THIS SHOULD CALL A STATIC NAME IN THE COMPONENT CLASS FOR THIS OBJECT!>;
207    
208     /**
209     * This holds a reference to the
210     * system logger that is being used.
211     */
212     //private Logger _logger = ReferenceManager.getInstance().getLogger();
213    
214     //---STATIC ATTRIBUTES---
215    
216     }