ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
(Generate patch)

Comparing projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java (file contents):
Revision 1.2 by tdb, Tue Jan 23 17:22:01 2001 UTC vs.
Revision 1.19 by tdb, Sat May 18 18:16:01 2002 UTC

# Line 1 | Line 1
1 + /*
2 + * i-scream central monitoring system
3 + * Copyright (C) 2000-2002 i-scream
4 + *
5 + * This program is free software; you can redistribute it and/or
6 + * modify it under the terms of the GNU General Public License
7 + * as published by the Free Software Foundation; either version 2
8 + * of the License, or (at your option) any later version.
9 + *
10 + * This program is distributed in the hope that it will be useful,
11 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 + * GNU General Public License for more details.
14 + *
15 + * You should have received a copy of the GNU General Public License
16 + * along with this program; if not, write to the Free Software
17 + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
18 + */
19 +
20   //---PACKAGE DECLARATION---
21 < package uk.ac.ukc.iscream.clientinterface;
21 > package uk.org.iscream.cms.server.clientinterface;
22  
23   //---IMPORTS---
24 < import uk.ac.ukc.iscream.util.*;
25 < import uk.ac.ukc.iscream.componentmanager.*;
26 < import uk.ac.ukc.iscream.core.*;
24 > import uk.org.iscream.cms.server.util.*;
25 > import uk.org.iscream.cms.server.componentmanager.*;
26 > import uk.org.iscream.cms.server.core.*;
27   import java.util.*;
28  
29   /**
30   * Receives data from the incoming CORBA servant, places
31   * it in a Queue, and then arranges distribution to the
32 < * ClientHandlers.
32 > * DataHandlers.
33 > * Has extra functionality to send data to DataHandlers
34 > * on a per-host basis - ie. the Client can request which
35 > * hosts it would like to listen for.
36   *
37   * @author  $Author$
38   * @version $Id$
# Line 32 | Line 54 | class PacketSorter extends Thread {
54       * Creates a new PacketSorter.
55       */
56      public PacketSorter() {
57 <        _queue = new Queue();
57 >        // set the Thread name
58 >        setName("clientinterface.PacketSorter");
59 >        
60 >        ConfigurationProxy cp = ConfigurationProxy.getInstance();
61 >        String configName = "ClientInterface";
62 >        
63 >        // see if this Queue needs a size limit
64 >        try {
65 >            int queueSizeLimit = Integer.parseInt(cp.getProperty(configName, "Queue.SizeLimit"));
66 >            String queueRemoveAlgorithm = cp.getProperty(configName, "Queue.RemoveAlgorithm");
67 >            int algorithm = StringUtils.getStringPos(queueRemoveAlgorithm, Queue.algorithms);
68 >            if(algorithm != -1) {
69 >                _logger.write(toString(), Logger.DEBUG, "Starting Queue with size limit of "+queueSizeLimit+", using remove algorithm "+queueRemoveAlgorithm);
70 >                // we have valid values, so lets start it.
71 >                _queue = new Queue(queueSizeLimit, algorithm);
72 >            }
73 >            else {
74 >                _logger.write(toString(), Logger.WARNING, "Bad Queue Algorithm configuration, not known: "+queueRemoveAlgorithm);
75 >                // just don't activate a limit
76 >                _queue = new Queue();
77 >            }
78 >            
79 >        } catch (PropertyNotFoundException e) {
80 >            _logger.write(toString(), Logger.DEBUG, "Optional config not set: "+e);
81 >            // just don't activate a limit
82 >            _queue = new Queue();
83 >        } catch (NumberFormatException e) {
84 >            _logger.write(toString(), Logger.WARNING, "Bad Queue SizeLimit configuration: "+e);
85 >            // just don't activate a limit
86 >            _queue = new Queue();
87 >        }
88 >        
89 >        // startup a monitor on this queue
90 >        try {
91 >            // try to get the interval, if this fails, we won't start up the monitor
92 >            int queueMonitorInterval = Integer.parseInt(cp.getProperty(configName, "Queue.MonitorInterval"));
93 >            String queueName = _name + " PacketSorterQueue";
94 >            _queue.startMonitor(queueMonitorInterval*1000, queueName);
95 >        } catch (PropertyNotFoundException e) {
96 >            _logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e);
97 >        }
98 >        
99          _hostMap = new HashMap();
100 +        _allHostDataList = new LinkedList();
101          _allHostsList = new LinkedList();
102          _logger.write(toString(), Logger.SYSINIT, "created");
103      }
104      
105   //---PUBLIC METHODS---
106  
107 +    /**
108 +     * Method to start the PacketSorter running. This method will
109 +     * loop forever processing and sending data.
110 +     */
111      public void run() {
112 +        XMLPacketMaker xmlPacketMaker = new XMLPacketMaker();
113          int qID = _queue.getQueue();
114          while(true) {
115 +            // attempt to get some data from the Queue
116              String xml = "";
117              try {
118                  xml = (String) _queue.get(qID);
# Line 51 | Line 121 | class PacketSorter extends Thread {
121                  _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
122              }
123              
124 <            // look at host map
124 >            XMLPacket packet = null;
125              
126 <            // should be a neater way to do this
127 <            XMLPacket packet = new XMLPacketMaker(xml).createXMLPacket();
128 <            String host = packet.getParam("packet.attributes.machine_name");
129 <            LinkedList list = (LinkedList) _hostMap.get(host);
130 <            
131 <            Iterator i = list.iterator();
62 <            while(i.hasNext()) {
63 <                ((Queue) i.next()).add(xml);
126 >            try {
127 >                packet = xmlPacketMaker.createXMLPacket(xml);
128 >            } catch(InvalidXMLException e) {
129 >                _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
130 >                // skip the rest of this loop iteration
131 >                continue;
132              }
133              
134 <            //  look at all hosts
135 <            
136 <            Iterator j = _allHostsList.iterator();
137 <            while(j.hasNext()) {
138 <                ((Queue) j.next()).add(xml);
134 >            String packetType = packet.getParam("packet.attributes.type");
135 >            // check if we need to send it regardless
136 >            if(packetType.equals("data") || packetType.equals("heartbeat")) {
137 >                String host = packet.getParam("packet.attributes.machine_name");
138 >                
139 >                // look in the hostMap to see if anyone wants this data
140 >                synchronized(this) {
141 >                    if(_hostMap.containsKey(host)) {
142 >                        LinkedList list = (LinkedList) _hostMap.get(host);
143 >                        Iterator i = list.iterator();
144 >                        // push the data to the listening Handler's queue
145 >                        while(i.hasNext()) {
146 >                            ((Queue) i.next()).add(xml);
147 >                        }
148 >                    }
149 >                }
150 >                
151 >                // any handler in this list wants all packets, so send
152 >                // it on to them regardless
153 >                synchronized(this) {
154 >                    Iterator i = _allHostDataList.iterator();
155 >                    while(i.hasNext()) {
156 >                        ((Queue) i.next()).add(xml);
157 >                    }
158 >                }
159              }
160 +            else {
161 +                // always send this packet to all hosts, because it's
162 +                // "extra" data, not host data
163 +                synchronized(this) {
164 +                    Iterator i = _allHostsList.iterator();
165 +                    while(i.hasNext()) {
166 +                        ((Queue) i.next()).add(xml);
167 +                    }
168 +                }
169 +            }
170          }
171      }
172      
173 <    // MUST DEAL WITH hostList="" implying "all hosts"
174 <    
173 >    /**
174 >     * Register a DataHandler in the system. This method
175 >     * actually takes a reference to a Queue, which should be
176 >     * a Queue that the DataHandler is making use of.
177 >     * It also takes a hostList, this being a semi-colon
178 >     * seperated list of hosts that the Client the DataHandler
179 >     * is serving has requested. If this list is simply an empty
180 >     * String, it is assumed the Client wants to listen to all
181 >     * host information.
182 >     *
183 >     * @param dhQueue a Queue being used by the DataHandler that is registering
184 >     * @param hostList a semi-colon seperated list of hosts
185 >     */
186      public void register(Queue dhQueue, String hostList) {
187 +        // check to see if we want all hosts
188          if(hostList.equals("")) {
189 <            _allHostsList.add(dhQueue);
189 >            synchronized(this) {
190 >                _allHostDataList.add(dhQueue);
191 >            }
192              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
193          }
194          else {
195 +            // go through the list of hosts
196              StringTokenizer st = new StringTokenizer(hostList, ";");
197              while(st.hasMoreTokens()) {
198                  String host = st.nextToken();
199 <                if(_hostMap.containsKey(host)) {
200 <                    LinkedList list = (LinkedList) _hostMap.get(host);
201 <                    list.add(dhQueue);
199 >                synchronized(this) {
200 >                    // see if we already have a list in the map for this host
201 >                    if(_hostMap.containsKey(host)) {
202 >                        // we do, so add to it
203 >                        List list = (List) _hostMap.get(host);
204 >                        list.add(dhQueue);
205 >                    }
206 >                    else {
207 >                        // we don't, so create a list and put it in the map
208 >                        LinkedList list = new LinkedList();
209 >                        list.add(dhQueue);
210 >                        _hostMap.put(host, list);
211 >                    }
212                  }
90                else {
91                    LinkedList list = new LinkedList();
92                    list.add(dhQueue);
93                    _hostMap.put(host, list);
94                }
213              }
214              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
215          }
216 +        // always add host to our complete host list
217 +        synchronized(this) {
218 +            _allHostsList.add(dhQueue);
219 +        }
220      }
221      
222 +    /**
223 +     * Deregister a DataHandler. The DataHandler should give a reference
224 +     * to the Queue it's using, and the *same* hostList it gave when it
225 +     * register. It is imperative that the hostList is the same, otherwise
226 +     * there will be all sorts of problems with lists getting out of sync.
227 +     *
228 +     * NB: Possible future addition would be recording of hostList's.
229 +     *
230 +     * @param dhQueue a Queue being used by the DataHandler that is deregistering
231 +     * @param hostList a semi-colon seperated list of hosts
232 +     */
233      public void deregister(Queue dhQueue, String hostList) {
234 +        // go through the list of hosts
235          if(hostList.equals("")) {
236 <            _allHostsList.remove(dhQueue);
236 >            synchronized(this) {
237 >                _allHostDataList.remove(dhQueue);
238 >            }
239              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
240          }
241          else {
242              StringTokenizer st = new StringTokenizer(hostList, ";");
243              while(st.hasMoreTokens()) {
244                  String host = st.nextToken();
245 <                if(_hostMap.containsKey(host)) {
246 <                    LinkedList list = (LinkedList) _hostMap.get(host);
247 <                    list.remove(dhQueue);
248 <                    if(list.size()==0) {
249 <                        _hostMap.remove(host);
245 >                synchronized(this) {
246 >                    // this should in reality always be true, but best check
247 >                    if(_hostMap.containsKey(host)) {
248 >                        // get the list and remove the host in question
249 >                        LinkedList list = (LinkedList) _hostMap.get(host);
250 >                        list.remove(dhQueue);
251 >                        // if the list is now empty, we might as well remove it
252 >                        if(list.size()==0) {
253 >                            _hostMap.remove(host);
254 >                        }
255                      }
256                  }
257              }
258              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
259          }
260 +        // always remove host from our complete host list
261 +        synchronized(this) {
262 +            _allHostsList.remove(dhQueue);
263 +        }
264      }
265      
266      /**
267       * Overrides the {@link java.lang.Object#toString() Object.toString()}
268       * method to provide clean logging (every class should have this).
269       *
270 <     * This uses the uk.ac.ukc.iscream.util.NameFormat class
270 >     * This uses the uk.org.iscream.cms.server.util.NameFormat class
271       * to format the toString()
272       *
273       * @return the name of this class and its CVS revision
# Line 137 | Line 282 | class PacketSorter extends Thread {
282   //---PRIVATE METHODS---
283  
284   //---ACCESSOR/MUTATOR METHODS---
285 <
285 >    
286 >    /**
287 >     * Accessor to return a reference to the Queue object. This
288 >     * is needed so the ClientInterfaceServant can get add data
289 >     * easily.
290 >     *
291 >     * @return a reference to our Queue object.
292 >     */
293      public Queue getQueue() {
294          return _queue;
295      }
# Line 161 | Line 313 | class PacketSorter extends Thread {
313       */
314      private Logger _logger = ReferenceManager.getInstance().getLogger();
315      
316 +    /**
317 +     * A reference to the Queue we're using.
318 +     */
319      private Queue _queue;
320      
321 +    /**
322 +     * A HashMap to store lists of Queue's (in the DataHandlers)
323 +     * in a way that can be easily accessed when data comes in.
324 +     */
325      private HashMap _hostMap;
326 +    
327 +    /**
328 +     * A list specifically for a Queue's associated with DataHandlers
329 +     * that want all host information.
330 +     */
331 +    private LinkedList _allHostDataList;
332 +    
333 +    /**
334 +     * A list of all hosts.
335 +     */
336      private LinkedList _allHostsList;
337      
338   //---STATIC ATTRIBUTES---

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines