ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
(Generate patch)

Comparing projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java (file contents):
Revision 1.2 by tdb, Tue Jan 23 17:22:01 2001 UTC vs.
Revision 1.18 by tdb, Tue May 29 17:02:34 2001 UTC

# Line 1 | Line 1
1   //---PACKAGE DECLARATION---
2 < package uk.ac.ukc.iscream.clientinterface;
2 > package uk.org.iscream.cms.server.clientinterface;
3  
4   //---IMPORTS---
5 < import uk.ac.ukc.iscream.util.*;
6 < import uk.ac.ukc.iscream.componentmanager.*;
7 < import uk.ac.ukc.iscream.core.*;
5 > import uk.org.iscream.cms.server.util.*;
6 > import uk.org.iscream.cms.server.componentmanager.*;
7 > import uk.org.iscream.cms.server.core.*;
8   import java.util.*;
9  
10   /**
11   * Receives data from the incoming CORBA servant, places
12   * it in a Queue, and then arranges distribution to the
13 < * ClientHandlers.
13 > * DataHandlers.
14 > * Has extra functionality to send data to DataHandlers
15 > * on a per-host basis - ie. the Client can request which
16 > * hosts it would like to listen for.
17   *
18   * @author  $Author$
19   * @version $Id$
# Line 32 | Line 35 | class PacketSorter extends Thread {
35       * Creates a new PacketSorter.
36       */
37      public PacketSorter() {
38 <        _queue = new Queue();
38 >        // set the Thread name
39 >        setName("clientinterface.PacketSorter");
40 >        
41 >        ConfigurationProxy cp = ConfigurationProxy.getInstance();
42 >        String configName = "ClientInterface";
43 >        
44 >        // see if this Queue needs a size limit
45 >        try {
46 >            int queueSizeLimit = Integer.parseInt(cp.getProperty(configName, "Queue.SizeLimit"));
47 >            String queueRemoveAlgorithm = cp.getProperty(configName, "Queue.RemoveAlgorithm");
48 >            int algorithm = StringUtils.getStringPos(queueRemoveAlgorithm, Queue.algorithms);
49 >            if(algorithm != -1) {
50 >                _logger.write(toString(), Logger.DEBUG, "Starting Queue with size limit of "+queueSizeLimit+", using remove algorithm "+queueRemoveAlgorithm);
51 >                // we have valid values, so lets start it.
52 >                _queue = new Queue(queueSizeLimit, algorithm);
53 >            }
54 >            else {
55 >                _logger.write(toString(), Logger.WARNING, "Bad Queue Algorithm configuration, not known: "+queueRemoveAlgorithm);
56 >                // just don't activate a limit
57 >                _queue = new Queue();
58 >            }
59 >            
60 >        } catch (PropertyNotFoundException e) {
61 >            _logger.write(toString(), Logger.DEBUG, "Optional config not set: "+e);
62 >            // just don't activate a limit
63 >            _queue = new Queue();
64 >        } catch (NumberFormatException e) {
65 >            _logger.write(toString(), Logger.WARNING, "Bad Queue SizeLimit configuration: "+e);
66 >            // just don't activate a limit
67 >            _queue = new Queue();
68 >        }
69 >        
70 >        // startup a monitor on this queue
71 >        try {
72 >            // try to get the interval, if this fails, we won't start up the monitor
73 >            int queueMonitorInterval = Integer.parseInt(cp.getProperty(configName, "Queue.MonitorInterval"));
74 >            String queueName = _name + " PacketSorterQueue";
75 >            _queue.startMonitor(queueMonitorInterval*1000, queueName);
76 >        } catch (PropertyNotFoundException e) {
77 >            _logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e);
78 >        }
79 >        
80          _hostMap = new HashMap();
81 +        _allHostDataList = new LinkedList();
82          _allHostsList = new LinkedList();
83          _logger.write(toString(), Logger.SYSINIT, "created");
84      }
85      
86   //---PUBLIC METHODS---
87  
88 +    /**
89 +     * Method to start the PacketSorter running. This method will
90 +     * loop forever processing and sending data.
91 +     */
92      public void run() {
93 +        XMLPacketMaker xmlPacketMaker = new XMLPacketMaker();
94          int qID = _queue.getQueue();
95          while(true) {
96 +            // attempt to get some data from the Queue
97              String xml = "";
98              try {
99                  xml = (String) _queue.get(qID);
# Line 51 | Line 102 | class PacketSorter extends Thread {
102                  _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
103              }
104              
105 <            // look at host map
105 >            XMLPacket packet = null;
106              
107 <            // should be a neater way to do this
108 <            XMLPacket packet = new XMLPacketMaker(xml).createXMLPacket();
109 <            String host = packet.getParam("packet.attributes.machine_name");
110 <            LinkedList list = (LinkedList) _hostMap.get(host);
111 <            
112 <            Iterator i = list.iterator();
62 <            while(i.hasNext()) {
63 <                ((Queue) i.next()).add(xml);
107 >            try {
108 >                packet = xmlPacketMaker.createXMLPacket(xml);
109 >            } catch(InvalidXMLException e) {
110 >                _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
111 >                // skip the rest of this loop iteration
112 >                continue;
113              }
114              
115 <            //  look at all hosts
116 <            
117 <            Iterator j = _allHostsList.iterator();
118 <            while(j.hasNext()) {
119 <                ((Queue) j.next()).add(xml);
115 >            String packetType = packet.getParam("packet.attributes.type");
116 >            // check if we need to send it regardless
117 >            if(packetType.equals("data") || packetType.equals("heartbeat")) {
118 >                String host = packet.getParam("packet.attributes.machine_name");
119 >                
120 >                // look in the hostMap to see if anyone wants this data
121 >                synchronized(this) {
122 >                    if(_hostMap.containsKey(host)) {
123 >                        LinkedList list = (LinkedList) _hostMap.get(host);
124 >                        Iterator i = list.iterator();
125 >                        // push the data to the listening Handler's queue
126 >                        while(i.hasNext()) {
127 >                            ((Queue) i.next()).add(xml);
128 >                        }
129 >                    }
130 >                }
131 >                
132 >                // any handler in this list wants all packets, so send
133 >                // it on to them regardless
134 >                synchronized(this) {
135 >                    Iterator i = _allHostDataList.iterator();
136 >                    while(i.hasNext()) {
137 >                        ((Queue) i.next()).add(xml);
138 >                    }
139 >                }
140              }
141 +            else {
142 +                // always send this packet to all hosts, because it's
143 +                // "extra" data, not host data
144 +                synchronized(this) {
145 +                    Iterator i = _allHostsList.iterator();
146 +                    while(i.hasNext()) {
147 +                        ((Queue) i.next()).add(xml);
148 +                    }
149 +                }
150 +            }
151          }
152      }
153      
154 <    // MUST DEAL WITH hostList="" implying "all hosts"
155 <    
154 >    /**
155 >     * Register a DataHandler in the system. This method
156 >     * actually takes a reference to a Queue, which should be
157 >     * a Queue that the DataHandler is making use of.
158 >     * It also takes a hostList, this being a semi-colon
159 >     * seperated list of hosts that the Client the DataHandler
160 >     * is serving has requested. If this list is simply an empty
161 >     * String, it is assumed the Client wants to listen to all
162 >     * host information.
163 >     *
164 >     * @param dhQueue a Queue being used by the DataHandler that is registering
165 >     * @param hostList a semi-colon seperated list of hosts
166 >     */
167      public void register(Queue dhQueue, String hostList) {
168 +        // check to see if we want all hosts
169          if(hostList.equals("")) {
170 <            _allHostsList.add(dhQueue);
170 >            synchronized(this) {
171 >                _allHostDataList.add(dhQueue);
172 >            }
173              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
174          }
175          else {
176 +            // go through the list of hosts
177              StringTokenizer st = new StringTokenizer(hostList, ";");
178              while(st.hasMoreTokens()) {
179                  String host = st.nextToken();
180 <                if(_hostMap.containsKey(host)) {
181 <                    LinkedList list = (LinkedList) _hostMap.get(host);
182 <                    list.add(dhQueue);
180 >                synchronized(this) {
181 >                    // see if we already have a list in the map for this host
182 >                    if(_hostMap.containsKey(host)) {
183 >                        // we do, so add to it
184 >                        List list = (List) _hostMap.get(host);
185 >                        list.add(dhQueue);
186 >                    }
187 >                    else {
188 >                        // we don't, so create a list and put it in the map
189 >                        LinkedList list = new LinkedList();
190 >                        list.add(dhQueue);
191 >                        _hostMap.put(host, list);
192 >                    }
193                  }
90                else {
91                    LinkedList list = new LinkedList();
92                    list.add(dhQueue);
93                    _hostMap.put(host, list);
94                }
194              }
195              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
196          }
197 +        // always add host to our complete host list
198 +        synchronized(this) {
199 +            _allHostsList.add(dhQueue);
200 +        }
201      }
202      
203 +    /**
204 +     * Deregister a DataHandler. The DataHandler should give a reference
205 +     * to the Queue it's using, and the *same* hostList it gave when it
206 +     * register. It is imperative that the hostList is the same, otherwise
207 +     * there will be all sorts of problems with lists getting out of sync.
208 +     *
209 +     * NB: Possible future addition would be recording of hostList's.
210 +     *
211 +     * @param dhQueue a Queue being used by the DataHandler that is deregistering
212 +     * @param hostList a semi-colon seperated list of hosts
213 +     */
214      public void deregister(Queue dhQueue, String hostList) {
215 +        // go through the list of hosts
216          if(hostList.equals("")) {
217 <            _allHostsList.remove(dhQueue);
217 >            synchronized(this) {
218 >                _allHostDataList.remove(dhQueue);
219 >            }
220              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
221          }
222          else {
223              StringTokenizer st = new StringTokenizer(hostList, ";");
224              while(st.hasMoreTokens()) {
225                  String host = st.nextToken();
226 <                if(_hostMap.containsKey(host)) {
227 <                    LinkedList list = (LinkedList) _hostMap.get(host);
228 <                    list.remove(dhQueue);
229 <                    if(list.size()==0) {
230 <                        _hostMap.remove(host);
226 >                synchronized(this) {
227 >                    // this should in reality always be true, but best check
228 >                    if(_hostMap.containsKey(host)) {
229 >                        // get the list and remove the host in question
230 >                        LinkedList list = (LinkedList) _hostMap.get(host);
231 >                        list.remove(dhQueue);
232 >                        // if the list is now empty, we might as well remove it
233 >                        if(list.size()==0) {
234 >                            _hostMap.remove(host);
235 >                        }
236                      }
237                  }
238              }
239              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
240          }
241 +        // always remove host from our complete host list
242 +        synchronized(this) {
243 +            _allHostsList.remove(dhQueue);
244 +        }
245      }
246      
247      /**
248       * Overrides the {@link java.lang.Object#toString() Object.toString()}
249       * method to provide clean logging (every class should have this).
250       *
251 <     * This uses the uk.ac.ukc.iscream.util.NameFormat class
251 >     * This uses the uk.org.iscream.cms.server.util.NameFormat class
252       * to format the toString()
253       *
254       * @return the name of this class and its CVS revision
# Line 137 | Line 263 | class PacketSorter extends Thread {
263   //---PRIVATE METHODS---
264  
265   //---ACCESSOR/MUTATOR METHODS---
266 <
266 >    
267 >    /**
268 >     * Accessor to return a reference to the Queue object. This
269 >     * is needed so the ClientInterfaceServant can get add data
270 >     * easily.
271 >     *
272 >     * @return a reference to our Queue object.
273 >     */
274      public Queue getQueue() {
275          return _queue;
276      }
# Line 161 | Line 294 | class PacketSorter extends Thread {
294       */
295      private Logger _logger = ReferenceManager.getInstance().getLogger();
296      
297 +    /**
298 +     * A reference to the Queue we're using.
299 +     */
300      private Queue _queue;
301      
302 +    /**
303 +     * A HashMap to store lists of Queue's (in the DataHandlers)
304 +     * in a way that can be easily accessed when data comes in.
305 +     */
306      private HashMap _hostMap;
307 +    
308 +    /**
309 +     * A list specifically for a Queue's associated with DataHandlers
310 +     * that want all host information.
311 +     */
312 +    private LinkedList _allHostDataList;
313 +    
314 +    /**
315 +     * A list of all hosts.
316 +     */
317      private LinkedList _allHostsList;
318      
319   //---STATIC ATTRIBUTES---

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines