ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
Revision: 1.11
Committed: Thu Mar 1 17:29:46 2001 UTC (23 years, 3 months ago) by tdb
Branch: MAIN
Changes since 1.10: +30 -24 lines
Log Message:
Just for the record, Collections.synchronizedList() doesn't work :)
I've now manually synchronized the methods that access the lists and map, and
put the sections of the run method that do the same in synchronized blocks.
This should work, I hope. A lot of hoping seems to be going on here :)

File Contents

# Content
1 //---PACKAGE DECLARATION---
2 package uk.ac.ukc.iscream.clientinterface;
3
4 //---IMPORTS---
5 import uk.ac.ukc.iscream.util.*;
6 import uk.ac.ukc.iscream.componentmanager.*;
7 import uk.ac.ukc.iscream.core.*;
8 import java.util.*;
9
10 /**
11 * Receives data from the incoming CORBA servant, places
12 * it in a Queue, and then arranges distribution to the
13 * DataHandlers.
14 * Has extra functionality to send data to DataHandlers
15 * on a per-host basis - ie. the Client can request which
16 * hosts it would like to listen for.
17 *
18 * @author $Author: tdb1 $
19 * @version $Id: PacketSorter.java,v 1.10 2001/03/01 16:53:16 tdb1 Exp $
20 */
21 class PacketSorter extends Thread {
22
23 //---FINAL ATTRIBUTES---
24
25 /**
26 * The current CVS revision of this class
27 */
28 public final String REVISION = "$Revision: 1.10 $";
29
30 //---STATIC METHODS---
31
32 //---CONSTRUCTORS---
33
34 /**
35 * Creates a new PacketSorter.
36 *
37 * @param queueMonitorInterval The interval at which to monitor the Queue
38 */
39 public PacketSorter(int queueMonitorInterval) {
40 _queue = new Queue();
41 // startup a monitor on this queue, every minute
42 String queueName = _name + " PacketSorterQueue";
43 _queue.startMonitor(queueMonitorInterval*1000, queueName);
44 _hostMap = new HashMap();
45 _allHostDataList = new LinkedList();
46 _allHostsList = new LinkedList();
47 _logger.write(toString(), Logger.SYSINIT, "created");
48 }
49
50 //---PUBLIC METHODS---
51
52 /**
53 * Method to start the PacketSorter running. This method will
54 * loop forever processing and sending data.
55 */
56 public void run() {
57 int qID = _queue.getQueue();
58 while(true) {
59 // attempt to get some data from the Queue
60 String xml = "";
61 try {
62 xml = (String) _queue.get(qID);
63 }
64 catch(InvalidQueueException e) {
65 _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
66 }
67
68 XMLPacket packet = null;
69
70 try {
71 XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml);
72 packet = xmlPacketMaker.createXMLPacket();
73 } catch(InvalidXMLException e) {
74 _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
75 // skip the rest of this loop iteration
76 continue;
77 }
78
79 String packetType = packet.getParam("packet.attributes.type");
80 // check if we need to send it regardless
81 if(packetType.equals("data") || packetType.equals("heartbeat")) {
82 String host = packet.getParam("packet.attributes.machine_name");
83
84 // look in the hostMap to see if anyone wants this data
85 synchronized(this) {
86 if(_hostMap.containsKey(host)) {
87 LinkedList list = (LinkedList) _hostMap.get(host);
88 Iterator i = list.iterator();
89 // push the data to the listening Handler's queue
90 while(i.hasNext()) {
91 ((Queue) i.next()).add(xml);
92 }
93 }
94 }
95
96 // any handler in this list wants all packets, so send
97 // it on to them regardless
98 synchronized(this) {
99 Iterator i = _allHostDataList.iterator();
100 while(i.hasNext()) {
101 ((Queue) i.next()).add(xml);
102 }
103 }
104 }
105 else {
106 // always send this packet to all hosts, because it's
107 // "extra" data, not host data
108 synchronized(this) {
109 Iterator i = _allHostsList.iterator();
110 while(i.hasNext()) {
111 ((Queue) i.next()).add(xml);
112 }
113 }
114 }
115 }
116 }
117
118 /**
119 * Register a DataHandler in the system. This method
120 * actually takes a reference to a Queue, which should be
121 * a Queue that the DataHandler is making use of.
122 * It also takes a hostList, this being a semi-colon
123 * seperated list of hosts that the Client the DataHandler
124 * is serving has requested. If this list is simply an empty
125 * String, it is assumed the Client wants to listen to all
126 * host information.
127 *
128 * @param dhQueue a Queue being used by the DataHandler that is registering
129 * @param hostList a semi-colon seperated list of hosts
130 */
131 public synchronized void register(Queue dhQueue, String hostList) {
132 // check to see if we want all hosts
133 if(hostList.equals("")) {
134 _allHostDataList.add(dhQueue);
135 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
136 }
137 else {
138 // go through the list of hosts
139 StringTokenizer st = new StringTokenizer(hostList, ";");
140 while(st.hasMoreTokens()) {
141 String host = st.nextToken();
142 // see if we already have a list in the map for this host
143 if(_hostMap.containsKey(host)) {
144 // we do, so add to it
145 List list = (List) _hostMap.get(host);
146 list.add(dhQueue);
147 }
148 else {
149 // we don't, so create a list and put it in the map
150 LinkedList list = new LinkedList();
151 list.add(dhQueue);
152 _hostMap.put(host, list);
153 }
154 }
155 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
156 }
157 // always add host to our complete host list
158 _allHostsList.add(dhQueue);
159 }
160
161 /**
162 * Deregister a DataHandler. The DataHandler should give a reference
163 * to the Queue it's using, and the *same* hostList it gave when it
164 * register. It is imperative that the hostList is the same, otherwise
165 * there will be all sorts of problems with lists getting out of sync.
166 *
167 * NB: Possible future addition would be recording of hostList's.
168 *
169 * @param dhQueue a Queue being used by the DataHandler that is deregistering
170 * @param hostList a semi-colon seperated list of hosts
171 */
172 public synchronized void deregister(Queue dhQueue, String hostList) {
173 // go through the list of hosts
174 if(hostList.equals("")) {
175 _allHostDataList.remove(dhQueue);
176 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
177 }
178 else {
179 StringTokenizer st = new StringTokenizer(hostList, ";");
180 while(st.hasMoreTokens()) {
181 String host = st.nextToken();
182 // this should in reality always be true, but best check
183 if(_hostMap.containsKey(host)) {
184 // get the list and remove the host in question
185 LinkedList list = (LinkedList) _hostMap.get(host);
186 list.remove(dhQueue);
187 // if the list is now empty, we might as well remove it
188 if(list.size()==0) {
189 _hostMap.remove(host);
190 }
191 }
192 }
193 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
194 }
195 // always remove host from our complete host list
196 _allHostsList.remove(dhQueue);
197 }
198
199 /**
200 * Overrides the {@link java.lang.Object#toString() Object.toString()}
201 * method to provide clean logging (every class should have this).
202 *
203 * This uses the uk.ac.ukc.iscream.util.NameFormat class
204 * to format the toString()
205 *
206 * @return the name of this class and its CVS revision
207 */
208 public String toString() {
209 return FormatName.getName(
210 _name,
211 getClass().getName(),
212 REVISION);
213 }
214
215 //---PRIVATE METHODS---
216
217 //---ACCESSOR/MUTATOR METHODS---
218
219 /**
220 * Accessor to return a reference to the Queue object. This
221 * is needed so the ClientInterfaceServant can get add data
222 * easily.
223 *
224 * @return a reference to our Queue object.
225 */
226 public Queue getQueue() {
227 return _queue;
228 }
229
230 //---ATTRIBUTES---
231
232 /**
233 * This is the friendly identifier of the
234 * component this class is running in.
235 * eg, a Filter may be called "filter1",
236 * If this class does not have an owning
237 * component, a name from the configuration
238 * can be placed here. This name could also
239 * be changed to null for utility classes.
240 */
241 private String _name = ClientInterfaceMain.NAME;
242
243 /**
244 * This holds a reference to the
245 * system logger that is being used.
246 */
247 private Logger _logger = ReferenceManager.getInstance().getLogger();
248
249 /**
250 * A reference to the Queue we're using.
251 */
252 private Queue _queue;
253
254 /**
255 * A HashMap to store lists of Queue's (in the DataHandlers)
256 * in a way that can be easily accessed when data comes in.
257 */
258 private HashMap _hostMap;
259
260 /**
261 * A list specifically for a Queue's associated with DataHandlers
262 * that want all host information.
263 */
264 private LinkedList _allHostDataList;
265
266 /**
267 * A list of all hosts.
268 */
269 private LinkedList _allHostsList;
270
271 //---STATIC ATTRIBUTES---
272
273 }