ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
Revision: 1.17
Committed: Fri Mar 16 16:11:31 2001 UTC (23 years, 2 months ago) by tdb
Branch: MAIN
CVS Tags: PROJECT_COMPLETION
Changes since 1.16: +32 -5 lines
Log Message:
Implemented Queue size limiting.

File Contents

# User Rev Content
1 tdb 1.1 //---PACKAGE DECLARATION---
2 tdb 1.16 package uk.org.iscream.clientinterface;
3 tdb 1.1
4     //---IMPORTS---
5 tdb 1.16 import uk.org.iscream.util.*;
6     import uk.org.iscream.componentmanager.*;
7     import uk.org.iscream.core.*;
8 tdb 1.1 import java.util.*;
9    
10     /**
11     * Receives data from the incoming CORBA servant, places
12     * it in a Queue, and then arranges distribution to the
13 tdb 1.4 * DataHandlers.
14     * Has extra functionality to send data to DataHandlers
15     * on a per-host basis - ie. the Client can request which
16     * hosts it would like to listen for.
17 tdb 1.1 *
18 tdb 1.2 * @author $Author: tdb1 $
19 tdb 1.17 * @version $Id: PacketSorter.java,v 1.16 2001/03/14 23:25:29 tdb1 Exp $
20 tdb 1.1 */
21     class PacketSorter extends Thread {
22    
23     //---FINAL ATTRIBUTES---
24    
25     /**
26     * The current CVS revision of this class
27     */
28 tdb 1.17 public final String REVISION = "$Revision: 1.16 $";
29 tdb 1.1
30     //---STATIC METHODS---
31    
32     //---CONSTRUCTORS---
33    
34     /**
35     * Creates a new PacketSorter.
36     */
37 tdb 1.15 public PacketSorter() {
38 tdb 1.14 // set the Thread name
39     setName("clientinterface.PacketSorter");
40    
41 tdb 1.17 ConfigurationProxy cp = ConfigurationProxy.getInstance();
42     String configName = "ClientInterface";
43    
44     // see if this Queue needs a size limit
45     try {
46     int queueSizeLimit = Integer.parseInt(cp.getProperty(configName, "Queue.SizeLimit"));
47     String queueRemoveAlgorithm = cp.getProperty(configName, "Queue.RemoveAlgorithm");
48     int algorithm = StringUtils.getStringPos(queueRemoveAlgorithm, Queue.algorithms);
49     if(algorithm != -1) {
50     _logger.write(toString(), Logger.DEBUG, "Starting Queue with size limit of "+queueSizeLimit+", using remove algorithm "+queueRemoveAlgorithm);
51     // we have valid values, so lets start it.
52     _queue = new Queue(queueSizeLimit, algorithm);
53     }
54     else {
55     _logger.write(toString(), Logger.WARNING, "Bad Queue Algorithm configuration, not known: "+queueRemoveAlgorithm);
56     // just don't activate a limit
57     _queue = new Queue();
58     }
59    
60     } catch (PropertyNotFoundException e) {
61     _logger.write(toString(), Logger.DEBUG, "Optional config not set: "+e);
62     // just don't activate a limit
63     _queue = new Queue();
64     } catch (NumberFormatException e) {
65     _logger.write(toString(), Logger.WARNING, "Bad Queue SizeLimit configuration: "+e);
66     // just don't activate a limit
67     _queue = new Queue();
68     }
69    
70 tdb 1.15 // startup a monitor on this queue
71     try {
72     // try to get the interval, if this fails, we won't start up the monitor
73 tdb 1.17 int queueMonitorInterval = Integer.parseInt(cp.getProperty(configName, "Queue.MonitorInterval"));
74 tdb 1.15 String queueName = _name + " PacketSorterQueue";
75     _queue.startMonitor(queueMonitorInterval*1000, queueName);
76     } catch (PropertyNotFoundException e) {
77     _logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e);
78     }
79    
80 tdb 1.11 _hostMap = new HashMap();
81     _allHostDataList = new LinkedList();
82     _allHostsList = new LinkedList();
83 tdb 1.1 _logger.write(toString(), Logger.SYSINIT, "created");
84     }
85    
86     //---PUBLIC METHODS---
87    
88 tdb 1.4 /**
89     * Method to start the PacketSorter running. This method will
90     * loop forever processing and sending data.
91     */
92 tdb 1.1 public void run() {
93 tdb 1.13 XMLPacketMaker xmlPacketMaker = new XMLPacketMaker();
94 tdb 1.1 int qID = _queue.getQueue();
95     while(true) {
96 tdb 1.4 // attempt to get some data from the Queue
97 tdb 1.1 String xml = "";
98     try {
99     xml = (String) _queue.get(qID);
100     }
101     catch(InvalidQueueException e) {
102     _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
103     }
104    
105 tdb 1.10 XMLPacket packet = null;
106    
107     try {
108 tdb 1.13 packet = xmlPacketMaker.createXMLPacket(xml);
109 tdb 1.10 } catch(InvalidXMLException e) {
110     _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
111     // skip the rest of this loop iteration
112     continue;
113     }
114 tdb 1.1
115 tdb 1.8 String packetType = packet.getParam("packet.attributes.type");
116     // check if we need to send it regardless
117     if(packetType.equals("data") || packetType.equals("heartbeat")) {
118     String host = packet.getParam("packet.attributes.machine_name");
119    
120     // look in the hostMap to see if anyone wants this data
121 tdb 1.11 synchronized(this) {
122     if(_hostMap.containsKey(host)) {
123     LinkedList list = (LinkedList) _hostMap.get(host);
124     Iterator i = list.iterator();
125     // push the data to the listening Handler's queue
126     while(i.hasNext()) {
127     ((Queue) i.next()).add(xml);
128     }
129 tdb 1.8 }
130     }
131    
132     // any handler in this list wants all packets, so send
133     // it on to them regardless
134 tdb 1.11 synchronized(this) {
135     Iterator i = _allHostDataList.iterator();
136     while(i.hasNext()) {
137     ((Queue) i.next()).add(xml);
138     }
139     }
140 tdb 1.1 }
141 tdb 1.8 else {
142 tdb 1.10 // always send this packet to all hosts, because it's
143     // "extra" data, not host data
144 tdb 1.11 synchronized(this) {
145     Iterator i = _allHostsList.iterator();
146     while(i.hasNext()) {
147     ((Queue) i.next()).add(xml);
148     }
149     }
150 tdb 1.2 }
151 tdb 1.1 }
152     }
153    
154 tdb 1.4 /**
155     * Register a DataHandler in the system. This method
156     * actually takes a reference to a Queue, which should be
157     * a Queue that the DataHandler is making use of.
158     * It also takes a hostList, this being a semi-colon
159     * seperated list of hosts that the Client the DataHandler
160     * is serving has requested. If this list is simply an empty
161     * String, it is assumed the Client wants to listen to all
162     * host information.
163     *
164     * @param dhQueue a Queue being used by the DataHandler that is registering
165     * @param hostList a semi-colon seperated list of hosts
166     */
167 tdb 1.12 public void register(Queue dhQueue, String hostList) {
168 tdb 1.4 // check to see if we want all hosts
169 tdb 1.2 if(hostList.equals("")) {
170 tdb 1.12 synchronized(this) {
171     _allHostDataList.add(dhQueue);
172     }
173 tdb 1.2 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
174     }
175     else {
176 tdb 1.4 // go through the list of hosts
177 tdb 1.2 StringTokenizer st = new StringTokenizer(hostList, ";");
178     while(st.hasMoreTokens()) {
179     String host = st.nextToken();
180 tdb 1.12 synchronized(this) {
181     // see if we already have a list in the map for this host
182     if(_hostMap.containsKey(host)) {
183     // we do, so add to it
184     List list = (List) _hostMap.get(host);
185     list.add(dhQueue);
186     }
187     else {
188     // we don't, so create a list and put it in the map
189     LinkedList list = new LinkedList();
190     list.add(dhQueue);
191     _hostMap.put(host, list);
192     }
193 tdb 1.2 }
194 tdb 1.1 }
195 tdb 1.2 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
196 tdb 1.1 }
197 tdb 1.10 // always add host to our complete host list
198 tdb 1.12 synchronized(this) {
199     _allHostsList.add(dhQueue);
200     }
201 tdb 1.1 }
202    
203 tdb 1.4 /**
204     * Deregister a DataHandler. The DataHandler should give a reference
205     * to the Queue it's using, and the *same* hostList it gave when it
206     * register. It is imperative that the hostList is the same, otherwise
207     * there will be all sorts of problems with lists getting out of sync.
208     *
209     * NB: Possible future addition would be recording of hostList's.
210     *
211     * @param dhQueue a Queue being used by the DataHandler that is deregistering
212     * @param hostList a semi-colon seperated list of hosts
213     */
214 tdb 1.12 public void deregister(Queue dhQueue, String hostList) {
215 tdb 1.4 // go through the list of hosts
216 tdb 1.2 if(hostList.equals("")) {
217 tdb 1.12 synchronized(this) {
218     _allHostDataList.remove(dhQueue);
219     }
220 tdb 1.2 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
221     }
222     else {
223     StringTokenizer st = new StringTokenizer(hostList, ";");
224     while(st.hasMoreTokens()) {
225     String host = st.nextToken();
226 tdb 1.12 synchronized(this) {
227     // this should in reality always be true, but best check
228     if(_hostMap.containsKey(host)) {
229     // get the list and remove the host in question
230     LinkedList list = (LinkedList) _hostMap.get(host);
231     list.remove(dhQueue);
232     // if the list is now empty, we might as well remove it
233     if(list.size()==0) {
234     _hostMap.remove(host);
235     }
236 tdb 1.2 }
237 tdb 1.1 }
238     }
239 tdb 1.2 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
240 tdb 1.1 }
241 tdb 1.10 // always remove host from our complete host list
242 tdb 1.12 synchronized(this) {
243     _allHostsList.remove(dhQueue);
244     }
245 tdb 1.1 }
246    
247     /**
248     * Overrides the {@link java.lang.Object#toString() Object.toString()}
249     * method to provide clean logging (every class should have this).
250     *
251 tdb 1.16 * This uses the uk.org.iscream.util.NameFormat class
252 tdb 1.1 * to format the toString()
253     *
254     * @return the name of this class and its CVS revision
255     */
256     public String toString() {
257     return FormatName.getName(
258     _name,
259     getClass().getName(),
260     REVISION);
261     }
262    
263     //---PRIVATE METHODS---
264    
265     //---ACCESSOR/MUTATOR METHODS---
266 tdb 1.4
267     /**
268     * Accessor to return a reference to the Queue object. This
269     * is needed so the ClientInterfaceServant can get add data
270     * easily.
271     *
272     * @return a reference to our Queue object.
273     */
274 tdb 1.1 public Queue getQueue() {
275     return _queue;
276     }
277    
278     //---ATTRIBUTES---
279    
280     /**
281     * This is the friendly identifier of the
282     * component this class is running in.
283     * eg, a Filter may be called "filter1",
284     * If this class does not have an owning
285     * component, a name from the configuration
286     * can be placed here. This name could also
287     * be changed to null for utility classes.
288     */
289     private String _name = ClientInterfaceMain.NAME;
290    
291     /**
292     * This holds a reference to the
293     * system logger that is being used.
294     */
295     private Logger _logger = ReferenceManager.getInstance().getLogger();
296    
297 tdb 1.4 /**
298     * A reference to the Queue we're using.
299     */
300 tdb 1.1 private Queue _queue;
301    
302 tdb 1.4 /**
303     * A HashMap to store lists of Queue's (in the DataHandlers)
304     * in a way that can be easily accessed when data comes in.
305     */
306 tdb 1.11 private HashMap _hostMap;
307 tdb 1.4
308     /**
309     * A list specifically for a Queue's associated with DataHandlers
310     * that want all host information.
311 tdb 1.10 */
312 tdb 1.11 private LinkedList _allHostDataList;
313 tdb 1.10
314     /**
315     * A list of all hosts.
316 tdb 1.4 */
317 tdb 1.11 private LinkedList _allHostsList;
318 tdb 1.1
319     //---STATIC ATTRIBUTES---
320    
321     }