ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
(Generate patch)

Comparing projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java (file contents):
Revision 1.2 by tdb, Tue Jan 23 17:22:01 2001 UTC vs.
Revision 1.11 by tdb, Thu Mar 1 17:29:46 2001 UTC

# Line 10 | Line 10 | import java.util.*;
10   /**
11   * Receives data from the incoming CORBA servant, places
12   * it in a Queue, and then arranges distribution to the
13 < * ClientHandlers.
13 > * DataHandlers.
14 > * Has extra functionality to send data to DataHandlers
15 > * on a per-host basis - ie. the Client can request which
16 > * hosts it would like to listen for.
17   *
18   * @author  $Author$
19   * @version $Id$
# Line 30 | Line 33 | class PacketSorter extends Thread {
33  
34      /**
35       * Creates a new PacketSorter.
36 +     *
37 +     * @param queueMonitorInterval The interval at which to monitor the Queue
38       */
39 <    public PacketSorter() {
39 >    public PacketSorter(int queueMonitorInterval) {
40          _queue = new Queue();
41 +        // startup a monitor on this queue, every minute
42 +        String queueName = _name + " PacketSorterQueue";
43 +        _queue.startMonitor(queueMonitorInterval*1000, queueName);
44          _hostMap = new HashMap();
45 +        _allHostDataList = new LinkedList();
46          _allHostsList = new LinkedList();
47          _logger.write(toString(), Logger.SYSINIT, "created");
48      }
49      
50   //---PUBLIC METHODS---
51  
52 +    /**
53 +     * Method to start the PacketSorter running. This method will
54 +     * loop forever processing and sending data.
55 +     */
56      public void run() {
57          int qID = _queue.getQueue();
58          while(true) {
59 +            // attempt to get some data from the Queue
60              String xml = "";
61              try {
62                  xml = (String) _queue.get(qID);
# Line 51 | Line 65 | class PacketSorter extends Thread {
65                  _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
66              }
67              
68 <            // look at host map
68 >            XMLPacket packet = null;
69              
70 <            // should be a neater way to do this
71 <            XMLPacket packet = new XMLPacketMaker(xml).createXMLPacket();
72 <            String host = packet.getParam("packet.attributes.machine_name");
73 <            LinkedList list = (LinkedList) _hostMap.get(host);
74 <            
75 <            Iterator i = list.iterator();
76 <            while(i.hasNext()) {
63 <                ((Queue) i.next()).add(xml);
70 >            try {
71 >                XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml);
72 >                packet = xmlPacketMaker.createXMLPacket();
73 >            } catch(InvalidXMLException e) {
74 >                _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
75 >                // skip the rest of this loop iteration
76 >                continue;
77              }
78              
79 <            //  look at all hosts
80 <            
81 <            Iterator j = _allHostsList.iterator();
82 <            while(j.hasNext()) {
83 <                ((Queue) j.next()).add(xml);
79 >            String packetType = packet.getParam("packet.attributes.type");
80 >            // check if we need to send it regardless
81 >            if(packetType.equals("data") || packetType.equals("heartbeat")) {
82 >                String host = packet.getParam("packet.attributes.machine_name");
83 >                
84 >                // look in the hostMap to see if anyone wants this data
85 >                synchronized(this) {
86 >                    if(_hostMap.containsKey(host)) {
87 >                        LinkedList list = (LinkedList) _hostMap.get(host);
88 >                        Iterator i = list.iterator();
89 >                        // push the data to the listening Handler's queue
90 >                        while(i.hasNext()) {
91 >                            ((Queue) i.next()).add(xml);
92 >                        }
93 >                    }
94 >                }
95 >                
96 >                // any handler in this list wants all packets, so send
97 >                // it on to them regardless
98 >                synchronized(this) {
99 >                    Iterator i = _allHostDataList.iterator();
100 >                    while(i.hasNext()) {
101 >                        ((Queue) i.next()).add(xml);
102 >                    }
103 >                }
104              }
105 +            else {
106 +                // always send this packet to all hosts, because it's
107 +                // "extra" data, not host data
108 +                synchronized(this) {
109 +                    Iterator i = _allHostsList.iterator();
110 +                    while(i.hasNext()) {
111 +                        ((Queue) i.next()).add(xml);
112 +                    }
113 +                }
114 +            }
115          }
116      }
117      
118 <    // MUST DEAL WITH hostList="" implying "all hosts"
119 <    
120 <    public void register(Queue dhQueue, String hostList) {
118 >    /**
119 >     * Register a DataHandler in the system. This method
120 >     * actually takes a reference to a Queue, which should be
121 >     * a Queue that the DataHandler is making use of.
122 >     * It also takes a hostList, this being a semi-colon
123 >     * seperated list of hosts that the Client the DataHandler
124 >     * is serving has requested. If this list is simply an empty
125 >     * String, it is assumed the Client wants to listen to all
126 >     * host information.
127 >     *
128 >     * @param dhQueue a Queue being used by the DataHandler that is registering
129 >     * @param hostList a semi-colon seperated list of hosts
130 >     */
131 >    public synchronized void register(Queue dhQueue, String hostList) {
132 >        // check to see if we want all hosts
133          if(hostList.equals("")) {
134 <            _allHostsList.add(dhQueue);
134 >            _allHostDataList.add(dhQueue);
135              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
136          }
137          else {
138 +            // go through the list of hosts
139              StringTokenizer st = new StringTokenizer(hostList, ";");
140              while(st.hasMoreTokens()) {
141                  String host = st.nextToken();
142 +                // see if we already have a list in the map for this host
143                  if(_hostMap.containsKey(host)) {
144 <                    LinkedList list = (LinkedList) _hostMap.get(host);
144 >                    // we do, so add to it
145 >                    List list = (List) _hostMap.get(host);
146                      list.add(dhQueue);
147                  }
148                  else {
149 +                    // we don't, so create a list and put it in the map
150                      LinkedList list = new LinkedList();
151                      list.add(dhQueue);
152                      _hostMap.put(host, list);
# Line 95 | Line 154 | class PacketSorter extends Thread {
154              }
155              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
156          }
157 +        // always add host to our complete host list
158 +        _allHostsList.add(dhQueue);
159      }
160      
161 <    public void deregister(Queue dhQueue, String hostList) {
161 >    /**
162 >     * Deregister a DataHandler. The DataHandler should give a reference
163 >     * to the Queue it's using, and the *same* hostList it gave when it
164 >     * register. It is imperative that the hostList is the same, otherwise
165 >     * there will be all sorts of problems with lists getting out of sync.
166 >     *
167 >     * NB: Possible future addition would be recording of hostList's.
168 >     *
169 >     * @param dhQueue a Queue being used by the DataHandler that is deregistering
170 >     * @param hostList a semi-colon seperated list of hosts
171 >     */
172 >    public synchronized void deregister(Queue dhQueue, String hostList) {
173 >        // go through the list of hosts
174          if(hostList.equals("")) {
175 <            _allHostsList.remove(dhQueue);
175 >            _allHostDataList.remove(dhQueue);
176              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
177          }
178          else {
179              StringTokenizer st = new StringTokenizer(hostList, ";");
180              while(st.hasMoreTokens()) {
181                  String host = st.nextToken();
182 +                // this should in reality always be true, but best check
183                  if(_hostMap.containsKey(host)) {
184 +                    // get the list and remove the host in question
185                      LinkedList list = (LinkedList) _hostMap.get(host);
186                      list.remove(dhQueue);
187 +                    // if the list is now empty, we might as well remove it
188                      if(list.size()==0) {
189                          _hostMap.remove(host);
190                      }
# Line 116 | Line 192 | class PacketSorter extends Thread {
192              }
193              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
194          }
195 +        // always remove host from our complete host list
196 +        _allHostsList.remove(dhQueue);
197      }
198      
199      /**
# Line 137 | Line 215 | class PacketSorter extends Thread {
215   //---PRIVATE METHODS---
216  
217   //---ACCESSOR/MUTATOR METHODS---
218 <
218 >    
219 >    /**
220 >     * Accessor to return a reference to the Queue object. This
221 >     * is needed so the ClientInterfaceServant can get add data
222 >     * easily.
223 >     *
224 >     * @return a reference to our Queue object.
225 >     */
226      public Queue getQueue() {
227          return _queue;
228      }
# Line 161 | Line 246 | class PacketSorter extends Thread {
246       */
247      private Logger _logger = ReferenceManager.getInstance().getLogger();
248      
249 +    /**
250 +     * A reference to the Queue we're using.
251 +     */
252      private Queue _queue;
253      
254 +    /**
255 +     * A HashMap to store lists of Queue's (in the DataHandlers)
256 +     * in a way that can be easily accessed when data comes in.
257 +     */
258      private HashMap _hostMap;
259 +    
260 +    /**
261 +     * A list specifically for a Queue's associated with DataHandlers
262 +     * that want all host information.
263 +     */
264 +    private LinkedList _allHostDataList;
265 +    
266 +    /**
267 +     * A list of all hosts.
268 +     */
269      private LinkedList _allHostsList;
270      
271   //---STATIC ATTRIBUTES---

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines