ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/pjob/pjob.c
(Generate patch)

Comparing projects/pjob/pjob.c (file contents):
Revision 1.1 by pajs, Mon Jan 23 11:16:32 2006 UTC vs.
Revision 1.6 by pajs, Mon Jan 30 21:46:59 2006 UTC

# Line 19 | Line 19
19   */
20  
21   #include <stdio.h>
22 < #include <glib.h>
22 > #include <stdlib.h>
23   #include <unistd.h>
24 + #include <string.h>
25 + #include <glib.h>
26 + #include <glib/gprintf.h>
27   #include <poll.h>
28 + #include <sys/types.h>
29 + #include <sys/wait.h>
30 + #include <sys/resource.h>
31  
32   #define DEFNUMTHREAD 10
33   #define DEFTIMEOUT 60
# Line 32 | Line 38 | struct _process_t{
38          gchar *jobname;
39          GTimer *timer;
40          GPid pid;
41 +        gint stat_loc;
42          gchar *file_stdout;
43          gchar *file_stderr;
44  
# Line 51 | Line 58 | static gchar *command = NULL;
58   static gchar *outdir = NULL;
59   static gchar *infile = NULL;
60   static gchar *arglist = NULL;
54 static gchar *stdindata = NULL;
61  
62   /* Command line options */
63   static GOptionEntry options[] =
# Line 83 | Line 89 | void process_child(gpointer data, gpointer user_data){
89          struct pollfd fds[3];
90          gint fdssize=2;
91  
92 <        /*if((pipe(outpipes)) != 0){
93 <                g_printerr("Failed to create pipe\n");
88 <                return ;
89 <        }*/
92 >        proc->timer = g_timer_new();
93 >        g_timer_start(proc->timer);
94  
95          /* Setup files in output dir if requested to do so */
96          if(outdir != NULL){
# Line 124 | Line 128 | void process_child(gpointer data, gpointer user_data){
128  
129          /* Exec the job */
130          if (infile == NULL){
131 <                if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, 0, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){
131 >                if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){
132                          g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
133                          return;
134                  }
135          }else{
136 <                if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, 0, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){
136 >                if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){
137                          g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
138                          return;
139                  }
# Line 165 | Line 169 | void process_child(gpointer data, gpointer user_data){
169                  poll(fds, fdssize, -1);
170                  /* For stdout and stderr see if there is any data, and read it */
171                  for(x=0; x<2; x++){
172 <                        if(fds[x].revents|POLLIN == fds[x].revents){
172 >                        if((fds[x].revents & POLLIN) != 0){
173                                  /* We have data to read */
174                                  g_io_channel_read_line(sout[x], &readbuf, &rdatasize, NULL, NULL);
175                                  if(rdatasize > 0){
# Line 183 | Line 187 | void process_child(gpointer data, gpointer user_data){
187                          }
188                  }
189                  /* See if we need to pump more data down stdin */
190 <                if(fds[2].revents|POLLOUT == fds[2].revents){
190 >                if((fds[2].revents & POLLOUT) != 0){
191                          /* We have data we can write */
192                          gchar *nextline;
193                          gint nextlinesize;
# Line 193 | Line 197 | void process_child(gpointer data, gpointer user_data){
197                          /* Get the next line, and write it down the stream */
198                          s = g_io_channel_read_line(sinfile, &nextline, &nextlinesize, NULL, NULL);
199                          if (nextlinesize > 0){
196                                printf("Going to write '%s'\n", nextline);
200                                  g_io_channel_write_chars(sin, nextline, nextlinesize, &nextlinewritesize, NULL);
201                          }
202                          if (s == G_IO_STATUS_EOF){
203                                  g_io_channel_shutdown(sin, TRUE, NULL);
204 <                                sin == NULL;
204 >                                sin = NULL;
205                                  fdssize=2;
206                          }
207  
# Line 206 | Line 209 | void process_child(gpointer data, gpointer user_data){
209                  /* Even if we did get a hangup - lets make sure there is no more data to read first by looping again */
210                  if (readdata) continue;
211  
212 <                if((fds[0].revents|POLLHUP == fds[0].revents) && (fds[1].revents|POLLHUP == fds[1].revents)) break;
212 >                if(((fds[0].revents & POLLHUP) != 0) && ((fds[1].revents & POLLHUP) != 0)) break;
213          }      
214  
215 <        /* For some batty reason, it starts with a ref count of 1. Lets decrement it so we can shut it */
213 <        /*g_io_channel_unref(sout[0]);
214 <        g_io_channel_unref(sout[1]);*/
215 >        while((waitpid(proc->pid, &(proc->stat_loc), 0)) != proc->pid);
216  
217 +        g_timer_stop(proc->timer);
218  
219 +        /* If process exited cleanly */
220 +        if (WIFEXITED(proc->stat_loc)){
221 +                /* Get the exit code */
222 +                if (verbose) g_fprintf(stderr, "Job '%s' exited with code %d. Exec time: %.2f\n", proc->jobname, WEXITSTATUS(proc->stat_loc), g_timer_elapsed(proc->timer, 0));
223 +        }else{
224 +                /* Otherwise - find out what it died with */
225 +                /* TODO - this doesn't work quite right.. Mainly because its looking at the shell process, so the
226 +                 * child of the /bin/sh which does get a signal, this isn't passed up. Although, it handly tells
227 +                 * us if the /bin/sh gets a SEGV etc ;)
228 +                 */
229 +                g_fprintf(stderr, "Job %s exited with signal %d. Exec time: %.2f\n", proc->jobname, (WTERMSIG(proc->stat_loc)), g_timer_elapsed(proc->timer, 0));
230 +        }
231 +
232 +
233          g_io_channel_shutdown(sout[0], TRUE, NULL);
234          g_io_channel_shutdown(sout[1], TRUE, NULL);
219        /*close(outpipes[0]);
220        close(outpipes[1]);*/
235  
222        /* BUG - Causes glib error saying its already closed */
236          if((infile != NULL) && (sin != NULL)){
237                  g_io_channel_shutdown(sin, TRUE, NULL);
238          }
# Line 230 | Line 243 | void process_child(gpointer data, gpointer user_data){
243  
244          g_spawn_close_pid(proc->pid);
245  
233        if (verbose) g_fprintf(stderr, "Ending job '%s'\n", proc->jobname);
246  
247   }
248  
# Line 270 | Line 282 | gchar *strrep(gchar *str, gchar *find, gchar *replace)
282   * and a line for doign the substitution with. Fills in jobname
283   */
284   gchar *genexeccmd(gchar *cmd, gchar *fmt, gchar *line, gchar **jobname){
273        gchar **fmttok;
285          gchar *newexec, *ptr;
286          int x;
287          gchar *linesep = " ";
# Line 321 | Line 332 | int main(int argc, char **argv){
332          GError *pp_err = NULL, *err = NULL;
333          gint x;
334  
335 +        struct rlimit rlp;
336 +
337          GOptionContext *optcontext;
338  
339          optcontext = g_option_context_new(" - parallel job executer");
# Line 350 | Line 363 | int main(int argc, char **argv){
363          }else{
364                  g_printerr("Threading not supported\n");
365          }
366 +
367 +        /* Up the number of FD's to the "hard" limit.
368 +         * This is mainly to get around the very small default
369 +         * solaris has, or 256
370 +         */
371 +        getrlimit(RLIMIT_NOFILE, &rlp);
372 +        rlp.rlim_cur = rlp.rlim_max;
373 +        setrlimit(RLIMIT_NOFILE, &rlp);
374          
375          if(verbose) g_printerr("Creating a threadpool %d in size\n", numthreads);
376          procpool = g_thread_pool_new(process_child, NULL, numthreads, FALSE, &pp_err);
# Line 402 | Line 423 | int main(int argc, char **argv){
423          /* Wait for the jobs to finish */
424          /* TODO - Kill jobs that don't finish in time */
425          while(g_thread_pool_get_num_threads(procpool) > 0){
426 <                sleep(1);
426 >                g_usleep(1000);
427          }
428  
429          return 0;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines