ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
(Generate patch)

Comparing projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java (file contents):
Revision 1.3 by tdb, Tue Jan 23 18:23:03 2001 UTC vs.
Revision 1.15 by tdb, Tue Mar 13 18:37:08 2001 UTC

# Line 10 | Line 10 | import java.util.*;
10   /**
11   * Receives data from the incoming CORBA servant, places
12   * it in a Queue, and then arranges distribution to the
13 < * ClientHandlers.
13 > * DataHandlers.
14 > * Has extra functionality to send data to DataHandlers
15 > * on a per-host basis - ie. the Client can request which
16 > * hosts it would like to listen for.
17   *
18   * @author  $Author$
19   * @version $Id$
# Line 32 | Line 35 | class PacketSorter extends Thread {
35       * Creates a new PacketSorter.
36       */
37      public PacketSorter() {
38 +        // set the Thread name
39 +        setName("clientinterface.PacketSorter");
40 +        
41          _queue = new Queue();
42 +        // startup a monitor on this queue
43 +        try {
44 +            // try to get the interval, if this fails, we won't start up the monitor
45 +            ConfigurationProxy cp = ConfigurationProxy.getInstance();
46 +            int queueMonitorInterval = Integer.parseInt(cp.getProperty("ClientInterface", "Queue.MonitorInterval"));
47 +            String queueName = _name + " PacketSorterQueue";
48 +            _queue.startMonitor(queueMonitorInterval*1000, queueName);
49 +        } catch (PropertyNotFoundException e) {
50 +            _logger.write(toString(), Logger.WARNING, "failed to find queue monitor config, disabling. " + e);
51 +        }
52 +        
53          _hostMap = new HashMap();
54 +        _allHostDataList = new LinkedList();
55          _allHostsList = new LinkedList();
56          _logger.write(toString(), Logger.SYSINIT, "created");
57      }
58      
59   //---PUBLIC METHODS---
60  
61 +    /**
62 +     * Method to start the PacketSorter running. This method will
63 +     * loop forever processing and sending data.
64 +     */
65      public void run() {
66 +        XMLPacketMaker xmlPacketMaker = new XMLPacketMaker();
67          int qID = _queue.getQueue();
68          while(true) {
69 +            // attempt to get some data from the Queue
70              String xml = "";
71              try {
72                  xml = (String) _queue.get(qID);
# Line 51 | Line 75 | class PacketSorter extends Thread {
75                  _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
76              }
77              
78 <            // look at host map
78 >            XMLPacket packet = null;
79              
80 <            // should be a neater way to do this
81 <            XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml);
82 <            XMLPacket packet = xmlPacketMaker.createXMLPacket();
83 <            String host = packet.getParam("packet.attributes.machine_name");
80 >            try {
81 >                packet = xmlPacketMaker.createXMLPacket(xml);
82 >            } catch(InvalidXMLException e) {
83 >                _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
84 >                // skip the rest of this loop iteration
85 >                continue;
86 >            }
87              
88 <            if(_hostMap.containsKey(host)) {
89 <                LinkedList list = (LinkedList) _hostMap.get(host);
90 <                Iterator i = list.iterator();
91 <                while(i.hasNext()) {
92 <                    ((Queue) i.next()).add(xml);
88 >            String packetType = packet.getParam("packet.attributes.type");
89 >            // check if we need to send it regardless
90 >            if(packetType.equals("data") || packetType.equals("heartbeat")) {
91 >                String host = packet.getParam("packet.attributes.machine_name");
92 >                
93 >                // look in the hostMap to see if anyone wants this data
94 >                synchronized(this) {
95 >                    if(_hostMap.containsKey(host)) {
96 >                        LinkedList list = (LinkedList) _hostMap.get(host);
97 >                        Iterator i = list.iterator();
98 >                        // push the data to the listening Handler's queue
99 >                        while(i.hasNext()) {
100 >                            ((Queue) i.next()).add(xml);
101 >                        }
102 >                    }
103                  }
104 +                
105 +                // any handler in this list wants all packets, so send
106 +                // it on to them regardless
107 +                synchronized(this) {
108 +                    Iterator i = _allHostDataList.iterator();
109 +                    while(i.hasNext()) {
110 +                        ((Queue) i.next()).add(xml);
111 +                    }
112 +                }
113              }
114 <            
115 <            //  look at all hosts
116 <            
117 <            Iterator j = _allHostsList.iterator();
118 <            while(j.hasNext()) {
119 <                ((Queue) j.next()).add(xml);
114 >            else {
115 >                // always send this packet to all hosts, because it's
116 >                // "extra" data, not host data
117 >                synchronized(this) {
118 >                    Iterator i = _allHostsList.iterator();
119 >                    while(i.hasNext()) {
120 >                        ((Queue) i.next()).add(xml);
121 >                    }
122 >                }
123              }
124          }
125      }
126      
127 <    // MUST DEAL WITH hostList="" implying "all hosts"
128 <    
127 >    /**
128 >     * Register a DataHandler in the system. This method
129 >     * actually takes a reference to a Queue, which should be
130 >     * a Queue that the DataHandler is making use of.
131 >     * It also takes a hostList, this being a semi-colon
132 >     * seperated list of hosts that the Client the DataHandler
133 >     * is serving has requested. If this list is simply an empty
134 >     * String, it is assumed the Client wants to listen to all
135 >     * host information.
136 >     *
137 >     * @param dhQueue a Queue being used by the DataHandler that is registering
138 >     * @param hostList a semi-colon seperated list of hosts
139 >     */
140      public void register(Queue dhQueue, String hostList) {
141 +        // check to see if we want all hosts
142          if(hostList.equals("")) {
143 <            _allHostsList.add(dhQueue);
143 >            synchronized(this) {
144 >                _allHostDataList.add(dhQueue);
145 >            }
146              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
147          }
148          else {
149 +            // go through the list of hosts
150              StringTokenizer st = new StringTokenizer(hostList, ";");
151              while(st.hasMoreTokens()) {
152                  String host = st.nextToken();
153 <                if(_hostMap.containsKey(host)) {
154 <                    LinkedList list = (LinkedList) _hostMap.get(host);
155 <                    list.add(dhQueue);
153 >                synchronized(this) {
154 >                    // see if we already have a list in the map for this host
155 >                    if(_hostMap.containsKey(host)) {
156 >                        // we do, so add to it
157 >                        List list = (List) _hostMap.get(host);
158 >                        list.add(dhQueue);
159 >                    }
160 >                    else {
161 >                        // we don't, so create a list and put it in the map
162 >                        LinkedList list = new LinkedList();
163 >                        list.add(dhQueue);
164 >                        _hostMap.put(host, list);
165 >                    }
166                  }
93                else {
94                    LinkedList list = new LinkedList();
95                    list.add(dhQueue);
96                    _hostMap.put(host, list);
97                }
167              }
168              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
169          }
170 +        // always add host to our complete host list
171 +        synchronized(this) {
172 +            _allHostsList.add(dhQueue);
173 +        }
174      }
175      
176 +    /**
177 +     * Deregister a DataHandler. The DataHandler should give a reference
178 +     * to the Queue it's using, and the *same* hostList it gave when it
179 +     * register. It is imperative that the hostList is the same, otherwise
180 +     * there will be all sorts of problems with lists getting out of sync.
181 +     *
182 +     * NB: Possible future addition would be recording of hostList's.
183 +     *
184 +     * @param dhQueue a Queue being used by the DataHandler that is deregistering
185 +     * @param hostList a semi-colon seperated list of hosts
186 +     */
187      public void deregister(Queue dhQueue, String hostList) {
188 +        // go through the list of hosts
189          if(hostList.equals("")) {
190 <            _allHostsList.remove(dhQueue);
190 >            synchronized(this) {
191 >                _allHostDataList.remove(dhQueue);
192 >            }
193              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
194          }
195          else {
196              StringTokenizer st = new StringTokenizer(hostList, ";");
197              while(st.hasMoreTokens()) {
198                  String host = st.nextToken();
199 <                if(_hostMap.containsKey(host)) {
200 <                    LinkedList list = (LinkedList) _hostMap.get(host);
201 <                    list.remove(dhQueue);
202 <                    if(list.size()==0) {
203 <                        _hostMap.remove(host);
199 >                synchronized(this) {
200 >                    // this should in reality always be true, but best check
201 >                    if(_hostMap.containsKey(host)) {
202 >                        // get the list and remove the host in question
203 >                        LinkedList list = (LinkedList) _hostMap.get(host);
204 >                        list.remove(dhQueue);
205 >                        // if the list is now empty, we might as well remove it
206 >                        if(list.size()==0) {
207 >                            _hostMap.remove(host);
208 >                        }
209                      }
210                  }
211              }
212              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
213          }
214 +        // always remove host from our complete host list
215 +        synchronized(this) {
216 +            _allHostsList.remove(dhQueue);
217 +        }
218      }
219      
220      /**
# Line 140 | Line 236 | class PacketSorter extends Thread {
236   //---PRIVATE METHODS---
237  
238   //---ACCESSOR/MUTATOR METHODS---
239 <
239 >    
240 >    /**
241 >     * Accessor to return a reference to the Queue object. This
242 >     * is needed so the ClientInterfaceServant can get add data
243 >     * easily.
244 >     *
245 >     * @return a reference to our Queue object.
246 >     */
247      public Queue getQueue() {
248          return _queue;
249      }
# Line 164 | Line 267 | class PacketSorter extends Thread {
267       */
268      private Logger _logger = ReferenceManager.getInstance().getLogger();
269      
270 +    /**
271 +     * A reference to the Queue we're using.
272 +     */
273      private Queue _queue;
274      
275 +    /**
276 +     * A HashMap to store lists of Queue's (in the DataHandlers)
277 +     * in a way that can be easily accessed when data comes in.
278 +     */
279      private HashMap _hostMap;
280 +    
281 +    /**
282 +     * A list specifically for a Queue's associated with DataHandlers
283 +     * that want all host information.
284 +     */
285 +    private LinkedList _allHostDataList;
286 +    
287 +    /**
288 +     * A list of all hosts.
289 +     */
290      private LinkedList _allHostsList;
291      
292   //---STATIC ATTRIBUTES---

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines