ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
(Generate patch)

Comparing projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java (file contents):
Revision 1.7 by tdb, Mon Feb 26 23:41:35 2001 UTC vs.
Revision 1.24 by tdb, Sun Aug 1 10:40:48 2004 UTC

# Line 1 | Line 1
1 + /*
2 + * i-scream central monitoring system
3 + * http://www.i-scream.org
4 + * Copyright (C) 2000-2002 i-scream
5 + *
6 + * This program is free software; you can redistribute it and/or
7 + * modify it under the terms of the GNU General Public License
8 + * as published by the Free Software Foundation; either version 2
9 + * of the License, or (at your option) any later version.
10 + *
11 + * This program is distributed in the hope that it will be useful,
12 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 + * GNU General Public License for more details.
15 + *
16 + * You should have received a copy of the GNU General Public License
17 + * along with this program; if not, write to the Free Software
18 + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
19 + */
20 +
21   //---PACKAGE DECLARATION---
22 < package uk.ac.ukc.iscream.clientinterface;
22 > package uk.org.iscream.cms.server.clientinterface;
23  
24   //---IMPORTS---
25 < import uk.ac.ukc.iscream.util.*;
26 < import uk.ac.ukc.iscream.componentmanager.*;
27 < import uk.ac.ukc.iscream.core.*;
25 > import uk.org.iscream.cms.util.*;
26 > import uk.org.iscream.cms.server.componentmanager.*;
27 > import uk.org.iscream.cms.server.core.*;
28   import java.util.*;
29  
30   /**
# Line 33 | Line 53 | class PacketSorter extends Thread {
53  
54      /**
55       * Creates a new PacketSorter.
36     *
37     * @param queueMonitorInterval The interval at which to monitor the Queue
56       */
57 <    public PacketSorter(int queueMonitorInterval) {
58 <        _queue = new Queue();
59 <        // startup a monitor on this queue, every minute
60 <        String queueName = _name + " PacketSorterQueue";
61 <        _queue.startMonitor(queueMonitorInterval*1000, queueName);
62 <        _hostMap = Collections.synchronizedMap(new HashMap());
63 <        _allHostsList = Collections.synchronizedList(new LinkedList());
57 >    public PacketSorter() {
58 >        // set the Thread name
59 >        setName("clientinterface.PacketSorter");
60 >        
61 >        ConfigurationProxy cp = ConfigurationProxy.getInstance();
62 >        String configName = "ClientInterface";
63 >        
64 >        // see if this Queue needs a size limit
65 >        try {
66 >            int queueSizeLimit = Integer.parseInt(cp.getProperty(configName, "Queue.SizeLimit"));
67 >            String queueRemoveAlgorithm = cp.getProperty(configName, "Queue.RemoveAlgorithm");
68 >            int algorithm = StringUtils.getStringPos(queueRemoveAlgorithm, Queue.algorithms);
69 >            if(algorithm != -1) {
70 >                _logger.write(toString(), Logger.DEBUG, "Starting Queue with size limit of "+queueSizeLimit+", using remove algorithm "+queueRemoveAlgorithm);
71 >                // we have valid values, so lets start it.
72 >                _queue = new Queue(queueSizeLimit, algorithm);
73 >            }
74 >            else {
75 >                _logger.write(toString(), Logger.WARNING, "Bad Queue Algorithm configuration, not known: "+queueRemoveAlgorithm);
76 >                // just don't activate a limit
77 >                _queue = new Queue();
78 >            }
79 >            
80 >        } catch (PropertyNotFoundException e) {
81 >            _logger.write(toString(), Logger.DEBUG, "Optional config not set: "+e);
82 >            // just don't activate a limit
83 >            _queue = new Queue();
84 >        } catch (NumberFormatException e) {
85 >            _logger.write(toString(), Logger.WARNING, "Bad Queue SizeLimit configuration: "+e);
86 >            // just don't activate a limit
87 >            _queue = new Queue();
88 >        }
89 >        
90 >        // startup a monitor on this queue
91 >        try {
92 >            // try to get the interval, if this fails, we won't start up the monitor
93 >            int queueMonitorInterval = Integer.parseInt(cp.getProperty(configName, "Queue.MonitorInterval"));
94 >            String queueName = _name + " PacketSorterQueue";
95 >            _queue.startMonitor(queueMonitorInterval*1000, queueName);
96 >        } catch (PropertyNotFoundException e) {
97 >            _logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e);
98 >        }
99 >        
100 >        _hostMap = new HashMap();
101 >        _allHostDataList = new LinkedList();
102 >        _allHostsList = new LinkedList();
103          _logger.write(toString(), Logger.SYSINIT, "created");
104      }
105      
# Line 64 | Line 121 | class PacketSorter extends Thread {
121                  _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
122              }
123              
124 <            // find out which host this packet is actually for
68 <            XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml);
69 <            XMLPacket packet = xmlPacketMaker.createXMLPacket();
70 <            String host = packet.getParam("packet.attributes.machine_name");
124 >            XMLPacket packet = null;
125              
126 <            // look in the hostMap to see if anyone wants this data
127 <            if(_hostMap.containsKey(host)) {
128 <                LinkedList list = (LinkedList) _hostMap.get(host);
129 <                Iterator i = list.iterator();
130 <                // push the data to the listening Handler's queue
131 <                while(i.hasNext()) {
78 <                    ((Queue) i.next()).add(xml);
79 <                }
126 >            try {
127 >                packet = _xmlCache.getXMLPacket(xml);
128 >            } catch(InvalidXMLException e) {
129 >                _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
130 >                // skip the rest of this loop iteration
131 >                continue;
132              }
133              
134 <            // any handler in this list wants all packets, so send
135 <            // it on to them regardless
136 <            Iterator j = _allHostsList.iterator();
137 <            while(j.hasNext()) {
138 <                ((Queue) j.next()).add(xml);
134 >            String packetType = packet.getParam("packet.attributes.type");
135 >            // check if we need to send it regardless
136 >            if(packetType.equals("data")) {
137 >                String host = packet.getParam("packet.attributes.machine_name");
138 >                
139 >                // look in the hostMap to see if anyone wants this data
140 >                synchronized(this) {
141 >                    if(_hostMap.containsKey(host)) {
142 >                        LinkedList list = (LinkedList) _hostMap.get(host);
143 >                        Iterator i = list.iterator();
144 >                        // push the data to the listening Handler's queue
145 >                        while(i.hasNext()) {
146 >                            ((Queue) i.next()).add(xml);
147 >                        }
148 >                    }
149 >                }
150 >                
151 >                // any handler in this list wants all packets, so send
152 >                // it on to them regardless
153 >                synchronized(this) {
154 >                    Iterator i = _allHostDataList.iterator();
155 >                    while(i.hasNext()) {
156 >                        ((Queue) i.next()).add(xml);
157 >                    }
158 >                }
159              }
160 +            else {
161 +                // always send this packet to all hosts, because it's
162 +                // "extra" data, not host data
163 +                synchronized(this) {
164 +                    Iterator i = _allHostsList.iterator();
165 +                    while(i.hasNext()) {
166 +                        ((Queue) i.next()).add(xml);
167 +                    }
168 +                }
169 +            }
170          }
171      }
172      
# Line 104 | Line 186 | class PacketSorter extends Thread {
186      public void register(Queue dhQueue, String hostList) {
187          // check to see if we want all hosts
188          if(hostList.equals("")) {
189 <            _allHostsList.add(dhQueue);
189 >            synchronized(this) {
190 >                _allHostDataList.add(dhQueue);
191 >            }
192              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
193          }
194          else {
# Line 112 | Line 196 | class PacketSorter extends Thread {
196              StringTokenizer st = new StringTokenizer(hostList, ";");
197              while(st.hasMoreTokens()) {
198                  String host = st.nextToken();
199 <                // see if we already have a list in the map for this host
200 <                if(_hostMap.containsKey(host)) {
201 <                    // we do, so add to it
202 <                    LinkedList list = (LinkedList) _hostMap.get(host);
203 <                    list.add(dhQueue);
199 >                synchronized(this) {
200 >                    // see if we already have a list in the map for this host
201 >                    if(_hostMap.containsKey(host)) {
202 >                        // we do, so add to it
203 >                        List list = (List) _hostMap.get(host);
204 >                        list.add(dhQueue);
205 >                    }
206 >                    else {
207 >                        // we don't, so create a list and put it in the map
208 >                        LinkedList list = new LinkedList();
209 >                        list.add(dhQueue);
210 >                        _hostMap.put(host, list);
211 >                    }
212                  }
121                else {
122                    // we don't, so create a list and put it in the map
123                    List list = Collections.synchronizedList(new LinkedList());
124                    list.add(dhQueue);
125                    _hostMap.put(host, list);
126                }
213              }
214              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
215          }
216 +        // always add host to our complete host list
217 +        synchronized(this) {
218 +            _allHostsList.add(dhQueue);
219 +        }
220      }
221      
222      /**
# Line 143 | Line 233 | class PacketSorter extends Thread {
233      public void deregister(Queue dhQueue, String hostList) {
234          // go through the list of hosts
235          if(hostList.equals("")) {
236 <            _allHostsList.remove(dhQueue);
236 >            synchronized(this) {
237 >                _allHostDataList.remove(dhQueue);
238 >            }
239              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
240          }
241          else {
242              StringTokenizer st = new StringTokenizer(hostList, ";");
243              while(st.hasMoreTokens()) {
244                  String host = st.nextToken();
245 <                // this should in reality always be true, but best check
246 <                if(_hostMap.containsKey(host)) {
247 <                    // get the list and remove the host in question
248 <                    LinkedList list = (LinkedList) _hostMap.get(host);
249 <                    list.remove(dhQueue);
250 <                    // if the list is now empty, we might as well remove it
251 <                    if(list.size()==0) {
252 <                        _hostMap.remove(host);
245 >                synchronized(this) {
246 >                    // this should in reality always be true, but best check
247 >                    if(_hostMap.containsKey(host)) {
248 >                        // get the list and remove the host in question
249 >                        LinkedList list = (LinkedList) _hostMap.get(host);
250 >                        list.remove(dhQueue);
251 >                        // if the list is now empty, we might as well remove it
252 >                        if(list.size()==0) {
253 >                            _hostMap.remove(host);
254 >                        }
255                      }
256                  }
257              }
258              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
259          }
260 +        // always remove host from our complete host list
261 +        synchronized(this) {
262 +            _allHostsList.remove(dhQueue);
263 +        }
264      }
265      
266      /**
267       * Overrides the {@link java.lang.Object#toString() Object.toString()}
268       * method to provide clean logging (every class should have this).
269       *
270 <     * This uses the uk.ac.ukc.iscream.util.NameFormat class
270 >     * This uses the uk.org.iscream.cms.util.NameFormat class
271       * to format the toString()
272       *
273       * @return the name of this class and its CVS revision
# Line 224 | Line 322 | class PacketSorter extends Thread {
322       * A HashMap to store lists of Queue's (in the DataHandlers)
323       * in a way that can be easily accessed when data comes in.
324       */
325 <    private Map _hostMap;
325 >    private HashMap _hostMap;
326      
327      /**
328       * A list specifically for a Queue's associated with DataHandlers
329       * that want all host information.
330       */
331 <    private List _allHostsList;
331 >    private LinkedList _allHostDataList;
332 >    
333 >    /**
334 >     * A list of all hosts.
335 >     */
336 >    private LinkedList _allHostsList;
337 >    
338 >    /**
339 >     * A reference to the XMLCache in use
340 >     */
341 >    private XMLCache _xmlCache = XMLCache.getInstance();
342      
343   //---STATIC ATTRIBUTES---
344  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines