ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/pjob/pjob.c
Revision: 1.5
Committed: Wed Jan 25 20:05:52 2006 UTC (18 years, 3 months ago) by pajs
Content type: text/plain
Branch: MAIN
Changes since 1.4: +1 -1 lines
Log Message:
Whoops - broke all output :)

File Contents

# Content
1 /*
2 * Peter Saunders pjob.c
3 * Copyright (C) 2000-2005 Peter Saunders
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 *
19 */
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <glib.h>
26 #include <glib/gprintf.h>
27 #include <poll.h>
28 #include <sys/types.h>
29 #include <sys/wait.h>
30
31 #define DEFNUMTHREAD 10
32 #define DEFTIMEOUT 60
33
34 /* Structure for the process to be passed to the executing thread */
35 struct _process_t{
36 gchar *exec;
37 gchar *jobname;
38 GTimer *timer;
39 GPid pid;
40 gint stat_loc;
41 gchar *file_stdout;
42 gchar *file_stderr;
43
44 GError *err;
45 };
46
47 typedef struct _process_t process_t;
48
49 /* Globals for config setup */
50 static gint numthreads = DEFNUMTHREAD;
51 static gboolean verbose = FALSE;
52 static gboolean quiet = FALSE;
53 static gint timeout = DEFTIMEOUT;
54 static gchar *parseformat = NULL;
55 static gchar *jobparsename = NULL;
56 static gchar *command = NULL;
57 static gchar *outdir = NULL;
58 static gchar *infile = NULL;
59 static gchar *arglist = NULL;
60
61 /* Command line options */
62 static GOptionEntry options[] =
63 {
64 { "jobs", 'j', 0, G_OPTION_ARG_INT, &numthreads, "Number of jobs to run in parallel [DEFNUMTHREAD]", NULL },
65 { "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL },
66 { "quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet, "Do not print the output of the commands as they are running", NULL },
67 { "timeout", 't', 0, G_OPTION_ARG_INT, &timeout, "Timeout before process is killed [DEFTIMEOUT]", NULL },
68 { "format", 'f', 0, G_OPTION_ARG_STRING, &parseformat, "The order of substitiution to be used in the command", NULL },
69 { "jobname", 'n', 0, G_OPTION_ARG_STRING, &jobparsename, "If format is used, which variable to use for a output dir", NULL },
70 { "command", 'c', 0, G_OPTION_ARG_STRING, &command, "The command to be executed", NULL },
71 { "output", 'o', 0, G_OPTION_ARG_STRING, &outdir, "Directory to put all output into", NULL },
72 { "stdin", 'i', 0, G_OPTION_ARG_FILENAME, &infile, "Pass contents of filename into stdin of the executing process", NULL },
73 { "argsfile", 'a', 0, G_OPTION_ARG_FILENAME, &arglist, "File for list of argumenst if you dont want to use the command line", NULL },
74 { NULL }
75 };
76
77 /* Linked list of process jobs */
78 GList *proclist = NULL;
79
80 /* Take a process_t and execute it, and doing the "right thing" with the output */
81 void process_child(gpointer data, gpointer user_data){
82 process_t *proc = (process_t*) data;
83 GIOChannel *soutfile[2], *sout[2], *sinfile, *sin;
84 gchar *execargv[4];
85 int outpipes[2], inpipes[2];
86 GError *err = NULL;
87
88 struct pollfd fds[3];
89 gint fdssize=2;
90
91 proc->timer = g_timer_new();
92 g_timer_start(proc->timer);
93
94 /* Setup files in output dir if requested to do so */
95 if(outdir != NULL){
96 proc->file_stdout = g_strdup_printf("%s/%s-STDOUT", outdir, proc->jobname);
97 soutfile[0] = g_io_channel_new_file(proc->file_stdout, "w", &err);
98
99 if(soutfile[0] == NULL){
100 g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stdout, err->message);
101 return;
102 }
103
104 proc->file_stderr = g_strdup_printf("%s/%s-STDERR", outdir, proc->jobname);
105 soutfile[1] = g_io_channel_new_file(proc->file_stderr, "w", &err);
106
107 if(soutfile[1] == NULL){
108 g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stderr, err->message);
109 return;
110 }
111
112 }
113
114
115 /* Open stdin file to pass to the process */
116 if(infile != NULL){
117 sinfile = g_io_channel_new_file(infile, "r", NULL);
118 pipe(inpipes);
119 }
120
121 /* Setup argv structure for job */
122 if (verbose) g_fprintf(stderr, "Starting job '%s'\n", proc->jobname);
123 execargv[0] = "/bin/sh";
124 execargv[1] = "-c";
125 execargv[2] = proc->exec;
126 execargv[3] = NULL;
127
128 /* Exec the job */
129 if (infile == NULL){
130 if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){
131 g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
132 return;
133 }
134 }else{
135 if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){
136 g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
137 return;
138 }
139 close(inpipes[0]);
140 }
141
142
143 /* Make a stream out of the pipes for ease of reading from them */
144 sout[0] = g_io_channel_unix_new(outpipes[0]);
145 sout[1] = g_io_channel_unix_new(outpipes[1]);
146 if(infile != NULL){
147 sin = g_io_channel_unix_new(inpipes[1]);
148 fds[2].fd = inpipes[1];
149 fds[2].events = POLLOUT | POLLHUP;
150 fdssize = 3;
151 }
152
153
154 /* Setup the poll events */
155 fds[0].fd = outpipes[0];
156 fds[1].fd = outpipes[1];
157 fds[0].events = fds[1].events = POLLIN | POLLPRI | POLLHUP;
158 fds[0].revents = fds[1].revents = fds[2].revents = 0;
159
160
161 for(;;){
162 gint x;
163 gchar *readbuf;
164 gint rdatasize, wdatasize;
165 gboolean readdata = FALSE;
166 fds[0].revents = fds[1].revents = fds[2].revents = 0;
167
168 poll(fds, fdssize, -1);
169 /* For stdout and stderr see if there is any data, and read it */
170 for(x=0; x<2; x++){
171 if((fds[x].revents & POLLIN) != 0){
172 /* We have data to read */
173 g_io_channel_read_line(sout[x], &readbuf, &rdatasize, NULL, NULL);
174 if(rdatasize > 0){
175 /* Print it if unless told not to */
176 if(!quiet){
177 g_printf("[%s] [%s] %s", proc->jobname, (x==0) ? "out" : "err", readbuf);
178 }
179 if(outdir != NULL){
180 g_io_channel_write_chars(soutfile[x], readbuf, rdatasize, &wdatasize, NULL);
181 }
182 readdata = TRUE;
183 free(readbuf);
184 }
185
186 }
187 }
188 /* See if we need to pump more data down stdin */
189 if((fds[2].revents & POLLOUT) != 0){
190 /* We have data we can write */
191 gchar *nextline;
192 gint nextlinesize;
193 gint nextlinewritesize;
194 GIOStatus s;
195
196 /* Get the next line, and write it down the stream */
197 s = g_io_channel_read_line(sinfile, &nextline, &nextlinesize, NULL, NULL);
198 if (nextlinesize > 0){
199 g_io_channel_write_chars(sin, nextline, nextlinesize, &nextlinewritesize, NULL);
200 }
201 if (s == G_IO_STATUS_EOF){
202 g_io_channel_shutdown(sin, TRUE, NULL);
203 sin = NULL;
204 fdssize=2;
205 }
206
207 }
208 /* Even if we did get a hangup - lets make sure there is no more data to read first by looping again */
209 if (readdata) continue;
210
211 if(((fds[0].revents & POLLHUP) != 0) && ((fds[1].revents & POLLHUP) != 0)) break;
212 }
213
214 while((waitpid(proc->pid, &(proc->stat_loc), 0)) != proc->pid);
215
216 g_timer_stop(proc->timer);
217
218 /* If process exited cleanly */
219 if (WIFEXITED(proc->stat_loc)){
220 /* Get the exit code */
221 if (verbose) g_fprintf(stderr, "Job '%s' exited with code %d. Exec time: %.2f\n", proc->jobname, WEXITSTATUS(proc->stat_loc), g_timer_elapsed(proc->timer, 0));
222 }else{
223 /* Otherwise - find out what it died with */
224 /* TODO - this doesn't work quite right.. Mainly because its looking at the shell process, so the
225 * child of the /bin/sh which does get a signal, this isn't passed up. Although, it handly tells
226 * us if the /bin/sh gets a SEGV etc ;)
227 */
228 g_fprintf(stderr, "Job %s exited with signal %d. Exec time: %.2f\n", proc->jobname, (WTERMSIG(proc->stat_loc)), g_timer_elapsed(proc->timer, 0));
229 }
230
231
232 g_io_channel_shutdown(sout[0], TRUE, NULL);
233 g_io_channel_shutdown(sout[1], TRUE, NULL);
234
235 if((infile != NULL) && (sin != NULL)){
236 g_io_channel_shutdown(sin, TRUE, NULL);
237 }
238 if (outdir != NULL){
239 g_io_channel_shutdown(soutfile[0], TRUE, NULL);
240 g_io_channel_shutdown(soutfile[1], TRUE, NULL);
241 }
242
243 g_spawn_close_pid(proc->pid);
244
245
246 }
247
248 /* Takes a string str, a search string, find, and string to
249 * replace all occurs of find with, replace. Returns a new
250 * leaving original intact.
251 */
252 gchar *strrep(gchar *str, gchar *find, gchar *replace){
253 gchar *ptr, *oldptr;
254 GString *newstr = g_string_new("");
255 gssize len = strlen(str);
256 gint findlen = strlen(find);
257
258 ptr = g_strstr_len(str, len, find);
259 oldptr=str;
260 while(ptr != NULL){
261 /* Copy in data up to this point */
262 g_string_append_len (newstr, oldptr, (ptr - oldptr));
263 /* Put in the replacement string */
264 g_string_append(newstr, replace);
265
266 oldptr = ptr + findlen;
267 /* BUG - len will now be wrong. But, i only wanted a strstr anyway :) */
268 ptr = g_strstr_len(oldptr, len, find);
269 }
270
271 /* Copy remains */
272 g_string_append_len (newstr, oldptr, (ptr - oldptr));
273
274 ptr = g_string_free(newstr, FALSE);
275
276 return ptr;
277 }
278
279
280 /* Takes a cmd before substitution, takes the characters to be substituted
281 * and a line for doign the substitution with. Fills in jobname
282 */
283 gchar *genexeccmd(gchar *cmd, gchar *fmt, gchar *line, gchar **jobname){
284 gchar *newexec, *ptr;
285 int x;
286 gchar *linesep = " ";
287 gchar *fmtsep = " ";
288
289 gchar **line_array;
290 gchar **fmt_array;
291
292 if ( fmt == NULL ){
293 /* No format given - we'll just append the options to the end of the command */
294 if(jobname != NULL) *jobname = g_strdup(line);
295 return g_strdup_printf("%s %s", cmd, line);
296 }
297
298 line_array = g_strsplit(line, linesep, 0);
299 fmt_array = g_strsplit(fmt, fmtsep, 0);
300
301 if(jobparsename != NULL){
302 if(jobname != NULL){
303 for(x=0; fmt_array[x] != NULL; x++){
304 if (line_array[x] == NULL) break;
305 if((strcmp(fmt_array[x], jobparsename) == 0)){
306 *jobname = g_strdup(line_array[x]);
307 break;
308 }
309 }
310 }
311 }else{
312 /* Not told us what they want.. We'll just use the first one */
313 *jobname = g_strdup(line_array[0]);
314 }
315
316 newexec = g_strdup(cmd);
317 for(x=0; line_array[x] != NULL; x++){
318 if (fmt_array[x] == NULL) break;
319 ptr = newexec;
320 newexec = strrep(newexec, fmt_array[x], line_array[x]);
321 free(ptr);
322 }
323
324
325 return newexec;
326 }
327
328 int main(int argc, char **argv){
329
330 GThreadPool *procpool;
331 GError *pp_err = NULL, *err = NULL;
332 gint x;
333
334 GOptionContext *optcontext;
335
336 optcontext = g_option_context_new(" - parallel job executer");
337 g_option_context_add_main_entries(optcontext, options, NULL);
338 g_option_context_parse (optcontext, &argc, &argv, &err);
339
340 if(command == NULL){
341 g_printerr("Command required, see --help for more flags\n");
342 exit(1);
343 }
344
345 if(verbose){
346 g_printerr("Command '%s'\n", command);
347 g_printerr("Timeout '%d'\n", timeout);
348 g_printerr("Jobs '%d'\n", numthreads);
349 }
350
351
352 if(argc < 2 && arglist == NULL){
353 /* We have no arguments */
354 g_printerr("Missing arguments, see --help for details\n");
355 exit(1);
356 }
357
358 if (!g_thread_supported ()){
359 g_thread_init (NULL);
360 }else{
361 g_printerr("Threading not supported\n");
362 }
363
364 if(verbose) g_printerr("Creating a threadpool %d in size\n", numthreads);
365 procpool = g_thread_pool_new(process_child, NULL, numthreads, FALSE, &pp_err);
366
367 /* Generate the commands and push the job onto the thread pool */
368 /* If no substituion is needed */
369 if (arglist != NULL){
370 GIOChannel *f;
371 gchar *line;
372 GIOStatus status;
373
374 f = g_io_channel_new_file(arglist, "r", &err);
375 if (f == NULL){
376 g_printerr("Failed to open argfile: %s\n", err->message);
377 exit(1);
378 }
379 status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
380 while(status==G_IO_STATUS_NORMAL){
381 process_t *newproc = g_new(process_t, 1);
382 newproc->err = NULL;
383
384 line = g_strstrip(line);
385
386 newproc->exec = genexeccmd(command, parseformat, line, &(newproc->jobname));
387 proclist = g_list_append(proclist, (gpointer) newproc);
388
389 if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
390 g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
391
392 status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
393 }
394 g_io_channel_close(f);
395
396 }else{
397 /* substition is needed */
398 for(x=1; x<argc; x++){
399 process_t *newproc = g_new(process_t, 1);
400 newproc->err = NULL;
401
402 newproc->exec = genexeccmd(command, parseformat, argv[x], &(newproc->jobname));
403
404 proclist = g_list_append(proclist, (gpointer) newproc);
405
406 if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
407 g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
408 }
409 }
410
411
412 /* Wait for the jobs to finish */
413 /* TODO - Kill jobs that don't finish in time */
414 while(g_thread_pool_get_num_threads(procpool) > 0){
415 g_usleep(1000);
416 }
417
418 return 0;
419 }