ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/pjob/pjob.c
Revision: 1.2
Committed: Mon Jan 23 22:12:39 2006 UTC (18 years, 10 months ago) by pajs
Content type: text/plain
Branch: MAIN
Changes since 1.1: +9 -22 lines
Log Message:
-Wall fixes - including 1 or 2 bugs :) - Thanks to Tim for this one.

File Contents

# User Rev Content
1 pajs 1.1 /*
2     * Peter Saunders pjob.c
3     * Copyright (C) 2000-2005 Peter Saunders
4     *
5     * This program is free software; you can redistribute it and/or
6     * modify it under the terms of the GNU General Public License
7     * as published by the Free Software Foundation; either version 2
8     * of the License, or (at your option) any later version.
9     *
10     * This program is distributed in the hope that it will be useful,
11     * but WITHOUT ANY WARRANTY; without even the implied warranty of
12     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13     * GNU General Public License for more details.
14     *
15     * You should have received a copy of the GNU General Public License
16     * along with this program; if not, write to the Free Software
17     * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18     *
19     */
20    
21     #include <stdio.h>
22 pajs 1.2 #include <stdlib.h>
23     #include <unistd.h>
24     #include <string.h>
25 pajs 1.1 #include <glib.h>
26 pajs 1.2 #include <glib/gprintf.h>
27 pajs 1.1 #include <poll.h>
28    
29     #define DEFNUMTHREAD 10
30     #define DEFTIMEOUT 60
31    
32     /* Structure for the process to be passed to the executing thread */
33     struct _process_t{
34     gchar *exec;
35     gchar *jobname;
36     GTimer *timer;
37     GPid pid;
38     gchar *file_stdout;
39     gchar *file_stderr;
40    
41     GError *err;
42     };
43    
44     typedef struct _process_t process_t;
45    
46     /* Globals for config setup */
47     static gint numthreads = DEFNUMTHREAD;
48     static gboolean verbose = FALSE;
49     static gboolean quiet = FALSE;
50     static gint timeout = DEFTIMEOUT;
51     static gchar *parseformat = NULL;
52     static gchar *jobparsename = NULL;
53     static gchar *command = NULL;
54     static gchar *outdir = NULL;
55     static gchar *infile = NULL;
56     static gchar *arglist = NULL;
57    
58     /* Command line options */
59     static GOptionEntry options[] =
60     {
61     { "jobs", 'j', 0, G_OPTION_ARG_INT, &numthreads, "Number of jobs to run in parallel [DEFNUMTHREAD]", NULL },
62     { "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL },
63     { "quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet, "Do not print the output of the commands as they are running", NULL },
64     { "timeout", 't', 0, G_OPTION_ARG_INT, &timeout, "Timeout before process is killed [DEFTIMEOUT]", NULL },
65     { "format", 'f', 0, G_OPTION_ARG_STRING, &parseformat, "The order of substitiution to be used in the command", NULL },
66     { "jobname", 'n', 0, G_OPTION_ARG_STRING, &jobparsename, "If format is used, which variable to use for a output dir", NULL },
67     { "command", 'c', 0, G_OPTION_ARG_STRING, &command, "The command to be executed", NULL },
68     { "output", 'o', 0, G_OPTION_ARG_STRING, &outdir, "Directory to put all output into", NULL },
69     { "stdin", 'i', 0, G_OPTION_ARG_FILENAME, &infile, "Pass contents of filename into stdin of the executing process", NULL },
70     { "argsfile", 'a', 0, G_OPTION_ARG_FILENAME, &arglist, "File for list of argumenst if you dont want to use the command line", NULL },
71     { NULL }
72     };
73    
74     /* Linked list of process jobs */
75     GList *proclist = NULL;
76    
77     /* Take a process_t and execute it, and doing the "right thing" with the output */
78     void process_child(gpointer data, gpointer user_data){
79     process_t *proc = (process_t*) data;
80     GIOChannel *soutfile[2], *sout[2], *sinfile, *sin;
81     gchar *execargv[4];
82     int outpipes[2], inpipes[2];
83     GError *err = NULL;
84    
85     struct pollfd fds[3];
86     gint fdssize=2;
87    
88     /* Setup files in output dir if requested to do so */
89     if(outdir != NULL){
90     proc->file_stdout = g_strdup_printf("%s/%s-STDOUT", outdir, proc->jobname);
91     soutfile[0] = g_io_channel_new_file(proc->file_stdout, "w", &err);
92    
93     if(soutfile[0] == NULL){
94     g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stdout, err->message);
95     return;
96     }
97    
98     proc->file_stderr = g_strdup_printf("%s/%s-STDERR", outdir, proc->jobname);
99     soutfile[1] = g_io_channel_new_file(proc->file_stderr, "w", &err);
100    
101     if(soutfile[1] == NULL){
102     g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stderr, err->message);
103     return;
104     }
105    
106     }
107    
108    
109     /* Open stdin file to pass to the process */
110     if(infile != NULL){
111     sinfile = g_io_channel_new_file(infile, "r", NULL);
112     pipe(inpipes);
113     }
114    
115     /* Setup argv structure for job */
116     if (verbose) g_fprintf(stderr, "Starting job '%s'\n", proc->jobname);
117     execargv[0] = "/bin/sh";
118     execargv[1] = "-c";
119     execargv[2] = proc->exec;
120     execargv[3] = NULL;
121    
122     /* Exec the job */
123     if (infile == NULL){
124     if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, 0, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){
125     g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
126     return;
127     }
128     }else{
129     if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, 0, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){
130     g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
131     return;
132     }
133     close(inpipes[0]);
134     }
135    
136    
137     /* Make a stream out of the pipes for ease of reading from them */
138     sout[0] = g_io_channel_unix_new(outpipes[0]);
139     sout[1] = g_io_channel_unix_new(outpipes[1]);
140     if(infile != NULL){
141     sin = g_io_channel_unix_new(inpipes[1]);
142     fds[2].fd = inpipes[1];
143     fds[2].events = POLLOUT | POLLHUP;
144     fdssize = 3;
145     }
146    
147    
148     /* Setup the poll events */
149     fds[0].fd = outpipes[0];
150     fds[1].fd = outpipes[1];
151     fds[0].events = fds[1].events = POLLIN | POLLPRI | POLLHUP;
152     fds[0].revents = fds[1].revents = fds[2].revents = 0;
153    
154    
155     for(;;){
156     gint x;
157     gchar *readbuf;
158     gint rdatasize, wdatasize;
159     gboolean readdata = FALSE;
160     fds[0].revents = fds[1].revents = fds[2].revents = 0;
161    
162     poll(fds, fdssize, -1);
163     /* For stdout and stderr see if there is any data, and read it */
164     for(x=0; x<2; x++){
165 pajs 1.2 if((fds[x].revents|POLLIN) == fds[x].revents){
166 pajs 1.1 /* We have data to read */
167     g_io_channel_read_line(sout[x], &readbuf, &rdatasize, NULL, NULL);
168     if(rdatasize > 0){
169     /* Print it if unless told not to */
170     if(!quiet){
171     g_printf("[%s] [%s] %s", proc->jobname, (x==0) ? "out" : "err", readbuf);
172     }
173     if(outdir != NULL){
174     g_io_channel_write_chars(soutfile[x], readbuf, rdatasize, &wdatasize, NULL);
175     }
176     readdata = TRUE;
177     free(readbuf);
178     }
179    
180     }
181     }
182     /* See if we need to pump more data down stdin */
183 pajs 1.2 if((fds[2].revents|POLLOUT) == fds[2].revents){
184 pajs 1.1 /* We have data we can write */
185     gchar *nextline;
186     gint nextlinesize;
187     gint nextlinewritesize;
188     GIOStatus s;
189    
190     /* Get the next line, and write it down the stream */
191     s = g_io_channel_read_line(sinfile, &nextline, &nextlinesize, NULL, NULL);
192     if (nextlinesize > 0){
193     g_io_channel_write_chars(sin, nextline, nextlinesize, &nextlinewritesize, NULL);
194     }
195     if (s == G_IO_STATUS_EOF){
196     g_io_channel_shutdown(sin, TRUE, NULL);
197 pajs 1.2 sin = NULL;
198 pajs 1.1 fdssize=2;
199     }
200    
201     }
202     /* Even if we did get a hangup - lets make sure there is no more data to read first by looping again */
203     if (readdata) continue;
204    
205 pajs 1.2 if(((fds[0].revents|POLLHUP) == fds[0].revents) && ((fds[1].revents|POLLHUP) == fds[1].revents)) break;
206 pajs 1.1 }
207    
208     g_io_channel_shutdown(sout[0], TRUE, NULL);
209     g_io_channel_shutdown(sout[1], TRUE, NULL);
210    
211     if((infile != NULL) && (sin != NULL)){
212     g_io_channel_shutdown(sin, TRUE, NULL);
213     }
214     if (outdir != NULL){
215     g_io_channel_shutdown(soutfile[0], TRUE, NULL);
216     g_io_channel_shutdown(soutfile[1], TRUE, NULL);
217     }
218    
219     g_spawn_close_pid(proc->pid);
220    
221     if (verbose) g_fprintf(stderr, "Ending job '%s'\n", proc->jobname);
222    
223     }
224    
225     /* Takes a string str, a search string, find, and string to
226     * replace all occurs of find with, replace. Returns a new
227     * leaving original intact.
228     */
229     gchar *strrep(gchar *str, gchar *find, gchar *replace){
230     gchar *ptr, *oldptr;
231     GString *newstr = g_string_new("");
232     gssize len = strlen(str);
233     gint findlen = strlen(find);
234    
235     ptr = g_strstr_len(str, len, find);
236     oldptr=str;
237     while(ptr != NULL){
238     /* Copy in data up to this point */
239     g_string_append_len (newstr, oldptr, (ptr - oldptr));
240     /* Put in the replacement string */
241     g_string_append(newstr, replace);
242    
243     oldptr = ptr + findlen;
244     /* BUG - len will now be wrong. But, i only wanted a strstr anyway :) */
245     ptr = g_strstr_len(oldptr, len, find);
246     }
247    
248     /* Copy remains */
249     g_string_append_len (newstr, oldptr, (ptr - oldptr));
250    
251     ptr = g_string_free(newstr, FALSE);
252    
253     return ptr;
254     }
255    
256    
257     /* Takes a cmd before substitution, takes the characters to be substituted
258     * and a line for doign the substitution with. Fills in jobname
259     */
260     gchar *genexeccmd(gchar *cmd, gchar *fmt, gchar *line, gchar **jobname){
261     gchar *newexec, *ptr;
262     int x;
263     gchar *linesep = " ";
264     gchar *fmtsep = " ";
265    
266     gchar **line_array;
267     gchar **fmt_array;
268    
269     if ( fmt == NULL ){
270     /* No format given - we'll just append the options to the end of the command */
271     if(jobname != NULL) *jobname = g_strdup(line);
272     return g_strdup_printf("%s %s", cmd, line);
273     }
274    
275     line_array = g_strsplit(line, linesep, 0);
276     fmt_array = g_strsplit(fmt, fmtsep, 0);
277    
278     if(jobparsename != NULL){
279     if(jobname != NULL){
280     for(x=0; fmt_array[x] != NULL; x++){
281     if (line_array[x] == NULL) break;
282     if((strcmp(fmt_array[x], jobparsename) == 0)){
283     *jobname = g_strdup(line_array[x]);
284     break;
285     }
286     }
287     }
288     }else{
289     /* Not told us what they want.. We'll just use the first one */
290     *jobname = g_strdup(line_array[0]);
291     }
292    
293     newexec = g_strdup(cmd);
294     for(x=0; line_array[x] != NULL; x++){
295     if (fmt_array[x] == NULL) break;
296     ptr = newexec;
297     newexec = strrep(newexec, fmt_array[x], line_array[x]);
298     free(ptr);
299     }
300    
301    
302     return newexec;
303     }
304    
305     int main(int argc, char **argv){
306    
307     GThreadPool *procpool;
308     GError *pp_err = NULL, *err = NULL;
309     gint x;
310    
311     GOptionContext *optcontext;
312    
313     optcontext = g_option_context_new(" - parallel job executer");
314     g_option_context_add_main_entries(optcontext, options, NULL);
315     g_option_context_parse (optcontext, &argc, &argv, &err);
316    
317     if(command == NULL){
318     g_printerr("Command required, see --help for more flags\n");
319     exit(1);
320     }
321    
322     if(verbose){
323     g_printerr("Command '%s'\n", command);
324     g_printerr("Timeout '%d'\n", timeout);
325     g_printerr("Jobs '%d'\n", numthreads);
326     }
327    
328    
329     if(argc < 2 && arglist == NULL){
330     /* We have no arguments */
331     g_printerr("Missing arguments, see --help for details\n");
332     exit(1);
333     }
334    
335     if (!g_thread_supported ()){
336     g_thread_init (NULL);
337     }else{
338     g_printerr("Threading not supported\n");
339     }
340    
341     if(verbose) g_printerr("Creating a threadpool %d in size\n", numthreads);
342     procpool = g_thread_pool_new(process_child, NULL, numthreads, FALSE, &pp_err);
343    
344     /* Generate the commands and push the job onto the thread pool */
345     /* If no substituion is needed */
346     if (arglist != NULL){
347     GIOChannel *f;
348     gchar *line;
349     GIOStatus status;
350    
351     f = g_io_channel_new_file(arglist, "r", &err);
352     if (f == NULL){
353     g_printerr("Failed to open argfile: %s\n", err->message);
354     exit(1);
355     }
356     status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
357     while(status==G_IO_STATUS_NORMAL){
358     process_t *newproc = g_new(process_t, 1);
359     newproc->err = NULL;
360    
361     line = g_strstrip(line);
362    
363     newproc->exec = genexeccmd(command, parseformat, line, &(newproc->jobname));
364     proclist = g_list_append(proclist, (gpointer) newproc);
365    
366     if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
367     g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
368    
369     status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
370     }
371     g_io_channel_close(f);
372    
373     }else{
374     /* substition is needed */
375     for(x=1; x<argc; x++){
376     process_t *newproc = g_new(process_t, 1);
377     newproc->err = NULL;
378    
379     newproc->exec = genexeccmd(command, parseformat, argv[x], &(newproc->jobname));
380    
381     proclist = g_list_append(proclist, (gpointer) newproc);
382    
383     if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
384     g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
385     }
386     }
387    
388    
389     /* Wait for the jobs to finish */
390     /* TODO - Kill jobs that don't finish in time */
391     while(g_thread_pool_get_num_threads(procpool) > 0){
392 pajs 1.2 g_usleep(1000);
393 pajs 1.1 }
394    
395     return 0;
396     }