37 |
|
* @param queueMonitorInterval The interval at which to monitor the Queue |
38 |
|
*/ |
39 |
|
public PacketSorter(int queueMonitorInterval) { |
40 |
+ |
// set the Thread name |
41 |
+ |
setName("clientinterface.PacketSorter"); |
42 |
+ |
|
43 |
|
_queue = new Queue(); |
44 |
|
// startup a monitor on this queue, every minute |
45 |
|
String queueName = _name + " PacketSorterQueue"; |
46 |
|
_queue.startMonitor(queueMonitorInterval*1000, queueName); |
47 |
< |
_hostMap = Collections.synchronizedMap(new HashMap()); |
48 |
< |
_allHostsList = Collections.synchronizedList(new LinkedList()); |
47 |
> |
_hostMap = new HashMap(); |
48 |
> |
_allHostDataList = new LinkedList(); |
49 |
> |
_allHostsList = new LinkedList(); |
50 |
|
_logger.write(toString(), Logger.SYSINIT, "created"); |
51 |
|
} |
52 |
|
|
57 |
|
* loop forever processing and sending data. |
58 |
|
*/ |
59 |
|
public void run() { |
60 |
+ |
XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(); |
61 |
|
int qID = _queue.getQueue(); |
62 |
|
while(true) { |
63 |
|
// attempt to get some data from the Queue |
69 |
|
_logger.write(toString(), Logger.ERROR, "Queue failure: "+e); |
70 |
|
} |
71 |
|
|
72 |
< |
// find out which host this packet is actually for |
68 |
< |
XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml); |
69 |
< |
XMLPacket packet = xmlPacketMaker.createXMLPacket(); |
72 |
> |
XMLPacket packet = null; |
73 |
|
|
74 |
+ |
try { |
75 |
+ |
packet = xmlPacketMaker.createXMLPacket(xml); |
76 |
+ |
} catch(InvalidXMLException e) { |
77 |
+ |
_logger.write(toString(), Logger.ERROR, "Invalid XML: "+e); |
78 |
+ |
// skip the rest of this loop iteration |
79 |
+ |
continue; |
80 |
+ |
} |
81 |
+ |
|
82 |
|
String packetType = packet.getParam("packet.attributes.type"); |
83 |
|
// check if we need to send it regardless |
84 |
|
if(packetType.equals("data") || packetType.equals("heartbeat")) { |
85 |
|
String host = packet.getParam("packet.attributes.machine_name"); |
86 |
|
|
87 |
|
// look in the hostMap to see if anyone wants this data |
88 |
< |
if(_hostMap.containsKey(host)) { |
89 |
< |
LinkedList list = (LinkedList) _hostMap.get(host); |
90 |
< |
Iterator i = list.iterator(); |
91 |
< |
// push the data to the listening Handler's queue |
92 |
< |
while(i.hasNext()) { |
93 |
< |
((Queue) i.next()).add(xml); |
88 |
> |
synchronized(this) { |
89 |
> |
if(_hostMap.containsKey(host)) { |
90 |
> |
LinkedList list = (LinkedList) _hostMap.get(host); |
91 |
> |
Iterator i = list.iterator(); |
92 |
> |
// push the data to the listening Handler's queue |
93 |
> |
while(i.hasNext()) { |
94 |
> |
((Queue) i.next()).add(xml); |
95 |
> |
} |
96 |
|
} |
97 |
|
} |
98 |
|
|
99 |
|
// any handler in this list wants all packets, so send |
100 |
|
// it on to them regardless |
101 |
< |
Iterator i = _allHostsList.iterator(); |
102 |
< |
while(i.hasNext()) { |
103 |
< |
((Queue) i.next()).add(xml); |
104 |
< |
} |
101 |
> |
synchronized(this) { |
102 |
> |
Iterator i = _allHostDataList.iterator(); |
103 |
> |
while(i.hasNext()) { |
104 |
> |
((Queue) i.next()).add(xml); |
105 |
> |
} |
106 |
> |
} |
107 |
|
} |
108 |
|
else { |
109 |
< |
// send to all hosts because it's a special packet |
110 |
< |
Iterator i = _allHostsList.iterator(); |
111 |
< |
while(i.hasNext()) { |
112 |
< |
((Queue) i.next()).add(xml); |
113 |
< |
} |
109 |
> |
// always send this packet to all hosts, because it's |
110 |
> |
// "extra" data, not host data |
111 |
> |
synchronized(this) { |
112 |
> |
Iterator i = _allHostsList.iterator(); |
113 |
> |
while(i.hasNext()) { |
114 |
> |
((Queue) i.next()).add(xml); |
115 |
> |
} |
116 |
> |
} |
117 |
|
} |
118 |
|
} |
119 |
|
} |
134 |
|
public void register(Queue dhQueue, String hostList) { |
135 |
|
// check to see if we want all hosts |
136 |
|
if(hostList.equals("")) { |
137 |
< |
_allHostsList.add(dhQueue); |
137 |
> |
synchronized(this) { |
138 |
> |
_allHostDataList.add(dhQueue); |
139 |
> |
} |
140 |
|
_logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts"); |
141 |
|
} |
142 |
|
else { |
144 |
|
StringTokenizer st = new StringTokenizer(hostList, ";"); |
145 |
|
while(st.hasMoreTokens()) { |
146 |
|
String host = st.nextToken(); |
147 |
< |
// see if we already have a list in the map for this host |
148 |
< |
if(_hostMap.containsKey(host)) { |
149 |
< |
// we do, so add to it |
150 |
< |
LinkedList list = (LinkedList) _hostMap.get(host); |
151 |
< |
list.add(dhQueue); |
147 |
> |
synchronized(this) { |
148 |
> |
// see if we already have a list in the map for this host |
149 |
> |
if(_hostMap.containsKey(host)) { |
150 |
> |
// we do, so add to it |
151 |
> |
List list = (List) _hostMap.get(host); |
152 |
> |
list.add(dhQueue); |
153 |
> |
} |
154 |
> |
else { |
155 |
> |
// we don't, so create a list and put it in the map |
156 |
> |
LinkedList list = new LinkedList(); |
157 |
> |
list.add(dhQueue); |
158 |
> |
_hostMap.put(host, list); |
159 |
> |
} |
160 |
|
} |
133 |
– |
else { |
134 |
– |
// we don't, so create a list and put it in the map |
135 |
– |
List list = Collections.synchronizedList(new LinkedList()); |
136 |
– |
list.add(dhQueue); |
137 |
– |
_hostMap.put(host, list); |
138 |
– |
} |
161 |
|
} |
162 |
|
_logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList); |
163 |
|
} |
164 |
+ |
// always add host to our complete host list |
165 |
+ |
synchronized(this) { |
166 |
+ |
_allHostsList.add(dhQueue); |
167 |
+ |
} |
168 |
|
} |
169 |
|
|
170 |
|
/** |
181 |
|
public void deregister(Queue dhQueue, String hostList) { |
182 |
|
// go through the list of hosts |
183 |
|
if(hostList.equals("")) { |
184 |
< |
_allHostsList.remove(dhQueue); |
184 |
> |
synchronized(this) { |
185 |
> |
_allHostDataList.remove(dhQueue); |
186 |
> |
} |
187 |
|
_logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts"); |
188 |
|
} |
189 |
|
else { |
190 |
|
StringTokenizer st = new StringTokenizer(hostList, ";"); |
191 |
|
while(st.hasMoreTokens()) { |
192 |
|
String host = st.nextToken(); |
193 |
< |
// this should in reality always be true, but best check |
194 |
< |
if(_hostMap.containsKey(host)) { |
195 |
< |
// get the list and remove the host in question |
196 |
< |
LinkedList list = (LinkedList) _hostMap.get(host); |
197 |
< |
list.remove(dhQueue); |
198 |
< |
// if the list is now empty, we might as well remove it |
199 |
< |
if(list.size()==0) { |
200 |
< |
_hostMap.remove(host); |
193 |
> |
synchronized(this) { |
194 |
> |
// this should in reality always be true, but best check |
195 |
> |
if(_hostMap.containsKey(host)) { |
196 |
> |
// get the list and remove the host in question |
197 |
> |
LinkedList list = (LinkedList) _hostMap.get(host); |
198 |
> |
list.remove(dhQueue); |
199 |
> |
// if the list is now empty, we might as well remove it |
200 |
> |
if(list.size()==0) { |
201 |
> |
_hostMap.remove(host); |
202 |
> |
} |
203 |
|
} |
204 |
|
} |
205 |
|
} |
206 |
|
_logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList); |
207 |
|
} |
208 |
+ |
// always remove host from our complete host list |
209 |
+ |
synchronized(this) { |
210 |
+ |
_allHostsList.remove(dhQueue); |
211 |
+ |
} |
212 |
|
} |
213 |
|
|
214 |
|
/** |
270 |
|
* A HashMap to store lists of Queue's (in the DataHandlers) |
271 |
|
* in a way that can be easily accessed when data comes in. |
272 |
|
*/ |
273 |
< |
private Map _hostMap; |
273 |
> |
private HashMap _hostMap; |
274 |
|
|
275 |
|
/** |
276 |
|
* A list specifically for a Queue's associated with DataHandlers |
277 |
|
* that want all host information. |
278 |
|
*/ |
279 |
< |
private List _allHostsList; |
279 |
> |
private LinkedList _allHostDataList; |
280 |
> |
|
281 |
> |
/** |
282 |
> |
* A list of all hosts. |
283 |
> |
*/ |
284 |
> |
private LinkedList _allHostsList; |
285 |
|
|
286 |
|
//---STATIC ATTRIBUTES--- |
287 |
|
|