ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
Revision: 1.23
Committed: Mon Feb 24 20:18:48 2003 UTC (21 years, 3 months ago) by tdb
Branch: MAIN
Changes since 1.22: +3 -3 lines
Log Message:
Fairly major commit. This will break the current version of ihost, but this
had to be done really to give Pete something to test the new ihost against.

The main change here is removal of the TCP Heartbeat functionality from the
filter. This meant the following features stopped working :-
  - Heartbeat testing
  - Configuration checking
  - Service checks

The heartbeat testing, specifically the monitor, now looks at the presence
of UDP packets instead. Before it just looked for the presence of a TCP
heartbeat packet, so the change there is fairly negligible. Of course this
means heartbeat testing now relies on the UDP working... but I don't see
this as a problem.

Configuration checking has been repositioned in to the filtermanager. This
is a backwards compatible change - the filtermanager should still perform
as it should for older hosts. But now there's an extra command to check the
configuration is up-to-date, with a similar format to the old TCP protocol
in the filter. (although we may optimise this soon)

The service checks are broken. This isn't a major issue for us as they were
pretty useless in the first place. The concept is good, but the checks are
just far too primitive. I expect at some point I'll work on a separate
component that just monitors services, which will replace this function.

Further changes in the server include removal of the key checking code,
as this relied on a bolt on to the TCP heartbeat protocol to ship the
key. This got more awkward than originally planned, so I'm happy to drop the
idea. In the long term we hope to replace this with a public key system
for signing and even encryption.

Finally, general tidy up to remove other bits of code that check for
TCP heartbeat packets when they don't need to any more.

File Contents

# Content
1 /*
2 * i-scream central monitoring system
3 * http://www.i-scream.org.uk
4 * Copyright (C) 2000-2002 i-scream
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19 */
20
21 //---PACKAGE DECLARATION---
22 package uk.org.iscream.cms.server.clientinterface;
23
24 //---IMPORTS---
25 import uk.org.iscream.cms.util.*;
26 import uk.org.iscream.cms.server.componentmanager.*;
27 import uk.org.iscream.cms.server.core.*;
28 import java.util.*;
29
30 /**
31 * Receives data from the incoming CORBA servant, places
32 * it in a Queue, and then arranges distribution to the
33 * DataHandlers.
34 * Has extra functionality to send data to DataHandlers
35 * on a per-host basis - ie. the Client can request which
36 * hosts it would like to listen for.
37 *
38 * @author $Author: tdb $
39 * @version $Id: PacketSorter.java,v 1.22 2003/02/05 16:43:46 tdb Exp $
40 */
41 class PacketSorter extends Thread {
42
43 //---FINAL ATTRIBUTES---
44
45 /**
46 * The current CVS revision of this class
47 */
48 public final String REVISION = "$Revision: 1.22 $";
49
50 //---STATIC METHODS---
51
52 //---CONSTRUCTORS---
53
54 /**
55 * Creates a new PacketSorter.
56 */
57 public PacketSorter() {
58 // set the Thread name
59 setName("clientinterface.PacketSorter");
60
61 ConfigurationProxy cp = ConfigurationProxy.getInstance();
62 String configName = "ClientInterface";
63
64 // see if this Queue needs a size limit
65 try {
66 int queueSizeLimit = Integer.parseInt(cp.getProperty(configName, "Queue.SizeLimit"));
67 String queueRemoveAlgorithm = cp.getProperty(configName, "Queue.RemoveAlgorithm");
68 int algorithm = StringUtils.getStringPos(queueRemoveAlgorithm, Queue.algorithms);
69 if(algorithm != -1) {
70 _logger.write(toString(), Logger.DEBUG, "Starting Queue with size limit of "+queueSizeLimit+", using remove algorithm "+queueRemoveAlgorithm);
71 // we have valid values, so lets start it.
72 _queue = new Queue(queueSizeLimit, algorithm);
73 }
74 else {
75 _logger.write(toString(), Logger.WARNING, "Bad Queue Algorithm configuration, not known: "+queueRemoveAlgorithm);
76 // just don't activate a limit
77 _queue = new Queue();
78 }
79
80 } catch (PropertyNotFoundException e) {
81 _logger.write(toString(), Logger.DEBUG, "Optional config not set: "+e);
82 // just don't activate a limit
83 _queue = new Queue();
84 } catch (NumberFormatException e) {
85 _logger.write(toString(), Logger.WARNING, "Bad Queue SizeLimit configuration: "+e);
86 // just don't activate a limit
87 _queue = new Queue();
88 }
89
90 // startup a monitor on this queue
91 try {
92 // try to get the interval, if this fails, we won't start up the monitor
93 int queueMonitorInterval = Integer.parseInt(cp.getProperty(configName, "Queue.MonitorInterval"));
94 String queueName = _name + " PacketSorterQueue";
95 _queue.startMonitor(queueMonitorInterval*1000, queueName);
96 } catch (PropertyNotFoundException e) {
97 _logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e);
98 }
99
100 _hostMap = new HashMap();
101 _allHostDataList = new LinkedList();
102 _allHostsList = new LinkedList();
103 _logger.write(toString(), Logger.SYSINIT, "created");
104 }
105
106 //---PUBLIC METHODS---
107
108 /**
109 * Method to start the PacketSorter running. This method will
110 * loop forever processing and sending data.
111 */
112 public void run() {
113 int qID = _queue.getQueue();
114 while(true) {
115 // attempt to get some data from the Queue
116 String xml = "";
117 try {
118 xml = (String) _queue.get(qID);
119 }
120 catch(InvalidQueueException e) {
121 _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
122 }
123
124 XMLPacket packet = null;
125
126 try {
127 packet = _xmlCache.getXMLPacket(xml);
128 } catch(InvalidXMLException e) {
129 _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
130 // skip the rest of this loop iteration
131 continue;
132 }
133
134 String packetType = packet.getParam("packet.attributes.type");
135 // check if we need to send it regardless
136 if(packetType.equals("data")) {
137 String host = packet.getParam("packet.attributes.machine_name");
138
139 // look in the hostMap to see if anyone wants this data
140 synchronized(this) {
141 if(_hostMap.containsKey(host)) {
142 LinkedList list = (LinkedList) _hostMap.get(host);
143 Iterator i = list.iterator();
144 // push the data to the listening Handler's queue
145 while(i.hasNext()) {
146 ((Queue) i.next()).add(xml);
147 }
148 }
149 }
150
151 // any handler in this list wants all packets, so send
152 // it on to them regardless
153 synchronized(this) {
154 Iterator i = _allHostDataList.iterator();
155 while(i.hasNext()) {
156 ((Queue) i.next()).add(xml);
157 }
158 }
159 }
160 else {
161 // always send this packet to all hosts, because it's
162 // "extra" data, not host data
163 synchronized(this) {
164 Iterator i = _allHostsList.iterator();
165 while(i.hasNext()) {
166 ((Queue) i.next()).add(xml);
167 }
168 }
169 }
170 }
171 }
172
173 /**
174 * Register a DataHandler in the system. This method
175 * actually takes a reference to a Queue, which should be
176 * a Queue that the DataHandler is making use of.
177 * It also takes a hostList, this being a semi-colon
178 * seperated list of hosts that the Client the DataHandler
179 * is serving has requested. If this list is simply an empty
180 * String, it is assumed the Client wants to listen to all
181 * host information.
182 *
183 * @param dhQueue a Queue being used by the DataHandler that is registering
184 * @param hostList a semi-colon seperated list of hosts
185 */
186 public void register(Queue dhQueue, String hostList) {
187 // check to see if we want all hosts
188 if(hostList.equals("")) {
189 synchronized(this) {
190 _allHostDataList.add(dhQueue);
191 }
192 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
193 }
194 else {
195 // go through the list of hosts
196 StringTokenizer st = new StringTokenizer(hostList, ";");
197 while(st.hasMoreTokens()) {
198 String host = st.nextToken();
199 synchronized(this) {
200 // see if we already have a list in the map for this host
201 if(_hostMap.containsKey(host)) {
202 // we do, so add to it
203 List list = (List) _hostMap.get(host);
204 list.add(dhQueue);
205 }
206 else {
207 // we don't, so create a list and put it in the map
208 LinkedList list = new LinkedList();
209 list.add(dhQueue);
210 _hostMap.put(host, list);
211 }
212 }
213 }
214 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
215 }
216 // always add host to our complete host list
217 synchronized(this) {
218 _allHostsList.add(dhQueue);
219 }
220 }
221
222 /**
223 * Deregister a DataHandler. The DataHandler should give a reference
224 * to the Queue it's using, and the *same* hostList it gave when it
225 * register. It is imperative that the hostList is the same, otherwise
226 * there will be all sorts of problems with lists getting out of sync.
227 *
228 * NB: Possible future addition would be recording of hostList's.
229 *
230 * @param dhQueue a Queue being used by the DataHandler that is deregistering
231 * @param hostList a semi-colon seperated list of hosts
232 */
233 public void deregister(Queue dhQueue, String hostList) {
234 // go through the list of hosts
235 if(hostList.equals("")) {
236 synchronized(this) {
237 _allHostDataList.remove(dhQueue);
238 }
239 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
240 }
241 else {
242 StringTokenizer st = new StringTokenizer(hostList, ";");
243 while(st.hasMoreTokens()) {
244 String host = st.nextToken();
245 synchronized(this) {
246 // this should in reality always be true, but best check
247 if(_hostMap.containsKey(host)) {
248 // get the list and remove the host in question
249 LinkedList list = (LinkedList) _hostMap.get(host);
250 list.remove(dhQueue);
251 // if the list is now empty, we might as well remove it
252 if(list.size()==0) {
253 _hostMap.remove(host);
254 }
255 }
256 }
257 }
258 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
259 }
260 // always remove host from our complete host list
261 synchronized(this) {
262 _allHostsList.remove(dhQueue);
263 }
264 }
265
266 /**
267 * Overrides the {@link java.lang.Object#toString() Object.toString()}
268 * method to provide clean logging (every class should have this).
269 *
270 * This uses the uk.org.iscream.cms.util.NameFormat class
271 * to format the toString()
272 *
273 * @return the name of this class and its CVS revision
274 */
275 public String toString() {
276 return FormatName.getName(
277 _name,
278 getClass().getName(),
279 REVISION);
280 }
281
282 //---PRIVATE METHODS---
283
284 //---ACCESSOR/MUTATOR METHODS---
285
286 /**
287 * Accessor to return a reference to the Queue object. This
288 * is needed so the ClientInterfaceServant can get add data
289 * easily.
290 *
291 * @return a reference to our Queue object.
292 */
293 public Queue getQueue() {
294 return _queue;
295 }
296
297 //---ATTRIBUTES---
298
299 /**
300 * This is the friendly identifier of the
301 * component this class is running in.
302 * eg, a Filter may be called "filter1",
303 * If this class does not have an owning
304 * component, a name from the configuration
305 * can be placed here. This name could also
306 * be changed to null for utility classes.
307 */
308 private String _name = ClientInterfaceMain.NAME;
309
310 /**
311 * This holds a reference to the
312 * system logger that is being used.
313 */
314 private Logger _logger = ReferenceManager.getInstance().getLogger();
315
316 /**
317 * A reference to the Queue we're using.
318 */
319 private Queue _queue;
320
321 /**
322 * A HashMap to store lists of Queue's (in the DataHandlers)
323 * in a way that can be easily accessed when data comes in.
324 */
325 private HashMap _hostMap;
326
327 /**
328 * A list specifically for a Queue's associated with DataHandlers
329 * that want all host information.
330 */
331 private LinkedList _allHostDataList;
332
333 /**
334 * A list of all hosts.
335 */
336 private LinkedList _allHostsList;
337
338 /**
339 * A reference to the XMLCache in use
340 */
341 private XMLCache _xmlCache = XMLCache.getInstance();
342
343 //---STATIC ATTRIBUTES---
344
345 }