ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
Revision: 1.26
Committed: Sun Sep 25 09:57:41 2005 UTC (18 years, 7 months ago) by tdb
Branch: MAIN
CVS Tags: HEAD
Changes since 1.25: +7 -3 lines
Log Message:
Fix compile problems on j2se 1.5 - our Queue class conflicted with one in
java.util. Also fix an API issue when running the server on Windows - the
println method sends '\r\n' on Windows instead of '\n' on Unix, which
confuses applications such as ihost.

Patch provided by: skel

File Contents

# Content
1 /*
2 * i-scream central monitoring system
3 * http://www.i-scream.org
4 * Copyright (C) 2000-2002 i-scream
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19 */
20
21 //---PACKAGE DECLARATION---
22 package uk.org.iscream.cms.server.clientinterface;
23
24 //---IMPORTS---
25 import uk.org.iscream.cms.util.*;
26 import uk.org.iscream.cms.server.componentmanager.*;
27 import uk.org.iscream.cms.server.core.*;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.StringTokenizer;
33
34 /**
35 * Receives data from the incoming CORBA servant, places
36 * it in a Queue, and then arranges distribution to the
37 * DataHandlers.
38 * Has extra functionality to send data to DataHandlers
39 * on a per-host basis - ie. the Client can request which
40 * hosts it would like to listen for.
41 *
42 * @author $Author: tdb $
43 * @version $Id: PacketSorter.java,v 1.25 2004/12/08 14:50:32 tdb Exp $
44 */
45 class PacketSorter extends Thread {
46
47 //---FINAL ATTRIBUTES---
48
49 /**
50 * The current CVS revision of this class
51 */
52 public final String REVISION = "$Revision: 1.25 $";
53
54 //---STATIC METHODS---
55
56 //---CONSTRUCTORS---
57
58 /**
59 * Creates a new PacketSorter.
60 */
61 public PacketSorter() {
62 // set the Thread name
63 setName("clientinterface.PacketSorter");
64
65 ConfigurationProxy cp = ConfigurationProxy.getInstance();
66 String configName = "ClientInterface";
67
68 // see if this Queue needs a size limit
69 try {
70 int queueSizeLimit = Integer.parseInt(cp.getProperty(configName, "Queue.SizeLimit"));
71 String queueRemoveAlgorithm = cp.getProperty(configName, "Queue.RemoveAlgorithm");
72 int algorithm = StringUtils.getStringPos(queueRemoveAlgorithm, Queue.algorithms);
73 if(algorithm != -1) {
74 _logger.write(toString(), Logger.DEBUG, "Starting Queue with size limit of "+queueSizeLimit+", using remove algorithm "+queueRemoveAlgorithm);
75 // we have valid values, so lets start it.
76 _queue = new Queue(queueSizeLimit, algorithm);
77 }
78 else {
79 _logger.write(toString(), Logger.WARNING, "Bad Queue Algorithm configuration, not known: "+queueRemoveAlgorithm);
80 // just don't activate a limit
81 _queue = new Queue();
82 }
83
84 } catch (PropertyNotFoundException e) {
85 _logger.write(toString(), Logger.DEBUG, "Optional config not set: "+e);
86 // just don't activate a limit
87 _queue = new Queue();
88 } catch (NumberFormatException e) {
89 _logger.write(toString(), Logger.WARNING, "Bad Queue SizeLimit configuration: "+e);
90 // just don't activate a limit
91 _queue = new Queue();
92 }
93
94 // startup a monitor on this queue
95 try {
96 // try to get the interval, if this fails, we won't start up the monitor
97 int queueMonitorInterval = Integer.parseInt(cp.getProperty(configName, "Queue.MonitorInterval"));
98 String queueName = _name + " PacketSorterQueue";
99 _queue.startMonitor(queueMonitorInterval*1000, queueName);
100 } catch (PropertyNotFoundException e) {
101 _logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e);
102 }
103
104 _hostMap = new HashMap();
105 _allHostDataList = new LinkedList();
106 _allHostsList = new LinkedList();
107 _logger.write(toString(), Logger.SYSINIT, "created");
108 }
109
110 //---PUBLIC METHODS---
111
112 /**
113 * Method to start the PacketSorter running. This method will
114 * loop forever processing and sending data.
115 */
116 public void run() {
117 int qID = _queue.getQueue();
118 while(true) {
119 // attempt to get some data from the Queue
120 String xml = "";
121 try {
122 xml = (String) _queue.get(qID);
123 }
124 catch(InvalidQueueException e) {
125 _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
126 }
127
128 XMLPacket packet = null;
129
130 try {
131 packet = _xmlCache.getXMLPacket(xml);
132 } catch(InvalidXMLException e) {
133 _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
134 // skip the rest of this loop iteration
135 continue;
136 }
137
138 if(packet == null) {
139 continue;
140 }
141
142 String packetType = packet.getParam("packet.attributes.type");
143 // check if we need to send it regardless
144 if(packetType.equals("data")) {
145 String host = packet.getParam("packet.attributes.machine_name");
146
147 // look in the hostMap to see if anyone wants this data
148 synchronized(this) {
149 if(_hostMap.containsKey(host)) {
150 LinkedList list = (LinkedList) _hostMap.get(host);
151 Iterator i = list.iterator();
152 // push the data to the listening Handler's queue
153 while(i.hasNext()) {
154 ((Queue) i.next()).add(xml);
155 }
156 }
157 }
158
159 // any handler in this list wants all packets, so send
160 // it on to them regardless
161 synchronized(this) {
162 Iterator i = _allHostDataList.iterator();
163 while(i.hasNext()) {
164 ((Queue) i.next()).add(xml);
165 }
166 }
167 }
168 else {
169 // always send this packet to all hosts, because it's
170 // "extra" data, not host data
171 synchronized(this) {
172 Iterator i = _allHostsList.iterator();
173 while(i.hasNext()) {
174 ((Queue) i.next()).add(xml);
175 }
176 }
177 }
178 }
179 }
180
181 /**
182 * Register a DataHandler in the system. This method
183 * actually takes a reference to a Queue, which should be
184 * a Queue that the DataHandler is making use of.
185 * It also takes a hostList, this being a semi-colon
186 * seperated list of hosts that the Client the DataHandler
187 * is serving has requested. If this list is simply an empty
188 * String, it is assumed the Client wants to listen to all
189 * host information.
190 *
191 * @param dhQueue a Queue being used by the DataHandler that is registering
192 * @param hostList a semi-colon seperated list of hosts
193 */
194 public void register(Queue dhQueue, String hostList) {
195 // check to see if we want all hosts
196 if(hostList.equals("")) {
197 synchronized(this) {
198 _allHostDataList.add(dhQueue);
199 }
200 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
201 }
202 else {
203 // go through the list of hosts
204 StringTokenizer st = new StringTokenizer(hostList, ";");
205 while(st.hasMoreTokens()) {
206 String host = st.nextToken();
207 synchronized(this) {
208 // see if we already have a list in the map for this host
209 if(_hostMap.containsKey(host)) {
210 // we do, so add to it
211 List list = (List) _hostMap.get(host);
212 list.add(dhQueue);
213 }
214 else {
215 // we don't, so create a list and put it in the map
216 LinkedList list = new LinkedList();
217 list.add(dhQueue);
218 _hostMap.put(host, list);
219 }
220 }
221 }
222 _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
223 }
224 // always add host to our complete host list
225 synchronized(this) {
226 _allHostsList.add(dhQueue);
227 }
228 }
229
230 /**
231 * Deregister a DataHandler. The DataHandler should give a reference
232 * to the Queue it's using, and the *same* hostList it gave when it
233 * register. It is imperative that the hostList is the same, otherwise
234 * there will be all sorts of problems with lists getting out of sync.
235 *
236 * NB: Possible future addition would be recording of hostList's.
237 *
238 * @param dhQueue a Queue being used by the DataHandler that is deregistering
239 * @param hostList a semi-colon seperated list of hosts
240 */
241 public void deregister(Queue dhQueue, String hostList) {
242 // go through the list of hosts
243 if(hostList.equals("")) {
244 synchronized(this) {
245 _allHostDataList.remove(dhQueue);
246 }
247 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
248 }
249 else {
250 StringTokenizer st = new StringTokenizer(hostList, ";");
251 while(st.hasMoreTokens()) {
252 String host = st.nextToken();
253 synchronized(this) {
254 // this should in reality always be true, but best check
255 if(_hostMap.containsKey(host)) {
256 // get the list and remove the host in question
257 LinkedList list = (LinkedList) _hostMap.get(host);
258 list.remove(dhQueue);
259 // if the list is now empty, we might as well remove it
260 if(list.size()==0) {
261 _hostMap.remove(host);
262 }
263 }
264 }
265 }
266 _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
267 }
268 // always remove host from our complete host list
269 synchronized(this) {
270 _allHostsList.remove(dhQueue);
271 }
272 }
273
274 /**
275 * Overrides the {@link java.lang.Object#toString() Object.toString()}
276 * method to provide clean logging (every class should have this).
277 *
278 * This uses the uk.org.iscream.cms.util.NameFormat class
279 * to format the toString()
280 *
281 * @return the name of this class and its CVS revision
282 */
283 public String toString() {
284 return FormatName.getName(
285 _name,
286 getClass().getName(),
287 REVISION);
288 }
289
290 //---PRIVATE METHODS---
291
292 //---ACCESSOR/MUTATOR METHODS---
293
294 /**
295 * Accessor to return a reference to the Queue object. This
296 * is needed so the ClientInterfaceServant can get add data
297 * easily.
298 *
299 * @return a reference to our Queue object.
300 */
301 public Queue getQueue() {
302 return _queue;
303 }
304
305 //---ATTRIBUTES---
306
307 /**
308 * This is the friendly identifier of the
309 * component this class is running in.
310 * eg, a Filter may be called "filter1",
311 * If this class does not have an owning
312 * component, a name from the configuration
313 * can be placed here. This name could also
314 * be changed to null for utility classes.
315 */
316 private String _name = ClientInterfaceMain.NAME;
317
318 /**
319 * This holds a reference to the
320 * system logger that is being used.
321 */
322 private Logger _logger = ReferenceManager.getInstance().getLogger();
323
324 /**
325 * A reference to the Queue we're using.
326 */
327 private Queue _queue;
328
329 /**
330 * A HashMap to store lists of Queue's (in the DataHandlers)
331 * in a way that can be easily accessed when data comes in.
332 */
333 private HashMap _hostMap;
334
335 /**
336 * A list specifically for a Queue's associated with DataHandlers
337 * that want all host information.
338 */
339 private LinkedList _allHostDataList;
340
341 /**
342 * A list of all hosts.
343 */
344 private LinkedList _allHostsList;
345
346 /**
347 * A reference to the XMLCache in use
348 */
349 private XMLCache _xmlCache = XMLCache.getInstance();
350
351 //---STATIC ATTRIBUTES---
352
353 }