ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/pjob/pjob.c
Revision: 1.1
Committed: Mon Jan 23 11:16:32 2006 UTC (18 years, 2 months ago) by pajs
Content type: text/plain
Branch: MAIN
Log Message:
Initial version of pjob, a parallel job runner. (Or Pete's job).

Program takes a command, and X arguments. It will run the command with
argument x in parallel, up to Y number of jobs at once. It allows
command substitution, and with an argument file will take several
arguments.

The most basic example is:

pjob -c /bin/echo Azz Pete Tim Fred

This would run '/bin/echo Azz', '/bin/echo Pete' etc etc in parallel.
A slightly more detailed example:

pjob -f 'HOSTNAME' -c 'ssh HOSTNAME uptime' myrtle raptor swallow.

This would run

'ssh myrtle uptime', 'ssh raptor uptime', 'ssh swallow uptime' in
parallel.

A slightly more complex example:

A datafile of:

elm 192.168.0.53 kent.ac.uk
swift 192.168.0.34 ukc.ac.uk
otter 192.168.0.116 ukc.ac.uk

run with the command
pjob -f 'name ip domainname' -c 'adddns -h name -i ip -d domainname' -a datafile

would run

adddns -h elm -i 192.168.0.53 -d kent.ac.uk
adddns -h swift -i 192.168.0.34 -d ukc.ac.uk
etc etc

pjob by default outputs all data to the screen with a jobname prefix, and also a [out] or [err] prefix showing
if the output was on stdout or stderr. A stdin file can be specified, and pjob will open this file, and write
its contents into the stdin of the jobs it executes.

Command arguments seem to be a bit unpopular - so may be changed.

File Contents

# User Rev Content
1 pajs 1.1 /*
2     * Peter Saunders pjob.c
3     * Copyright (C) 2000-2005 Peter Saunders
4     *
5     * This program is free software; you can redistribute it and/or
6     * modify it under the terms of the GNU General Public License
7     * as published by the Free Software Foundation; either version 2
8     * of the License, or (at your option) any later version.
9     *
10     * This program is distributed in the hope that it will be useful,
11     * but WITHOUT ANY WARRANTY; without even the implied warranty of
12     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13     * GNU General Public License for more details.
14     *
15     * You should have received a copy of the GNU General Public License
16     * along with this program; if not, write to the Free Software
17     * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18     *
19     */
20    
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <glib.h>
#include <glib/gprintf.h>
25    
/* Built-in defaults for the --jobs and --timeout options.
 * Kept as bare integer literals because they initialise the
 * corresponding configuration globals.
 */
#define DEFNUMTHREAD    10      /* initial value of numthreads */
#define DEFTIMEOUT      60      /* initial value of timeout */
28    
29     /* Structure for the process to be passed to the executing thread */
30     struct _process_t{
31     gchar *exec;
32     gchar *jobname;
33     GTimer *timer;
34     GPid pid;
35     gchar *file_stdout;
36     gchar *file_stderr;
37    
38     GError *err;
39     };
40    
41     typedef struct _process_t process_t;
42    
43     /* Globals for config setup */
44     static gint numthreads = DEFNUMTHREAD;
45     static gboolean verbose = FALSE;
46     static gboolean quiet = FALSE;
47     static gint timeout = DEFTIMEOUT;
48     static gchar *parseformat = NULL;
49     static gchar *jobparsename = NULL;
50     static gchar *command = NULL;
51     static gchar *outdir = NULL;
52     static gchar *infile = NULL;
53     static gchar *arglist = NULL;
54     static gchar *stdindata = NULL;
55    
56     /* Command line options */
57     static GOptionEntry options[] =
58     {
59     { "jobs", 'j', 0, G_OPTION_ARG_INT, &numthreads, "Number of jobs to run in parallel [DEFNUMTHREAD]", NULL },
60     { "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL },
61     { "quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet, "Do not print the output of the commands as they are running", NULL },
62     { "timeout", 't', 0, G_OPTION_ARG_INT, &timeout, "Timeout before process is killed [DEFTIMEOUT]", NULL },
63     { "format", 'f', 0, G_OPTION_ARG_STRING, &parseformat, "The order of substitiution to be used in the command", NULL },
64     { "jobname", 'n', 0, G_OPTION_ARG_STRING, &jobparsename, "If format is used, which variable to use for a output dir", NULL },
65     { "command", 'c', 0, G_OPTION_ARG_STRING, &command, "The command to be executed", NULL },
66     { "output", 'o', 0, G_OPTION_ARG_STRING, &outdir, "Directory to put all output into", NULL },
67     { "stdin", 'i', 0, G_OPTION_ARG_FILENAME, &infile, "Pass contents of filename into stdin of the executing process", NULL },
68     { "argsfile", 'a', 0, G_OPTION_ARG_FILENAME, &arglist, "File for list of argumenst if you dont want to use the command line", NULL },
69     { NULL }
70     };
71    
72     /* Linked list of process jobs */
73     GList *proclist = NULL;
74    
75     /* Take a process_t and execute it, and doing the "right thing" with the output */
76     void process_child(gpointer data, gpointer user_data){
77     process_t *proc = (process_t*) data;
78     GIOChannel *soutfile[2], *sout[2], *sinfile, *sin;
79     gchar *execargv[4];
80     int outpipes[2], inpipes[2];
81     GError *err = NULL;
82    
83     struct pollfd fds[3];
84     gint fdssize=2;
85    
86     /*if((pipe(outpipes)) != 0){
87     g_printerr("Failed to create pipe\n");
88     return ;
89     }*/
90    
91     /* Setup files in output dir if requested to do so */
92     if(outdir != NULL){
93     proc->file_stdout = g_strdup_printf("%s/%s-STDOUT", outdir, proc->jobname);
94     soutfile[0] = g_io_channel_new_file(proc->file_stdout, "w", &err);
95    
96     if(soutfile[0] == NULL){
97     g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stdout, err->message);
98     return;
99     }
100    
101     proc->file_stderr = g_strdup_printf("%s/%s-STDERR", outdir, proc->jobname);
102     soutfile[1] = g_io_channel_new_file(proc->file_stderr, "w", &err);
103    
104     if(soutfile[1] == NULL){
105     g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stderr, err->message);
106     return;
107     }
108    
109     }
110    
111    
112     /* Open stdin file to pass to the process */
113     if(infile != NULL){
114     sinfile = g_io_channel_new_file(infile, "r", NULL);
115     pipe(inpipes);
116     }
117    
118     /* Setup argv structure for job */
119     if (verbose) g_fprintf(stderr, "Starting job '%s'\n", proc->jobname);
120     execargv[0] = "/bin/sh";
121     execargv[1] = "-c";
122     execargv[2] = proc->exec;
123     execargv[3] = NULL;
124    
125     /* Exec the job */
126     if (infile == NULL){
127     if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, 0, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){
128     g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
129     return;
130     }
131     }else{
132     if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, 0, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){
133     g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
134     return;
135     }
136     close(inpipes[0]);
137     }
138    
139    
140     /* Make a stream out of the pipes for ease of reading from them */
141     sout[0] = g_io_channel_unix_new(outpipes[0]);
142     sout[1] = g_io_channel_unix_new(outpipes[1]);
143     if(infile != NULL){
144     sin = g_io_channel_unix_new(inpipes[1]);
145     fds[2].fd = inpipes[1];
146     fds[2].events = POLLOUT | POLLHUP;
147     fdssize = 3;
148     }
149    
150    
151     /* Setup the poll events */
152     fds[0].fd = outpipes[0];
153     fds[1].fd = outpipes[1];
154     fds[0].events = fds[1].events = POLLIN | POLLPRI | POLLHUP;
155     fds[0].revents = fds[1].revents = fds[2].revents = 0;
156    
157    
158     for(;;){
159     gint x;
160     gchar *readbuf;
161     gint rdatasize, wdatasize;
162     gboolean readdata = FALSE;
163     fds[0].revents = fds[1].revents = fds[2].revents = 0;
164    
165     poll(fds, fdssize, -1);
166     /* For stdout and stderr see if there is any data, and read it */
167     for(x=0; x<2; x++){
168     if(fds[x].revents|POLLIN == fds[x].revents){
169     /* We have data to read */
170     g_io_channel_read_line(sout[x], &readbuf, &rdatasize, NULL, NULL);
171     if(rdatasize > 0){
172     /* Print it if unless told not to */
173     if(!quiet){
174     g_printf("[%s] [%s] %s", proc->jobname, (x==0) ? "out" : "err", readbuf);
175     }
176     if(outdir != NULL){
177     g_io_channel_write_chars(soutfile[x], readbuf, rdatasize, &wdatasize, NULL);
178     }
179     readdata = TRUE;
180     free(readbuf);
181     }
182    
183     }
184     }
185     /* See if we need to pump more data down stdin */
186     if(fds[2].revents|POLLOUT == fds[2].revents){
187     /* We have data we can write */
188     gchar *nextline;
189     gint nextlinesize;
190     gint nextlinewritesize;
191     GIOStatus s;
192    
193     /* Get the next line, and write it down the stream */
194     s = g_io_channel_read_line(sinfile, &nextline, &nextlinesize, NULL, NULL);
195     if (nextlinesize > 0){
196     printf("Going to write '%s'\n", nextline);
197     g_io_channel_write_chars(sin, nextline, nextlinesize, &nextlinewritesize, NULL);
198     }
199     if (s == G_IO_STATUS_EOF){
200     g_io_channel_shutdown(sin, TRUE, NULL);
201     sin == NULL;
202     fdssize=2;
203     }
204    
205     }
206     /* Even if we did get a hangup - lets make sure there is no more data to read first by looping again */
207     if (readdata) continue;
208    
209     if((fds[0].revents|POLLHUP == fds[0].revents) && (fds[1].revents|POLLHUP == fds[1].revents)) break;
210     }
211    
212     /* For some batty reason, it starts with a ref count of 1. Lets decrement it so we can shut it */
213     /*g_io_channel_unref(sout[0]);
214     g_io_channel_unref(sout[1]);*/
215    
216    
217     g_io_channel_shutdown(sout[0], TRUE, NULL);
218     g_io_channel_shutdown(sout[1], TRUE, NULL);
219     /*close(outpipes[0]);
220     close(outpipes[1]);*/
221    
222     /* BUG - Causes glib error saying its already closed */
223     if((infile != NULL) && (sin != NULL)){
224     g_io_channel_shutdown(sin, TRUE, NULL);
225     }
226     if (outdir != NULL){
227     g_io_channel_shutdown(soutfile[0], TRUE, NULL);
228     g_io_channel_shutdown(soutfile[1], TRUE, NULL);
229     }
230    
231     g_spawn_close_pid(proc->pid);
232    
233     if (verbose) g_fprintf(stderr, "Ending job '%s'\n", proc->jobname);
234    
235     }
236    
237     /* Takes a string str, a search string, find, and string to
238     * replace all occurs of find with, replace. Returns a new
239     * leaving original intact.
240     */
241     gchar *strrep(gchar *str, gchar *find, gchar *replace){
242     gchar *ptr, *oldptr;
243     GString *newstr = g_string_new("");
244     gssize len = strlen(str);
245     gint findlen = strlen(find);
246    
247     ptr = g_strstr_len(str, len, find);
248     oldptr=str;
249     while(ptr != NULL){
250     /* Copy in data up to this point */
251     g_string_append_len (newstr, oldptr, (ptr - oldptr));
252     /* Put in the replacement string */
253     g_string_append(newstr, replace);
254    
255     oldptr = ptr + findlen;
256     /* BUG - len will now be wrong. But, i only wanted a strstr anyway :) */
257     ptr = g_strstr_len(oldptr, len, find);
258     }
259    
260     /* Copy remains */
261     g_string_append_len (newstr, oldptr, (ptr - oldptr));
262    
263     ptr = g_string_free(newstr, FALSE);
264    
265     return ptr;
266     }
267    
268    
269     /* Takes a cmd before substitution, takes the characters to be substituted
270     * and a line for doign the substitution with. Fills in jobname
271     */
272     gchar *genexeccmd(gchar *cmd, gchar *fmt, gchar *line, gchar **jobname){
273     gchar **fmttok;
274     gchar *newexec, *ptr;
275     int x;
276     gchar *linesep = " ";
277     gchar *fmtsep = " ";
278    
279     gchar **line_array;
280     gchar **fmt_array;
281    
282     if ( fmt == NULL ){
283     /* No format given - we'll just append the options to the end of the command */
284     if(jobname != NULL) *jobname = g_strdup(line);
285     return g_strdup_printf("%s %s", cmd, line);
286     }
287    
288     line_array = g_strsplit(line, linesep, 0);
289     fmt_array = g_strsplit(fmt, fmtsep, 0);
290    
291     if(jobparsename != NULL){
292     if(jobname != NULL){
293     for(x=0; fmt_array[x] != NULL; x++){
294     if (line_array[x] == NULL) break;
295     if((strcmp(fmt_array[x], jobparsename) == 0)){
296     *jobname = g_strdup(line_array[x]);
297     break;
298     }
299     }
300     }
301     }else{
302     /* Not told us what they want.. We'll just use the first one */
303     *jobname = g_strdup(line_array[0]);
304     }
305    
306     newexec = g_strdup(cmd);
307     for(x=0; line_array[x] != NULL; x++){
308     if (fmt_array[x] == NULL) break;
309     ptr = newexec;
310     newexec = strrep(newexec, fmt_array[x], line_array[x]);
311     free(ptr);
312     }
313    
314    
315     return newexec;
316     }
317    
318     int main(int argc, char **argv){
319    
320     GThreadPool *procpool;
321     GError *pp_err = NULL, *err = NULL;
322     gint x;
323    
324     GOptionContext *optcontext;
325    
326     optcontext = g_option_context_new(" - parallel job executer");
327     g_option_context_add_main_entries(optcontext, options, NULL);
328     g_option_context_parse (optcontext, &argc, &argv, &err);
329    
330     if(command == NULL){
331     g_printerr("Command required, see --help for more flags\n");
332     exit(1);
333     }
334    
335     if(verbose){
336     g_printerr("Command '%s'\n", command);
337     g_printerr("Timeout '%d'\n", timeout);
338     g_printerr("Jobs '%d'\n", numthreads);
339     }
340    
341    
342     if(argc < 2 && arglist == NULL){
343     /* We have no arguments */
344     g_printerr("Missing arguments, see --help for details\n");
345     exit(1);
346     }
347    
348     if (!g_thread_supported ()){
349     g_thread_init (NULL);
350     }else{
351     g_printerr("Threading not supported\n");
352     }
353    
354     if(verbose) g_printerr("Creating a threadpool %d in size\n", numthreads);
355     procpool = g_thread_pool_new(process_child, NULL, numthreads, FALSE, &pp_err);
356    
357     /* Generate the commands and push the job onto the thread pool */
358     /* If no substituion is needed */
359     if (arglist != NULL){
360     GIOChannel *f;
361     gchar *line;
362     GIOStatus status;
363    
364     f = g_io_channel_new_file(arglist, "r", &err);
365     if (f == NULL){
366     g_printerr("Failed to open argfile: %s\n", err->message);
367     exit(1);
368     }
369     status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
370     while(status==G_IO_STATUS_NORMAL){
371     process_t *newproc = g_new(process_t, 1);
372     newproc->err = NULL;
373    
374     line = g_strstrip(line);
375    
376     newproc->exec = genexeccmd(command, parseformat, line, &(newproc->jobname));
377     proclist = g_list_append(proclist, (gpointer) newproc);
378    
379     if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
380     g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
381    
382     status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
383     }
384     g_io_channel_close(f);
385    
386     }else{
387     /* substition is needed */
388     for(x=1; x<argc; x++){
389     process_t *newproc = g_new(process_t, 1);
390     newproc->err = NULL;
391    
392     newproc->exec = genexeccmd(command, parseformat, argv[x], &(newproc->jobname));
393    
394     proclist = g_list_append(proclist, (gpointer) newproc);
395    
396     if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
397     g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
398     }
399     }
400    
401    
402     /* Wait for the jobs to finish */
403     /* TODO - Kill jobs that don't finish in time */
404     while(g_thread_pool_get_num_threads(procpool) > 0){
405     sleep(1);
406     }
407    
408     return 0;
409     }