ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
Revision: 1.15
Committed: Tue Mar 13 18:37:08 2001 UTC (23 years, 2 months ago) by tdb
Branch: MAIN
Changes since 1.14: +14 -8 lines
Log Message:
Now makes use of the ConfigurationProxy.

File Contents

# User Rev Content
1 tdb 1.1 //---PACKAGE DECLARATION---
2     package uk.ac.ukc.iscream.clientinterface;
3    
4     //---IMPORTS---
5     import uk.ac.ukc.iscream.util.*;
6     import uk.ac.ukc.iscream.componentmanager.*;
7     import uk.ac.ukc.iscream.core.*;
8     import java.util.*;
9    
10     /**
11     * Receives data from the incoming CORBA servant, places
12     * it in a Queue, and then arranges distribution to the
13 tdb 1.4 * DataHandlers.
14     * Has extra functionality to send data to DataHandlers
15     * on a per-host basis - ie. the Client can request which
16     * hosts it would like to listen for.
17 tdb 1.1 *
18 tdb 1.2 * @author $Author: tdb1 $
19 tdb 1.15 * @version $Id: PacketSorter.java,v 1.14 2001/03/13 02:19:44 tdb1 Exp $
20 tdb 1.1 */
21     class PacketSorter extends Thread {
22    
23     //---FINAL ATTRIBUTES---
24    
25     /**
26     * The current CVS revision of this class
27     */
28 tdb 1.15 public final String REVISION = "$Revision: 1.14 $";
29 tdb 1.1
30     //---STATIC METHODS---
31    
32     //---CONSTRUCTORS---
33    
34     /**
35     * Creates a new PacketSorter.
36     */
37 tdb 1.15 public PacketSorter() {
38 tdb 1.14 // set the Thread name
39     setName("clientinterface.PacketSorter");
40    
41 tdb 1.1 _queue = new Queue();
42 tdb 1.15 // startup a monitor on this queue
43     try {
44     // try to get the interval, if this fails, we won't start up the monitor
45     ConfigurationProxy cp = ConfigurationProxy.getInstance();
46     int queueMonitorInterval = Integer.parseInt(cp.getProperty("ClientInterface", "Queue.MonitorInterval"));
47     String queueName = _name + " PacketSorterQueue";
48     _queue.startMonitor(queueMonitorInterval*1000, queueName);
49     } catch (PropertyNotFoundException e) {
50     _logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e);
51     }
52    
53 tdb 1.11 _hostMap = new HashMap();
54     _allHostDataList = new LinkedList();
55     _allHostsList = new LinkedList();
56 tdb 1.1 _logger.write(toString(), Logger.SYSINIT, "created");
57     }
58    
59     //---PUBLIC METHODS---
60    
61 tdb 1.4 /**
62     * Method to start the PacketSorter running. This method will
63     * loop forever processing and sending data.
64     */
65 tdb 1.1 public void run() {
66 tdb 1.13 XMLPacketMaker xmlPacketMaker = new XMLPacketMaker();
67 tdb 1.1 int qID = _queue.getQueue();
68     while(true) {
69 tdb 1.4 // attempt to get some data from the Queue
70 tdb 1.1 String xml = "";
71     try {
72     xml = (String) _queue.get(qID);
73     }
74     catch(InvalidQueueException e) {
75     _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
76     }
77    
78 tdb 1.10 XMLPacket packet = null;
79    
80     try {
81 tdb 1.13 packet = xmlPacketMaker.createXMLPacket(xml);
82 tdb 1.10 } catch(InvalidXMLException e) {
83     _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
84     // skip the rest of this loop iteration
85     continue;
86     }
87 tdb 1.1
88 tdb 1.8 String packetType = packet.getParam("packet.attributes.type");
89     // check if we need to send it regardless
90     if(packetType.equals("data") || packetType.equals("heartbeat")) {
91     String host = packet.getParam("packet.attributes.machine_name");
92    
93     // look in the hostMap to see if anyone wants this data
94 tdb 1.11 synchronized(this) {
95     if(_hostMap.containsKey(host)) {
96     LinkedList list = (LinkedList) _hostMap.get(host);
97     Iterator i = list.iterator();
98     // push the data to the listening Handler's queue
99     while(i.hasNext()) {
100     ((Queue) i.next()).add(xml);
101     }
102 tdb 1.8 }
103     }
104    
105     // any handler in this list wants all packets, so send
106     // it on to them regardless
107 tdb 1.11 synchronized(this) {
108     Iterator i = _allHostDataList.iterator();
109     while(i.hasNext()) {
110     ((Queue) i.next()).add(xml);
111     }
112     }
113 tdb 1.1 }
114 tdb 1.8 else {
115 tdb 1.10 // always send this packet to all hosts, because it's
116     // "extra" data, not host data
117 tdb 1.11 synchronized(this) {
118     Iterator i = _allHostsList.iterator();
119     while(i.hasNext()) {
120     ((Queue) i.next()).add(xml);
121     }
122     }
123 tdb 1.2 }
124 tdb 1.1 }
125     }
126    
127 tdb 1.4 /**
128     * Register a DataHandler in the system. This method
129     * actually takes a reference to a Queue, which should be
130     * a Queue that the DataHandler is making use of.
131     * It also takes a hostList, this being a semi-colon
132     * seperated list of hosts that the Client the DataHandler
133     * is serving has requested. If this list is simply an empty
134     * String, it is assumed the Client wants to listen to all
135     * host information.
136     *
137     * @param dhQueue a Queue being used by the DataHandler that is registering
138     * @param hostList a semi-colon seperated list of hosts
139     */
140 tdb 1.12 public void register(Queue dhQueue, String hostList) {
141 tdb 1.4 // check to see if we want all hosts
142 tdb 1.2 if(hostList.equals("")) {
143 tdb 1.12 synchronized(this) {
144     _allHostDataList.add(dhQueue);
145     }
146 tdb 1.2 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
147     }
148     else {
149 tdb 1.4 // go through the list of hosts
150 tdb 1.2 StringTokenizer st = new StringTokenizer(hostList, ";");
151     while(st.hasMoreTokens()) {
152     String host = st.nextToken();
153 tdb 1.12 synchronized(this) {
154     // see if we already have a list in the map for this host
155     if(_hostMap.containsKey(host)) {
156     // we do, so add to it
157     List list = (List) _hostMap.get(host);
158     list.add(dhQueue);
159     }
160     else {
161     // we don't, so create a list and put it in the map
162     LinkedList list = new LinkedList();
163     list.add(dhQueue);
164     _hostMap.put(host, list);
165     }
166 tdb 1.2 }
167 tdb 1.1 }
168 tdb 1.2 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
169 tdb 1.1 }
170 tdb 1.10 // always add host to our complete host list
171 tdb 1.12 synchronized(this) {
172     _allHostsList.add(dhQueue);
173     }
174 tdb 1.1 }
175    
176 tdb 1.4 /**
177     * Deregister a DataHandler. The DataHandler should give a reference
178     * to the Queue it's using, and the *same* hostList it gave when it
179     * register. It is imperative that the hostList is the same, otherwise
180     * there will be all sorts of problems with lists getting out of sync.
181     *
182     * NB: Possible future addition would be recording of hostList's.
183     *
184     * @param dhQueue a Queue being used by the DataHandler that is deregistering
185     * @param hostList a semi-colon seperated list of hosts
186     */
187 tdb 1.12 public void deregister(Queue dhQueue, String hostList) {
188 tdb 1.4 // go through the list of hosts
189 tdb 1.2 if(hostList.equals("")) {
190 tdb 1.12 synchronized(this) {
191     _allHostDataList.remove(dhQueue);
192     }
193 tdb 1.2 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
194     }
195     else {
196     StringTokenizer st = new StringTokenizer(hostList, ";");
197     while(st.hasMoreTokens()) {
198     String host = st.nextToken();
199 tdb 1.12 synchronized(this) {
200     // this should in reality always be true, but best check
201     if(_hostMap.containsKey(host)) {
202     // get the list and remove the host in question
203     LinkedList list = (LinkedList) _hostMap.get(host);
204     list.remove(dhQueue);
205     // if the list is now empty, we might as well remove it
206     if(list.size()==0) {
207     _hostMap.remove(host);
208     }
209 tdb 1.2 }
210 tdb 1.1 }
211     }
212 tdb 1.2 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
213 tdb 1.1 }
214 tdb 1.10 // always remove host from our complete host list
215 tdb 1.12 synchronized(this) {
216     _allHostsList.remove(dhQueue);
217     }
218 tdb 1.1 }
219    
220     /**
221     * Overrides the {@link java.lang.Object#toString() Object.toString()}
222     * method to provide clean logging (every class should have this).
223     *
224     * This uses the uk.ac.ukc.iscream.util.NameFormat class
225     * to format the toString()
226     *
227     * @return the name of this class and its CVS revision
228     */
229     public String toString() {
230     return FormatName.getName(
231     _name,
232     getClass().getName(),
233     REVISION);
234     }
235    
236     //---PRIVATE METHODS---
237    
238     //---ACCESSOR/MUTATOR METHODS---
239 tdb 1.4
240     /**
241     * Accessor to return a reference to the Queue object. This
242     * is needed so the ClientInterfaceServant can get add data
243     * easily.
244     *
245     * @return a reference to our Queue object.
246     */
247 tdb 1.1 public Queue getQueue() {
248     return _queue;
249     }
250    
251     //---ATTRIBUTES---
252    
253     /**
254     * This is the friendly identifier of the
255     * component this class is running in.
256     * eg, a Filter may be called "filter1",
257     * If this class does not have an owning
258     * component, a name from the configuration
259     * can be placed here. This name could also
260     * be changed to null for utility classes.
261     */
262     private String _name = ClientInterfaceMain.NAME;
263    
264     /**
265     * This holds a reference to the
266     * system logger that is being used.
267     */
268     private Logger _logger = ReferenceManager.getInstance().getLogger();
269    
270 tdb 1.4 /**
271     * A reference to the Queue we're using.
272     */
273 tdb 1.1 private Queue _queue;
274    
275 tdb 1.4 /**
276     * A HashMap to store lists of Queue's (in the DataHandlers)
277     * in a way that can be easily accessed when data comes in.
278     */
279 tdb 1.11 private HashMap _hostMap;
280 tdb 1.4
281     /**
282     * A list specifically for a Queue's associated with DataHandlers
283     * that want all host information.
284 tdb 1.10 */
285 tdb 1.11 private LinkedList _allHostDataList;
286 tdb 1.10
287     /**
288     * A list of all hosts.
289 tdb 1.4 */
290 tdb 1.11 private LinkedList _allHostsList;
291 tdb 1.1
292     //---STATIC ATTRIBUTES---
293    
294     }