ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
(Generate patch)

Comparing projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java (file contents):
Revision 1.3 by tdb, Tue Jan 23 18:23:03 2001 UTC vs.
Revision 1.14 by tdb, Tue Mar 13 02:19:44 2001 UTC

# Line 10 | Line 10 | import java.util.*;
10   /**
11   * Receives data from the incoming CORBA servant, places
12   * it in a Queue, and then arranges distribution to the
13 < * ClientHandlers.
13 > * DataHandlers.
14 > * Has extra functionality to send data to DataHandlers
15 > * on a per-host basis - ie. the Client can request which
16 > * hosts it would like to listen for.
17   *
18   * @author  $Author$
19   * @version $Id$
# Line 30 | Line 33 | class PacketSorter extends Thread {
33  
34      /**
35       * Creates a new PacketSorter.
36 +     *
37 +     * @param queueMonitorInterval The interval at which to monitor the Queue
38       */
39 <    public PacketSorter() {
39 >    public PacketSorter(int queueMonitorInterval) {
40 >        // set the Thread name
41 >        setName("clientinterface.PacketSorter");
42 >        
43          _queue = new Queue();
44 +        // startup a monitor on this queue, every minute
45 +        String queueName = _name + " PacketSorterQueue";
46 +        _queue.startMonitor(queueMonitorInterval*1000, queueName);
47          _hostMap = new HashMap();
48 +        _allHostDataList = new LinkedList();
49          _allHostsList = new LinkedList();
50          _logger.write(toString(), Logger.SYSINIT, "created");
51      }
52      
53   //---PUBLIC METHODS---
54  
55 +    /**
56 +     * Method to start the PacketSorter running. This method will
57 +     * loop forever processing and sending data.
58 +     */
59      public void run() {
60 +        XMLPacketMaker xmlPacketMaker = new XMLPacketMaker();
61          int qID = _queue.getQueue();
62          while(true) {
63 +            // attempt to get some data from the Queue
64              String xml = "";
65              try {
66                  xml = (String) _queue.get(qID);
# Line 51 | Line 69 | class PacketSorter extends Thread {
69                  _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
70              }
71              
72 <            // look at host map
72 >            XMLPacket packet = null;
73              
74 <            // should be a neater way to do this
75 <            XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml);
76 <            XMLPacket packet = xmlPacketMaker.createXMLPacket();
77 <            String host = packet.getParam("packet.attributes.machine_name");
74 >            try {
75 >                packet = xmlPacketMaker.createXMLPacket(xml);
76 >            } catch(InvalidXMLException e) {
77 >                _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
78 >                // skip the rest of this loop iteration
79 >                continue;
80 >            }
81              
82 <            if(_hostMap.containsKey(host)) {
83 <                LinkedList list = (LinkedList) _hostMap.get(host);
84 <                Iterator i = list.iterator();
85 <                while(i.hasNext()) {
86 <                    ((Queue) i.next()).add(xml);
82 >            String packetType = packet.getParam("packet.attributes.type");
83 >            // check if we need to send it regardless
84 >            if(packetType.equals("data") || packetType.equals("heartbeat")) {
85 >                String host = packet.getParam("packet.attributes.machine_name");
86 >                
87 >                // look in the hostMap to see if anyone wants this data
88 >                synchronized(this) {
89 >                    if(_hostMap.containsKey(host)) {
90 >                        LinkedList list = (LinkedList) _hostMap.get(host);
91 >                        Iterator i = list.iterator();
92 >                        // push the data to the listening Handler's queue
93 >                        while(i.hasNext()) {
94 >                            ((Queue) i.next()).add(xml);
95 >                        }
96 >                    }
97                  }
98 +                
99 +                // any handler in this list wants all packets, so send
100 +                // it on to them regardless
101 +                synchronized(this) {
102 +                    Iterator i = _allHostDataList.iterator();
103 +                    while(i.hasNext()) {
104 +                        ((Queue) i.next()).add(xml);
105 +                    }
106 +                }
107              }
108 <            
109 <            //  look at all hosts
110 <            
111 <            Iterator j = _allHostsList.iterator();
112 <            while(j.hasNext()) {
113 <                ((Queue) j.next()).add(xml);
108 >            else {
109 >                // always send this packet to all hosts, because it's
110 >                // "extra" data, not host data
111 >                synchronized(this) {
112 >                    Iterator i = _allHostsList.iterator();
113 >                    while(i.hasNext()) {
114 >                        ((Queue) i.next()).add(xml);
115 >                    }
116 >                }
117              }
118          }
119      }
120      
121 <    // MUST DEAL WITH hostList="" implying "all hosts"
122 <    
121 >    /**
122 >     * Register a DataHandler in the system. This method
123 >     * actually takes a reference to a Queue, which should be
124 >     * a Queue that the DataHandler is making use of.
125 >     * It also takes a hostList, this being a semi-colon
126 >     * seperated list of hosts that the Client the DataHandler
127 >     * is serving has requested. If this list is simply an empty
128 >     * String, it is assumed the Client wants to listen to all
129 >     * host information.
130 >     *
131 >     * @param dhQueue a Queue being used by the DataHandler that is registering
132 >     * @param hostList a semi-colon seperated list of hosts
133 >     */
134      public void register(Queue dhQueue, String hostList) {
135 +        // check to see if we want all hosts
136          if(hostList.equals("")) {
137 <            _allHostsList.add(dhQueue);
137 >            synchronized(this) {
138 >                _allHostDataList.add(dhQueue);
139 >            }
140              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
141          }
142          else {
143 +            // go through the list of hosts
144              StringTokenizer st = new StringTokenizer(hostList, ";");
145              while(st.hasMoreTokens()) {
146                  String host = st.nextToken();
147 <                if(_hostMap.containsKey(host)) {
148 <                    LinkedList list = (LinkedList) _hostMap.get(host);
149 <                    list.add(dhQueue);
147 >                synchronized(this) {
148 >                    // see if we already have a list in the map for this host
149 >                    if(_hostMap.containsKey(host)) {
150 >                        // we do, so add to it
151 >                        List list = (List) _hostMap.get(host);
152 >                        list.add(dhQueue);
153 >                    }
154 >                    else {
155 >                        // we don't, so create a list and put it in the map
156 >                        LinkedList list = new LinkedList();
157 >                        list.add(dhQueue);
158 >                        _hostMap.put(host, list);
159 >                    }
160                  }
93                else {
94                    LinkedList list = new LinkedList();
95                    list.add(dhQueue);
96                    _hostMap.put(host, list);
97                }
161              }
162              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
163          }
164 +        // always add host to our complete host list
165 +        synchronized(this) {
166 +            _allHostsList.add(dhQueue);
167 +        }
168      }
169      
170 +    /**
171 +     * Deregister a DataHandler. The DataHandler should give a reference
172 +     * to the Queue it's using, and the *same* hostList it gave when it
173 +     * register. It is imperative that the hostList is the same, otherwise
174 +     * there will be all sorts of problems with lists getting out of sync.
175 +     *
176 +     * NB: Possible future addition would be recording of hostList's.
177 +     *
178 +     * @param dhQueue a Queue being used by the DataHandler that is deregistering
179 +     * @param hostList a semi-colon seperated list of hosts
180 +     */
181      public void deregister(Queue dhQueue, String hostList) {
182 +        // go through the list of hosts
183          if(hostList.equals("")) {
184 <            _allHostsList.remove(dhQueue);
184 >            synchronized(this) {
185 >                _allHostDataList.remove(dhQueue);
186 >            }
187              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
188          }
189          else {
190              StringTokenizer st = new StringTokenizer(hostList, ";");
191              while(st.hasMoreTokens()) {
192                  String host = st.nextToken();
193 <                if(_hostMap.containsKey(host)) {
194 <                    LinkedList list = (LinkedList) _hostMap.get(host);
195 <                    list.remove(dhQueue);
196 <                    if(list.size()==0) {
197 <                        _hostMap.remove(host);
193 >                synchronized(this) {
194 >                    // this should in reality always be true, but best check
195 >                    if(_hostMap.containsKey(host)) {
196 >                        // get the list and remove the host in question
197 >                        LinkedList list = (LinkedList) _hostMap.get(host);
198 >                        list.remove(dhQueue);
199 >                        // if the list is now empty, we might as well remove it
200 >                        if(list.size()==0) {
201 >                            _hostMap.remove(host);
202 >                        }
203                      }
204                  }
205              }
206              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
207          }
208 +        // always remove host from our complete host list
209 +        synchronized(this) {
210 +            _allHostsList.remove(dhQueue);
211 +        }
212      }
213      
214      /**
# Line 140 | Line 230 | class PacketSorter extends Thread {
230   //---PRIVATE METHODS---
231  
232   //---ACCESSOR/MUTATOR METHODS---
233 <
233 >    
234 >    /**
235 >     * Accessor to return a reference to the Queue object. This
236 >     * is needed so the ClientInterfaceServant can get add data
237 >     * easily.
238 >     *
239 >     * @return a reference to our Queue object.
240 >     */
241      public Queue getQueue() {
242          return _queue;
243      }
# Line 164 | Line 261 | class PacketSorter extends Thread {
261       */
262      private Logger _logger = ReferenceManager.getInstance().getLogger();
263      
264 +    /**
265 +     * A reference to the Queue we're using.
266 +     */
267      private Queue _queue;
268      
269 +    /**
270 +     * A HashMap to store lists of Queue's (in the DataHandlers)
271 +     * in a way that can be easily accessed when data comes in.
272 +     */
273      private HashMap _hostMap;
274 +    
275 +    /**
276 +     * A list specifically for a Queue's associated with DataHandlers
277 +     * that want all host information.
278 +     */
279 +    private LinkedList _allHostDataList;
280 +    
281 +    /**
282 +     * A list of all hosts.
283 +     */
284      private LinkedList _allHostsList;
285      
286   //---STATIC ATTRIBUTES---

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines