ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java
(Generate patch)

Comparing projects/cms/source/server/uk/org/iscream/cms/server/clientinterface/PacketSorter.java (file contents):
Revision 1.5 by tdb, Mon Feb 12 02:23:52 2001 UTC vs.
Revision 1.14 by tdb, Tue Mar 13 02:19:44 2001 UTC

# Line 33 | Line 33 | class PacketSorter extends Thread {
33  
34      /**
35       * Creates a new PacketSorter.
36 +     *
37 +     * @param queueMonitorInterval The interval at which to monitor the Queue
38       */
39 <    public PacketSorter() {
39 >    public PacketSorter(int queueMonitorInterval) {
40 >        // set the Thread name
41 >        setName("clientinterface.PacketSorter");
42 >        
43          _queue = new Queue();
44          // startup a monitor on this queue, every minute
45 <        _queue.startMonitor(60*1000, _name);
45 >        String queueName = _name + " PacketSorterQueue";
46 >        _queue.startMonitor(queueMonitorInterval*1000, queueName);
47          _hostMap = new HashMap();
48 +        _allHostDataList = new LinkedList();
49          _allHostsList = new LinkedList();
50          _logger.write(toString(), Logger.SYSINIT, "created");
51      }
# Line 50 | Line 57 | class PacketSorter extends Thread {
57       * loop forever processing and sending data.
58       */
59      public void run() {
60 +        XMLPacketMaker xmlPacketMaker = new XMLPacketMaker();
61          int qID = _queue.getQueue();
62          while(true) {
63              // attempt to get some data from the Queue
# Line 61 | Line 69 | class PacketSorter extends Thread {
69                  _logger.write(toString(), Logger.ERROR, "Queue failure: "+e);
70              }
71              
72 <            // find out which host this packet is actually for
65 <            XMLPacketMaker xmlPacketMaker = new XMLPacketMaker(xml);
66 <            XMLPacket packet = xmlPacketMaker.createXMLPacket();
67 <            String host = packet.getParam("packet.attributes.machine_name");
72 >            XMLPacket packet = null;
73              
74 <            // look in the hostMap to see if anyone wants this data
75 <            if(_hostMap.containsKey(host)) {
76 <                LinkedList list = (LinkedList) _hostMap.get(host);
77 <                Iterator i = list.iterator();
78 <                // push the data to the listening Handler's queue
79 <                while(i.hasNext()) {
75 <                    ((Queue) i.next()).add(xml);
76 <                }
74 >            try {
75 >                packet = xmlPacketMaker.createXMLPacket(xml);
76 >            } catch(InvalidXMLException e) {
77 >                _logger.write(toString(), Logger.ERROR, "Invalid XML: "+e);
78 >                // skip the rest of this loop iteration
79 >                continue;
80              }
81              
82 <            // any handler in this list wants all packets, so send
83 <            // it on to them regardless
84 <            Iterator j = _allHostsList.iterator();
85 <            while(j.hasNext()) {
86 <                ((Queue) j.next()).add(xml);
82 >            String packetType = packet.getParam("packet.attributes.type");
83 >            // check if we need to send it regardless
84 >            if(packetType.equals("data") || packetType.equals("heartbeat")) {
85 >                String host = packet.getParam("packet.attributes.machine_name");
86 >                
87 >                // look in the hostMap to see if anyone wants this data
88 >                synchronized(this) {
89 >                    if(_hostMap.containsKey(host)) {
90 >                        LinkedList list = (LinkedList) _hostMap.get(host);
91 >                        Iterator i = list.iterator();
92 >                        // push the data to the listening Handler's queue
93 >                        while(i.hasNext()) {
94 >                            ((Queue) i.next()).add(xml);
95 >                        }
96 >                    }
97 >                }
98 >                
99 >                // any handler in this list wants all packets, so send
100 >                // it on to them regardless
101 >                synchronized(this) {
102 >                    Iterator i = _allHostDataList.iterator();
103 >                    while(i.hasNext()) {
104 >                        ((Queue) i.next()).add(xml);
105 >                    }
106 >                }
107              }
108 +            else {
109 +                // always send this packet to all hosts, because it's
110 +                // "extra" data, not host data
111 +                synchronized(this) {
112 +                    Iterator i = _allHostsList.iterator();
113 +                    while(i.hasNext()) {
114 +                        ((Queue) i.next()).add(xml);
115 +                    }
116 +                }
117 +            }
118          }
119      }
120      
# Line 101 | Line 134 | class PacketSorter extends Thread {
134      public void register(Queue dhQueue, String hostList) {
135          // check to see if we want all hosts
136          if(hostList.equals("")) {
137 <            _allHostsList.add(dhQueue);
137 >            synchronized(this) {
138 >                _allHostDataList.add(dhQueue);
139 >            }
140              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for all hosts");
141          }
142          else {
# Line 109 | Line 144 | class PacketSorter extends Thread {
144              StringTokenizer st = new StringTokenizer(hostList, ";");
145              while(st.hasMoreTokens()) {
146                  String host = st.nextToken();
147 <                // see if we already have a list in the map for this host
148 <                if(_hostMap.containsKey(host)) {
149 <                    // we do, so add to it
150 <                    LinkedList list = (LinkedList) _hostMap.get(host);
151 <                    list.add(dhQueue);
147 >                synchronized(this) {
148 >                    // see if we already have a list in the map for this host
149 >                    if(_hostMap.containsKey(host)) {
150 >                        // we do, so add to it
151 >                        List list = (List) _hostMap.get(host);
152 >                        list.add(dhQueue);
153 >                    }
154 >                    else {
155 >                        // we don't, so create a list and put it in the map
156 >                        LinkedList list = new LinkedList();
157 >                        list.add(dhQueue);
158 >                        _hostMap.put(host, list);
159 >                    }
160                  }
118                else {
119                    // we don't, so create a list and put it in the map
120                    LinkedList list = new LinkedList();
121                    list.add(dhQueue);
122                    _hostMap.put(host, list);
123                }
161              }
162              _logger.write(toString(), Logger.SYSMSG, "registered DataHandler for hosts: "+hostList);
163          }
164 +        // always add host to our complete host list
165 +        synchronized(this) {
166 +            _allHostsList.add(dhQueue);
167 +        }
168      }
169      
170      /**
# Line 140 | Line 181 | class PacketSorter extends Thread {
181      public void deregister(Queue dhQueue, String hostList) {
182          // go through the list of hosts
183          if(hostList.equals("")) {
184 <            _allHostsList.remove(dhQueue);
184 >            synchronized(this) {
185 >                _allHostDataList.remove(dhQueue);
186 >            }
187              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for all hosts");
188          }
189          else {
190              StringTokenizer st = new StringTokenizer(hostList, ";");
191              while(st.hasMoreTokens()) {
192                  String host = st.nextToken();
193 <                // this should in reality always be true, but best check
194 <                if(_hostMap.containsKey(host)) {
195 <                    // get the list and remove the host in question
196 <                    LinkedList list = (LinkedList) _hostMap.get(host);
197 <                    list.remove(dhQueue);
198 <                    // if the list is now empty, we might as well remove it
199 <                    if(list.size()==0) {
200 <                        _hostMap.remove(host);
193 >                synchronized(this) {
194 >                    // this should in reality always be true, but best check
195 >                    if(_hostMap.containsKey(host)) {
196 >                        // get the list and remove the host in question
197 >                        LinkedList list = (LinkedList) _hostMap.get(host);
198 >                        list.remove(dhQueue);
199 >                        // if the list is now empty, we might as well remove it
200 >                        if(list.size()==0) {
201 >                            _hostMap.remove(host);
202 >                        }
203                      }
204                  }
205              }
206              _logger.write(toString(), Logger.SYSMSG, "deregistered DataHandler for hosts: "+hostList);
207          }
208 +        // always remove host from our complete host list
209 +        synchronized(this) {
210 +            _allHostsList.remove(dhQueue);
211 +        }
212      }
213      
214      /**
# Line 226 | Line 275 | class PacketSorter extends Thread {
275      /**
276       * A list specifically for a Queue's associated with DataHandlers
277       * that want all host information.
278 +     */
279 +    private LinkedList _allHostDataList;
280 +    
281 +    /**
282 +     * A list of all hosts.
283       */
284      private LinkedList _allHostsList;
285      

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines