35 |
|
* Creates a new PacketSorter. |
36 |
|
*/ |
37 |
|
public PacketSorter() { |
38 |
+ |
// set the Thread name |
39 |
+ |
setName("clientinterface.PacketSorter"); |
40 |
+ |
|
41 |
|
_queue = new Queue(); |
42 |
< |
// startup a monitor on this queue, every minute |
43 |
< |
_queue.startMonitor(60*1000, _name); |
42 |
> |
// startup a monitor on this queue |
43 |
> |
try { |
44 |
> |
// try to get the interval, if this fails, we won't start up the monitor |
45 |
> |
ConfigurationProxy cp = ConfigurationProxy.getInstance(); |
46 |
> |
int queueMonitorInterval = Integer.parseInt(cp.getProperty("ClientInterface", "Queue.MonitorInterval")); |
47 |
> |
String queueName = _name + " PacketSorterQueue"; |
48 |
> |
_queue.startMonitor(queueMonitorInterval*1000, queueName); |
49 |
> |
} catch (PropertyNotFoundException e) { |
50 |
> |
_logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e); |
51 |
> |
} |
52 |
> |
|
53 |
|
_hostMap = new HashMap(); |
54 |
+ |
_allHostDataList = new LinkedList(); |
55 |
|
_allHostsList = new LinkedList(); |
56 |
|
_logger.write(toString(), Logger.SYSINIT, "created"); |
57 |
|
} |
63 |
|
* loop forever processing and sending data. |
64 |
|
*/ |
65 |
|
public void run() { |
66 |
+ |
XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(); |
67 |
|
int qID = _queue.getQueue(); |
68 |
|
while(true) { |
69 |
|
// attempt to get some data from the Queue |
75 |
|
_logger.write(toString(), Logger.ERROR, "Queue failure: "+e); |
76 |
|
} |
77 |
|
|
78 |
< |
// find out which host this packet is actually for |
65 |
< |
XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml); |
66 |
< |
XMLPacket packet = xmlPacketMaker.createXMLPacket(); |
67 |
< |
String host = packet.getParam("packet.attributes.machine_name"); |
78 |
> |
XMLPacket packet = null; |
79 |
|
|
80 |
< |
// look in the hostMap to see if anyone wants this data |
81 |
< |
if(_hostMap.containsKey(host)) { |
82 |
< |
LinkedList list = (LinkedList) _hostMap.get(host); |
83 |
< |
Iterator i = list.iterator(); |
84 |
< |
// push the data to the listening Handler's queue |
85 |
< |
while(i.hasNext()) { |
75 |
< |
((Queue) i.next()).add(xml); |
76 |
< |
} |
80 |
> |
try { |
81 |
> |
packet = xmlPacketMaker.createXMLPacket(xml); |
82 |
> |
} catch(InvalidXMLException e) { |
83 |
> |
_logger.write(toString(), Logger.ERROR, "Invalid XML: "+e); |
84 |
> |
// skip the rest of this loop iteration |
85 |
> |
continue; |
86 |
|
} |
87 |
|
|
88 |
< |
// any handler in this list wants all packets, so send |
89 |
< |
// it on to them regardless |
90 |
< |
Iterator j = _allHostsList.iterator(); |
91 |
< |
while(j.hasNext()) { |
92 |
< |
((Queue) j.next()).add(xml); |
88 |
> |
String packetType = packet.getParam("packet.attributes.type"); |
89 |
> |
// check if we need to send it regardless |
90 |
> |
if(packetType.equals("data") || packetType.equals("heartbeat")) { |
91 |
> |
String host = packet.getParam("packet.attributes.machine_name"); |
92 |
> |
|
93 |
> |
// look in the hostMap to see if anyone wants this data |
94 |
> |
synchronized(this) { |
95 |
> |
if(_hostMap.containsKey(host)) { |
96 |
> |
LinkedList list = (LinkedList) _hostMap.get(host); |
97 |
> |
Iterator i = list.iterator(); |
98 |
> |
// push the data to the listening Handler's queue |
99 |
> |
while(i.hasNext()) { |
100 |
> |
((Queue) i.next()).add(xml); |
101 |
> |
} |
102 |
> |
} |
103 |
> |
} |
104 |
> |
|
105 |
> |
// any handler in this list wants all packets, so send |
106 |
> |
// it on to them regardless |
107 |
> |
synchronized(this) { |
108 |
> |
Iterator i = _allHostDataList.iterator(); |
109 |
> |
while(i.hasNext()) { |
110 |
> |
((Queue) i.next()).add(xml); |
111 |
> |
} |
112 |
> |
} |
113 |
|
} |
114 |
+ |
else { |
115 |
+ |
// always send this packet to all hosts, because it's |
116 |
+ |
// "extra" data, not host data |
117 |
+ |
synchronized(this) { |
118 |
+ |
Iterator i = _allHostsList.iterator(); |
119 |
+ |
while(i.hasNext()) { |
120 |
+ |
((Queue) i.next()).add(xml); |
121 |
+ |
} |
122 |
+ |
} |
123 |
+ |
} |
124 |
|
} |
125 |
|
} |
126 |
|
|
140 |
|
public void register(Queue dhQueue, String hostList) { |
141 |
|
// check to see if we want all hosts |
142 |
|
if(hostList.equals("")) { |
143 |
< |
_allHostsList.add(dhQueue); |
143 |
> |
synchronized(this) { |
144 |
> |
_allHostDataList.add(dhQueue); |
145 |
> |
} |
146 |
|
_logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts"); |
147 |
|
} |
148 |
|
else { |
150 |
|
StringTokenizer st = new StringTokenizer(hostList, ";"); |
151 |
|
while(st.hasMoreTokens()) { |
152 |
|
String host = st.nextToken(); |
153 |
< |
// see if we already have a list in the map for this host |
154 |
< |
if(_hostMap.containsKey(host)) { |
155 |
< |
// we do, so add to it |
156 |
< |
LinkedList list = (LinkedList) _hostMap.get(host); |
157 |
< |
list.add(dhQueue); |
153 |
> |
synchronized(this) { |
154 |
> |
// see if we already have a list in the map for this host |
155 |
> |
if(_hostMap.containsKey(host)) { |
156 |
> |
// we do, so add to it |
157 |
> |
List list = (List) _hostMap.get(host); |
158 |
> |
list.add(dhQueue); |
159 |
> |
} |
160 |
> |
else { |
161 |
> |
// we don't, so create a list and put it in the map |
162 |
> |
LinkedList list = new LinkedList(); |
163 |
> |
list.add(dhQueue); |
164 |
> |
_hostMap.put(host, list); |
165 |
> |
} |
166 |
|
} |
118 |
– |
else { |
119 |
– |
// we don't, so create a list and put it in the map |
120 |
– |
LinkedList list = new LinkedList(); |
121 |
– |
list.add(dhQueue); |
122 |
– |
_hostMap.put(host, list); |
123 |
– |
} |
167 |
|
} |
168 |
|
_logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList); |
169 |
|
} |
170 |
+ |
// always add host to our complete host list |
171 |
+ |
synchronized(this) { |
172 |
+ |
_allHostsList.add(dhQueue); |
173 |
+ |
} |
174 |
|
} |
175 |
|
|
176 |
|
/** |
187 |
|
public void deregister(Queue dhQueue, String hostList) { |
188 |
|
// go through the list of hosts |
189 |
|
if(hostList.equals("")) { |
190 |
< |
_allHostsList.remove(dhQueue); |
190 |
> |
synchronized(this) { |
191 |
> |
_allHostDataList.remove(dhQueue); |
192 |
> |
} |
193 |
|
_logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts"); |
194 |
|
} |
195 |
|
else { |
196 |
|
StringTokenizer st = new StringTokenizer(hostList, ";"); |
197 |
|
while(st.hasMoreTokens()) { |
198 |
|
String host = st.nextToken(); |
199 |
< |
// this should in reality always be true, but best check |
200 |
< |
if(_hostMap.containsKey(host)) { |
201 |
< |
// get the list and remove the host in question |
202 |
< |
LinkedList list = (LinkedList) _hostMap.get(host); |
203 |
< |
list.remove(dhQueue); |
204 |
< |
// if the list is now empty, we might as well remove it |
205 |
< |
if(list.size()==0) { |
206 |
< |
_hostMap.remove(host); |
199 |
> |
synchronized(this) { |
200 |
> |
// this should in reality always be true, but best check |
201 |
> |
if(_hostMap.containsKey(host)) { |
202 |
> |
// get the list and remove the host in question |
203 |
> |
LinkedList list = (LinkedList) _hostMap.get(host); |
204 |
> |
list.remove(dhQueue); |
205 |
> |
// if the list is now empty, we might as well remove it |
206 |
> |
if(list.size()==0) { |
207 |
> |
_hostMap.remove(host); |
208 |
> |
} |
209 |
|
} |
210 |
|
} |
211 |
|
} |
212 |
|
_logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList); |
213 |
|
} |
214 |
+ |
// always remove host from our complete host list |
215 |
+ |
synchronized(this) { |
216 |
+ |
_allHostsList.remove(dhQueue); |
217 |
+ |
} |
218 |
|
} |
219 |
|
|
220 |
|
/** |
281 |
|
/** |
282 |
|
* A list specifically for a Queue's associated with DataHandlers |
283 |
|
* that want all host information. |
284 |
+ |
*/ |
285 |
+ |
private LinkedList _allHostDataList; |
286 |
+ |
|
287 |
+ |
/** |
288 |
+ |
* A list of all hosts. |
289 |
|
*/ |
290 |
|
private LinkedList _allHostsList; |
291 |
|
|