ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/pjob/pjob.c
Revision: 1.6
Committed: Mon Jan 30 21:46:59 2006 UTC (18 years, 9 months ago) by pajs
Content type: text/plain
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +11 -0 lines
Log Message:
Implemented the use of hard/soft limits. Need to start autoconfing around
stuff like that

File Contents

# User Rev Content
1 pajs 1.1 /*
2     * Peter Saunders pjob.c
3     * Copyright (C) 2000-2005 Peter Saunders
4     *
5     * This program is free software; you can redistribute it and/or
6     * modify it under the terms of the GNU General Public License
7     * as published by the Free Software Foundation; either version 2
8     * of the License, or (at your option) any later version.
9     *
10     * This program is distributed in the hope that it will be useful,
11     * but WITHOUT ANY WARRANTY; without even the implied warranty of
12     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13     * GNU General Public License for more details.
14     *
15     * You should have received a copy of the GNU General Public License
16     * along with this program; if not, write to the Free Software
17     * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18     *
19     */
20    
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/resource.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <glib.h>
#include <glib/gprintf.h>
31 pajs 1.1
/* Initial value of `numthreads` (the --jobs option) */
#define DEFNUMTHREAD 10
/* Initial value of `timeout` (the --timeout option); presumably seconds - TODO confirm */
#define DEFTIMEOUT 60
34    
35     /* Structure for the process to be passed to the executing thread */
36     struct _process_t{
37     gchar *exec;
38     gchar *jobname;
39     GTimer *timer;
40     GPid pid;
41 pajs 1.4 gint stat_loc;
42 pajs 1.1 gchar *file_stdout;
43     gchar *file_stderr;
44    
45     GError *err;
46     };
47    
48     typedef struct _process_t process_t;
49    
50     /* Globals for config setup */
51     static gint numthreads = DEFNUMTHREAD;
52     static gboolean verbose = FALSE;
53     static gboolean quiet = FALSE;
54     static gint timeout = DEFTIMEOUT;
55     static gchar *parseformat = NULL;
56     static gchar *jobparsename = NULL;
57     static gchar *command = NULL;
58     static gchar *outdir = NULL;
59     static gchar *infile = NULL;
60     static gchar *arglist = NULL;
61    
62     /* Command line options */
63     static GOptionEntry options[] =
64     {
65     { "jobs", 'j', 0, G_OPTION_ARG_INT, &numthreads, "Number of jobs to run in parallel [DEFNUMTHREAD]", NULL },
66     { "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL },
67     { "quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet, "Do not print the output of the commands as they are running", NULL },
68     { "timeout", 't', 0, G_OPTION_ARG_INT, &timeout, "Timeout before process is killed [DEFTIMEOUT]", NULL },
69     { "format", 'f', 0, G_OPTION_ARG_STRING, &parseformat, "The order of substitiution to be used in the command", NULL },
70     { "jobname", 'n', 0, G_OPTION_ARG_STRING, &jobparsename, "If format is used, which variable to use for a output dir", NULL },
71     { "command", 'c', 0, G_OPTION_ARG_STRING, &command, "The command to be executed", NULL },
72     { "output", 'o', 0, G_OPTION_ARG_STRING, &outdir, "Directory to put all output into", NULL },
73     { "stdin", 'i', 0, G_OPTION_ARG_FILENAME, &infile, "Pass contents of filename into stdin of the executing process", NULL },
74     { "argsfile", 'a', 0, G_OPTION_ARG_FILENAME, &arglist, "File for list of argumenst if you dont want to use the command line", NULL },
75     { NULL }
76     };
77    
78     /* Linked list of process jobs */
79     GList *proclist = NULL;
80    
81     /* Take a process_t and execute it, and doing the "right thing" with the output */
82     void process_child(gpointer data, gpointer user_data){
83     process_t *proc = (process_t*) data;
84     GIOChannel *soutfile[2], *sout[2], *sinfile, *sin;
85     gchar *execargv[4];
86     int outpipes[2], inpipes[2];
87     GError *err = NULL;
88    
89     struct pollfd fds[3];
90     gint fdssize=2;
91    
92 pajs 1.3 proc->timer = g_timer_new();
93     g_timer_start(proc->timer);
94    
95 pajs 1.1 /* Setup files in output dir if requested to do so */
96     if(outdir != NULL){
97     proc->file_stdout = g_strdup_printf("%s/%s-STDOUT", outdir, proc->jobname);
98     soutfile[0] = g_io_channel_new_file(proc->file_stdout, "w", &err);
99    
100     if(soutfile[0] == NULL){
101     g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stdout, err->message);
102     return;
103     }
104    
105     proc->file_stderr = g_strdup_printf("%s/%s-STDERR", outdir, proc->jobname);
106     soutfile[1] = g_io_channel_new_file(proc->file_stderr, "w", &err);
107    
108     if(soutfile[1] == NULL){
109     g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stderr, err->message);
110     return;
111     }
112    
113     }
114    
115    
116     /* Open stdin file to pass to the process */
117     if(infile != NULL){
118     sinfile = g_io_channel_new_file(infile, "r", NULL);
119     pipe(inpipes);
120     }
121    
122     /* Setup argv structure for job */
123     if (verbose) g_fprintf(stderr, "Starting job '%s'\n", proc->jobname);
124     execargv[0] = "/bin/sh";
125     execargv[1] = "-c";
126     execargv[2] = proc->exec;
127     execargv[3] = NULL;
128    
129     /* Exec the job */
130     if (infile == NULL){
131 pajs 1.4 if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){
132 pajs 1.1 g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
133     return;
134     }
135     }else{
136 pajs 1.4 if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){
137 pajs 1.1 g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
138     return;
139     }
140     close(inpipes[0]);
141     }
142    
143    
144     /* Make a stream out of the pipes for ease of reading from them */
145     sout[0] = g_io_channel_unix_new(outpipes[0]);
146     sout[1] = g_io_channel_unix_new(outpipes[1]);
147     if(infile != NULL){
148     sin = g_io_channel_unix_new(inpipes[1]);
149     fds[2].fd = inpipes[1];
150     fds[2].events = POLLOUT | POLLHUP;
151     fdssize = 3;
152     }
153    
154    
155     /* Setup the poll events */
156     fds[0].fd = outpipes[0];
157     fds[1].fd = outpipes[1];
158     fds[0].events = fds[1].events = POLLIN | POLLPRI | POLLHUP;
159     fds[0].revents = fds[1].revents = fds[2].revents = 0;
160    
161    
162     for(;;){
163     gint x;
164     gchar *readbuf;
165     gint rdatasize, wdatasize;
166     gboolean readdata = FALSE;
167     fds[0].revents = fds[1].revents = fds[2].revents = 0;
168    
169     poll(fds, fdssize, -1);
170     /* For stdout and stderr see if there is any data, and read it */
171     for(x=0; x<2; x++){
172 pajs 1.5 if((fds[x].revents & POLLIN) != 0){
173 pajs 1.1 /* We have data to read */
174     g_io_channel_read_line(sout[x], &readbuf, &rdatasize, NULL, NULL);
175     if(rdatasize > 0){
176     /* Print it if unless told not to */
177     if(!quiet){
178     g_printf("[%s] [%s] %s", proc->jobname, (x==0) ? "out" : "err", readbuf);
179     }
180     if(outdir != NULL){
181     g_io_channel_write_chars(soutfile[x], readbuf, rdatasize, &wdatasize, NULL);
182     }
183     readdata = TRUE;
184     free(readbuf);
185     }
186    
187     }
188     }
189     /* See if we need to pump more data down stdin */
190 pajs 1.3 if((fds[2].revents & POLLOUT) != 0){
191 pajs 1.1 /* We have data we can write */
192     gchar *nextline;
193     gint nextlinesize;
194     gint nextlinewritesize;
195     GIOStatus s;
196    
197     /* Get the next line, and write it down the stream */
198     s = g_io_channel_read_line(sinfile, &nextline, &nextlinesize, NULL, NULL);
199     if (nextlinesize > 0){
200     g_io_channel_write_chars(sin, nextline, nextlinesize, &nextlinewritesize, NULL);
201     }
202     if (s == G_IO_STATUS_EOF){
203     g_io_channel_shutdown(sin, TRUE, NULL);
204 pajs 1.2 sin = NULL;
205 pajs 1.1 fdssize=2;
206     }
207    
208     }
209     /* Even if we did get a hangup - lets make sure there is no more data to read first by looping again */
210     if (readdata) continue;
211    
212 pajs 1.3 if(((fds[0].revents & POLLHUP) != 0) && ((fds[1].revents & POLLHUP) != 0)) break;
213 pajs 1.1 }
214 pajs 1.3
215 pajs 1.4 while((waitpid(proc->pid, &(proc->stat_loc), 0)) != proc->pid);
216    
217 pajs 1.3 g_timer_stop(proc->timer);
218 pajs 1.1
219 pajs 1.4 /* If process exited cleanly */
220     if (WIFEXITED(proc->stat_loc)){
221     /* Get the exit code */
222     if (verbose) g_fprintf(stderr, "Job '%s' exited with code %d. Exec time: %.2f\n", proc->jobname, WEXITSTATUS(proc->stat_loc), g_timer_elapsed(proc->timer, 0));
223     }else{
224     /* Otherwise - find out what it died with */
225     /* TODO - this doesn't work quite right.. Mainly because its looking at the shell process, so the
226     * child of the /bin/sh which does get a signal, this isn't passed up. Although, it handly tells
227     * us if the /bin/sh gets a SEGV etc ;)
228     */
229     g_fprintf(stderr, "Job %s exited with signal %d. Exec time: %.2f\n", proc->jobname, (WTERMSIG(proc->stat_loc)), g_timer_elapsed(proc->timer, 0));
230     }
231    
232    
233 pajs 1.1 g_io_channel_shutdown(sout[0], TRUE, NULL);
234     g_io_channel_shutdown(sout[1], TRUE, NULL);
235    
236     if((infile != NULL) && (sin != NULL)){
237     g_io_channel_shutdown(sin, TRUE, NULL);
238     }
239     if (outdir != NULL){
240     g_io_channel_shutdown(soutfile[0], TRUE, NULL);
241     g_io_channel_shutdown(soutfile[1], TRUE, NULL);
242     }
243    
244     g_spawn_close_pid(proc->pid);
245    
246    
247     }
248    
249     /* Takes a string str, a search string, find, and string to
250     * replace all occurs of find with, replace. Returns a new
251     * leaving original intact.
252     */
253     gchar *strrep(gchar *str, gchar *find, gchar *replace){
254     gchar *ptr, *oldptr;
255     GString *newstr = g_string_new("");
256     gssize len = strlen(str);
257     gint findlen = strlen(find);
258    
259     ptr = g_strstr_len(str, len, find);
260     oldptr=str;
261     while(ptr != NULL){
262     /* Copy in data up to this point */
263     g_string_append_len (newstr, oldptr, (ptr - oldptr));
264     /* Put in the replacement string */
265     g_string_append(newstr, replace);
266    
267     oldptr = ptr + findlen;
268     /* BUG - len will now be wrong. But, i only wanted a strstr anyway :) */
269     ptr = g_strstr_len(oldptr, len, find);
270     }
271    
272     /* Copy remains */
273     g_string_append_len (newstr, oldptr, (ptr - oldptr));
274    
275     ptr = g_string_free(newstr, FALSE);
276    
277     return ptr;
278     }
279    
280    
281     /* Takes a cmd before substitution, takes the characters to be substituted
282     * and a line for doign the substitution with. Fills in jobname
283     */
284     gchar *genexeccmd(gchar *cmd, gchar *fmt, gchar *line, gchar **jobname){
285     gchar *newexec, *ptr;
286     int x;
287     gchar *linesep = " ";
288     gchar *fmtsep = " ";
289    
290     gchar **line_array;
291     gchar **fmt_array;
292    
293     if ( fmt == NULL ){
294     /* No format given - we'll just append the options to the end of the command */
295     if(jobname != NULL) *jobname = g_strdup(line);
296     return g_strdup_printf("%s %s", cmd, line);
297     }
298    
299     line_array = g_strsplit(line, linesep, 0);
300     fmt_array = g_strsplit(fmt, fmtsep, 0);
301    
302     if(jobparsename != NULL){
303     if(jobname != NULL){
304     for(x=0; fmt_array[x] != NULL; x++){
305     if (line_array[x] == NULL) break;
306     if((strcmp(fmt_array[x], jobparsename) == 0)){
307     *jobname = g_strdup(line_array[x]);
308     break;
309     }
310     }
311     }
312     }else{
313     /* Not told us what they want.. We'll just use the first one */
314     *jobname = g_strdup(line_array[0]);
315     }
316    
317     newexec = g_strdup(cmd);
318     for(x=0; line_array[x] != NULL; x++){
319     if (fmt_array[x] == NULL) break;
320     ptr = newexec;
321     newexec = strrep(newexec, fmt_array[x], line_array[x]);
322     free(ptr);
323     }
324    
325    
326     return newexec;
327     }
328    
329     int main(int argc, char **argv){
330    
331     GThreadPool *procpool;
332     GError *pp_err = NULL, *err = NULL;
333     gint x;
334    
335 pajs 1.6 struct rlimit rlp;
336    
337 pajs 1.1 GOptionContext *optcontext;
338    
339     optcontext = g_option_context_new(" - parallel job executer");
340     g_option_context_add_main_entries(optcontext, options, NULL);
341     g_option_context_parse (optcontext, &argc, &argv, &err);
342    
343     if(command == NULL){
344     g_printerr("Command required, see --help for more flags\n");
345     exit(1);
346     }
347    
348     if(verbose){
349     g_printerr("Command '%s'\n", command);
350     g_printerr("Timeout '%d'\n", timeout);
351     g_printerr("Jobs '%d'\n", numthreads);
352     }
353    
354    
355     if(argc < 2 && arglist == NULL){
356     /* We have no arguments */
357     g_printerr("Missing arguments, see --help for details\n");
358     exit(1);
359     }
360    
361     if (!g_thread_supported ()){
362     g_thread_init (NULL);
363     }else{
364     g_printerr("Threading not supported\n");
365     }
366 pajs 1.6
367     /* Up the number of FD's to the "hard" limit.
368     * This is mainly to get around the very small default
369     * solaris has, or 256
370     */
371     getrlimit(RLIMIT_NOFILE, &rlp);
372     rlp.rlim_cur = rlp.rlim_max;
373     setrlimit(RLIMIT_NOFILE, &rlp);
374 pajs 1.1
375     if(verbose) g_printerr("Creating a threadpool %d in size\n", numthreads);
376     procpool = g_thread_pool_new(process_child, NULL, numthreads, FALSE, &pp_err);
377    
378     /* Generate the commands and push the job onto the thread pool */
379     /* If no substituion is needed */
380     if (arglist != NULL){
381     GIOChannel *f;
382     gchar *line;
383     GIOStatus status;
384    
385     f = g_io_channel_new_file(arglist, "r", &err);
386     if (f == NULL){
387     g_printerr("Failed to open argfile: %s\n", err->message);
388     exit(1);
389     }
390     status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
391     while(status==G_IO_STATUS_NORMAL){
392     process_t *newproc = g_new(process_t, 1);
393     newproc->err = NULL;
394    
395     line = g_strstrip(line);
396    
397     newproc->exec = genexeccmd(command, parseformat, line, &(newproc->jobname));
398     proclist = g_list_append(proclist, (gpointer) newproc);
399    
400     if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
401     g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
402    
403     status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
404     }
405     g_io_channel_close(f);
406    
407     }else{
408     /* substition is needed */
409     for(x=1; x<argc; x++){
410     process_t *newproc = g_new(process_t, 1);
411     newproc->err = NULL;
412    
413     newproc->exec = genexeccmd(command, parseformat, argv[x], &(newproc->jobname));
414    
415     proclist = g_list_append(proclist, (gpointer) newproc);
416    
417     if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
418     g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
419     }
420     }
421    
422    
423     /* Wait for the jobs to finish */
424     /* TODO - Kill jobs that don't finish in time */
425     while(g_thread_pool_get_num_threads(procpool) > 0){
426 pajs 1.2 g_usleep(1000);
427 pajs 1.1 }
428    
429     return 0;
430     }