ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
(Generate patch)

Comparing projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java (file contents):
Revision 1.2 by tdb, Tue Jan 23 17:22:01 2001 UTC vs.
Revision 1.8 by tdb, Wed Feb 28 10:26:47 2001 UTC

# Line 10 | Line 10 | import java.util.*;
10   /**
11   * Receives data from the incoming CORBA servant, places
12   * it in a Queue, and then arranges distribution to the
13 < * ClientHandlers.
13 > * DataHandlers.
14 > * Has extra functionality to send data to DataHandlers
15 > * on a per-host basis - ie. the Client can request which
16 > * hosts it would like to listen for.
17   *
18   * @author  $Author$
19   * @version $Id$
# Line 30 | Line 33 | class PacketSorter extends Thread {
33  
34      /**
35       * Creates a new PacketSorter.
36 +     *
37 +     * @param queueMonitorInterval The interval at which to monitor the Queue
38       */
39 <    public PacketSorter() {
39 >    public PacketSorter(int queueMonitorInterval) {
40          _queue = new Queue();
41 <        _hostMap = new HashMap();
42 <        _allHostsList = new LinkedList();
41 >        // startup a monitor on this queue, every minute
42 >        String queueName = _name + " PacketSorterQueue";
43 >        _queue.startMonitor(queueMonitorInterval*1000, queueName);
44 >        _hostMap = Collections.synchronizedMap(new HashMap());
45 >        _allHostsList = Collections.synchronizedList(new LinkedList());
46          _logger.write(toString(), Logger.SYSINIT, "created");
47      }
48      
49   //---PUBLIC METHODS---
50  
51 +    /**
52 +     * Method to start the PacketSorter running. This method will
53 +     * loop forever processing and sending data.
54 +     */
55      public void run() {
56          int qID = _queue.getQueue();
57          while(true) {
58 +            // attempt to get some data from the Queue
59              String xml = "";
60              try {
61                  xml = (String) _queue.get(qID);
# Line 51 | Line 64 | class PacketSorter extends Thread {
64                  _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
65              }
66              
67 <            // look at host map
67 >            // find out which host this packet is actually for
68 >            XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml);
69 >            XMLPacket packet = xmlPacketMaker.createXMLPacket();
70              
71 <            // should be a neater way to do this
72 <            XMLPacket packet = new XMLPacketMaker(xml).createXMLPacket();
73 <            String host = packet.getParam("packet.attributes.machine_name");
74 <            LinkedList list = (LinkedList) _hostMap.get(host);
75 <            
76 <            Iterator i = list.iterator();
77 <            while(i.hasNext()) {
78 <                ((Queue) i.next()).add(xml);
71 >            String packetType = packet.getParam("packet.attributes.type");
72 >            // check if we need to send it regardless
73 >            if(packetType.equals("data") || packetType.equals("heartbeat")) {
74 >                String host = packet.getParam("packet.attributes.machine_name");
75 >                
76 >                // look in the hostMap to see if anyone wants this data
77 >                if(_hostMap.containsKey(host)) {
78 >                    LinkedList list = (LinkedList) _hostMap.get(host);
79 >                    Iterator i = list.iterator();
80 >                    // push the data to the listening Handler's queue
81 >                    while(i.hasNext()) {
82 >                        ((Queue) i.next()).add(xml);
83 >                    }
84 >                }
85 >                
86 >                // any handler in this list wants all packets, so send
87 >                // it on to them regardless
88 >                Iterator i = _allHostsList.iterator();
89 >                while(i.hasNext()) {
90 >                    ((Queue) i.next()).add(xml);
91 >                }
92              }
93 <            
94 <            //  look at all hosts
95 <            
96 <            Iterator j = _allHostsList.iterator();
97 <            while(j.hasNext()) {
98 <                ((Queue) j.next()).add(xml);
93 >            else {
94 >                // send to all hosts because it's a special packet
95 >                Iterator i = _allHostsList.iterator();
96 >                while(i.hasNext()) {
97 >                    ((Queue) i.next()).add(xml);
98 >                }
99              }
100          }
101      }
102      
103 <    // MUST DEAL WITH hostList="" implying "all hosts"
104 <    
103 >    /**
104 >     * Register a DataHandler in the system. This method
105 >     * actually takes a reference to a Queue, which should be
106 >     * a Queue that the DataHandler is making use of.
107 >     * It also takes a hostList, this being a semi-colon
108 >     * seperated list of hosts that the Client the DataHandler
109 >     * is serving has requested. If this list is simply an empty
110 >     * String, it is assumed the Client wants to listen to all
111 >     * host information.
112 >     *
113 >     * @param dhQueue a Queue being used by the DataHandler that is registering
114 >     * @param hostList a semi-colon seperated list of hosts
115 >     */
116      public void register(Queue dhQueue, String hostList) {
117 +        // check to see if we want all hosts
118          if(hostList.equals("")) {
119              _allHostsList.add(dhQueue);
120              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
121          }
122          else {
123 +            // go through the list of hosts
124              StringTokenizer st = new StringTokenizer(hostList, ";");
125              while(st.hasMoreTokens()) {
126                  String host = st.nextToken();
127 +                // see if we already have a list in the map for this host
128                  if(_hostMap.containsKey(host)) {
129 +                    // we do, so add to it
130                      LinkedList list = (LinkedList) _hostMap.get(host);
131                      list.add(dhQueue);
132                  }
133                  else {
134 <                    LinkedList list = new LinkedList();
134 >                    // we don't, so create a list and put it in the map
135 >                    List list = Collections.synchronizedList(new LinkedList());
136                      list.add(dhQueue);
137                      _hostMap.put(host, list);
138                  }
# Line 97 | Line 141 | class PacketSorter extends Thread {
141          }
142      }
143      
144 +    /**
145 +     * Deregister a DataHandler. The DataHandler should give a reference
146 +     * to the Queue it's using, and the *same* hostList it gave when it
147 +     * register. It is imperative that the hostList is the same, otherwise
148 +     * there will be all sorts of problems with lists getting out of sync.
149 +     *
150 +     * NB: Possible future addition would be recording of hostList's.
151 +     *
152 +     * @param dhQueue a Queue being used by the DataHandler that is deregistering
153 +     * @param hostList a semi-colon seperated list of hosts
154 +     */
155      public void deregister(Queue dhQueue, String hostList) {
156 +        // go through the list of hosts
157          if(hostList.equals("")) {
158              _allHostsList.remove(dhQueue);
159              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
# Line 106 | Line 162 | class PacketSorter extends Thread {
162              StringTokenizer st = new StringTokenizer(hostList, ";");
163              while(st.hasMoreTokens()) {
164                  String host = st.nextToken();
165 +                // this should in reality always be true, but best check
166                  if(_hostMap.containsKey(host)) {
167 +                    // get the list and remove the host in question
168                      LinkedList list = (LinkedList) _hostMap.get(host);
169                      list.remove(dhQueue);
170 +                    // if the list is now empty, we might as well remove it
171                      if(list.size()==0) {
172                          _hostMap.remove(host);
173                      }
# Line 137 | Line 196 | class PacketSorter extends Thread {
196   //---PRIVATE METHODS---
197  
198   //---ACCESSOR/MUTATOR METHODS---
199 <
199 >    
200 >    /**
201 >     * Accessor to return a reference to the Queue object. This
202 >     * is needed so the ClientInterfaceServant can get add data
203 >     * easily.
204 >     *
205 >     * @return a reference to our Queue object.
206 >     */
207      public Queue getQueue() {
208          return _queue;
209      }
# Line 161 | Line 227 | class PacketSorter extends Thread {
227       */
228      private Logger _logger = ReferenceManager.getInstance().getLogger();
229      
230 +    /**
231 +     * A reference to the Queue we're using.
232 +     */
233      private Queue _queue;
234      
235 <    private HashMap _hostMap;
236 <    private LinkedList _allHostsList;
235 >    /**
236 >     * A HashMap to store lists of Queue's (in the DataHandlers)
237 >     * in a way that can be easily accessed when data comes in.
238 >     */
239 >    private Map _hostMap;
240 >    
241 >    /**
242 >     * A list specifically for a Queue's associated with DataHandlers
243 >     * that want all host information.
244 >     */
245 >    private List _allHostsList;
246      
247   //---STATIC ATTRIBUTES---
248  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines