| 1 |
/* |
| 2 |
* Peter Saunders pjob.c |
| 3 |
* Copyright (C) 2000-2005 Peter Saunders |
| 4 |
* |
| 5 |
* This program is free software; you can redistribute it and/or |
| 6 |
* modify it under the terms of the GNU General Public License |
| 7 |
* as published by the Free Software Foundation; either version 2 |
| 8 |
* of the License, or (at your option) any later version. |
| 9 |
* |
| 10 |
* This program is distributed in the hope that it will be useful, |
| 11 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 13 |
* GNU General Public License for more details. |
| 14 |
* |
| 15 |
* You should have received a copy of the GNU General Public License |
| 16 |
* along with this program; if not, write to the Free Software |
| 17 |
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
| 18 |
* |
| 19 |
*/ |
| 20 |
|
| 21 |
#include <stdio.h> |
| 22 |
#include <stdlib.h> |
| 23 |
#include <unistd.h> |
| 24 |
#include <string.h> |
| 25 |
#include <glib.h> |
| 26 |
#include <glib/gprintf.h> |
| 27 |
#include <poll.h> |
| 28 |
#include <sys/types.h> |
| 29 |
#include <sys/wait.h> |
| 30 |
#include <sys/resource.h> |
| 31 |
|
| 32 |
#define DEFNUMTHREAD 10 |
| 33 |
#define DEFTIMEOUT 60 |
| 34 |
|
| 35 |
/* Structure for the process to be passed to the executing thread */ |
| 36 |
struct _process_t{ |
| 37 |
gchar *exec; |
| 38 |
gchar *jobname; |
| 39 |
GTimer *timer; |
| 40 |
GPid pid; |
| 41 |
gint stat_loc; |
| 42 |
gchar *file_stdout; |
| 43 |
gchar *file_stderr; |
| 44 |
|
| 45 |
GError *err; |
| 46 |
}; |
| 47 |
|
| 48 |
typedef struct _process_t process_t; |
| 49 |
|
| 50 |
/* Globals for config setup */ |
| 51 |
static gint numthreads = DEFNUMTHREAD; |
| 52 |
static gboolean verbose = FALSE; |
| 53 |
static gboolean quiet = FALSE; |
| 54 |
static gint timeout = DEFTIMEOUT; |
| 55 |
static gchar *parseformat = NULL; |
| 56 |
static gchar *jobparsename = NULL; |
| 57 |
static gchar *command = NULL; |
| 58 |
static gchar *outdir = NULL; |
| 59 |
static gchar *infile = NULL; |
| 60 |
static gchar *arglist = NULL; |
| 61 |
|
| 62 |
/* Command line options */ |
| 63 |
static GOptionEntry options[] = |
| 64 |
{ |
| 65 |
{ "jobs", 'j', 0, G_OPTION_ARG_INT, &numthreads, "Number of jobs to run in parallel [DEFNUMTHREAD]", NULL }, |
| 66 |
{ "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL }, |
| 67 |
{ "quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet, "Do not print the output of the commands as they are running", NULL }, |
| 68 |
{ "timeout", 't', 0, G_OPTION_ARG_INT, &timeout, "Timeout before process is killed [DEFTIMEOUT]", NULL }, |
| 69 |
{ "format", 'f', 0, G_OPTION_ARG_STRING, &parseformat, "The order of substitiution to be used in the command", NULL }, |
| 70 |
{ "jobname", 'n', 0, G_OPTION_ARG_STRING, &jobparsename, "If format is used, which variable to use for a output dir", NULL }, |
| 71 |
{ "command", 'c', 0, G_OPTION_ARG_STRING, &command, "The command to be executed", NULL }, |
| 72 |
{ "output", 'o', 0, G_OPTION_ARG_STRING, &outdir, "Directory to put all output into", NULL }, |
| 73 |
{ "stdin", 'i', 0, G_OPTION_ARG_FILENAME, &infile, "Pass contents of filename into stdin of the executing process", NULL }, |
| 74 |
{ "argsfile", 'a', 0, G_OPTION_ARG_FILENAME, &arglist, "File for list of argumenst if you dont want to use the command line", NULL }, |
| 75 |
{ NULL } |
| 76 |
}; |
| 77 |
|
| 78 |
/* Linked list of process jobs */ |
| 79 |
GList *proclist = NULL; |
| 80 |
|
| 81 |
/* Take a process_t and execute it, and doing the "right thing" with the output */ |
| 82 |
void process_child(gpointer data, gpointer user_data){ |
| 83 |
process_t *proc = (process_t*) data; |
| 84 |
GIOChannel *soutfile[2], *sout[2], *sinfile, *sin; |
| 85 |
gchar *execargv[4]; |
| 86 |
int outpipes[2], inpipes[2]; |
| 87 |
GError *err = NULL; |
| 88 |
|
| 89 |
struct pollfd fds[3]; |
| 90 |
gint fdssize=2; |
| 91 |
|
| 92 |
proc->timer = g_timer_new(); |
| 93 |
g_timer_start(proc->timer); |
| 94 |
|
| 95 |
/* Setup files in output dir if requested to do so */ |
| 96 |
if(outdir != NULL){ |
| 97 |
proc->file_stdout = g_strdup_printf("%s/%s-STDOUT", outdir, proc->jobname); |
| 98 |
soutfile[0] = g_io_channel_new_file(proc->file_stdout, "w", &err); |
| 99 |
|
| 100 |
if(soutfile[0] == NULL){ |
| 101 |
g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stdout, err->message); |
| 102 |
return; |
| 103 |
} |
| 104 |
|
| 105 |
proc->file_stderr = g_strdup_printf("%s/%s-STDERR", outdir, proc->jobname); |
| 106 |
soutfile[1] = g_io_channel_new_file(proc->file_stderr, "w", &err); |
| 107 |
|
| 108 |
if(soutfile[1] == NULL){ |
| 109 |
g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stderr, err->message); |
| 110 |
return; |
| 111 |
} |
| 112 |
|
| 113 |
} |
| 114 |
|
| 115 |
|
| 116 |
/* Open stdin file to pass to the process */ |
| 117 |
if(infile != NULL){ |
| 118 |
sinfile = g_io_channel_new_file(infile, "r", NULL); |
| 119 |
pipe(inpipes); |
| 120 |
} |
| 121 |
|
| 122 |
/* Setup argv structure for job */ |
| 123 |
if (verbose) g_fprintf(stderr, "Starting job '%s'\n", proc->jobname); |
| 124 |
execargv[0] = "/bin/sh"; |
| 125 |
execargv[1] = "-c"; |
| 126 |
execargv[2] = proc->exec; |
| 127 |
execargv[3] = NULL; |
| 128 |
|
| 129 |
/* Exec the job */ |
| 130 |
if (infile == NULL){ |
| 131 |
if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){ |
| 132 |
g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message); |
| 133 |
return; |
| 134 |
} |
| 135 |
}else{ |
| 136 |
if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){ |
| 137 |
g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message); |
| 138 |
return; |
| 139 |
} |
| 140 |
close(inpipes[0]); |
| 141 |
} |
| 142 |
|
| 143 |
|
| 144 |
/* Make a stream out of the pipes for ease of reading from them */ |
| 145 |
sout[0] = g_io_channel_unix_new(outpipes[0]); |
| 146 |
sout[1] = g_io_channel_unix_new(outpipes[1]); |
| 147 |
if(infile != NULL){ |
| 148 |
sin = g_io_channel_unix_new(inpipes[1]); |
| 149 |
fds[2].fd = inpipes[1]; |
| 150 |
fds[2].events = POLLOUT | POLLHUP; |
| 151 |
fdssize = 3; |
| 152 |
} |
| 153 |
|
| 154 |
|
| 155 |
/* Setup the poll events */ |
| 156 |
fds[0].fd = outpipes[0]; |
| 157 |
fds[1].fd = outpipes[1]; |
| 158 |
fds[0].events = fds[1].events = POLLIN | POLLPRI | POLLHUP; |
| 159 |
fds[0].revents = fds[1].revents = fds[2].revents = 0; |
| 160 |
|
| 161 |
|
| 162 |
for(;;){ |
| 163 |
gint x; |
| 164 |
gchar *readbuf; |
| 165 |
gint rdatasize, wdatasize; |
| 166 |
gboolean readdata = FALSE; |
| 167 |
fds[0].revents = fds[1].revents = fds[2].revents = 0; |
| 168 |
|
| 169 |
poll(fds, fdssize, -1); |
| 170 |
/* For stdout and stderr see if there is any data, and read it */ |
| 171 |
for(x=0; x<2; x++){ |
| 172 |
if((fds[x].revents & POLLIN) != 0){ |
| 173 |
/* We have data to read */ |
| 174 |
g_io_channel_read_line(sout[x], &readbuf, &rdatasize, NULL, NULL); |
| 175 |
if(rdatasize > 0){ |
| 176 |
/* Print it if unless told not to */ |
| 177 |
if(!quiet){ |
| 178 |
g_printf("[%s] [%s] %s", proc->jobname, (x==0) ? "out" : "err", readbuf); |
| 179 |
} |
| 180 |
if(outdir != NULL){ |
| 181 |
g_io_channel_write_chars(soutfile[x], readbuf, rdatasize, &wdatasize, NULL); |
| 182 |
} |
| 183 |
readdata = TRUE; |
| 184 |
free(readbuf); |
| 185 |
} |
| 186 |
|
| 187 |
} |
| 188 |
} |
| 189 |
/* See if we need to pump more data down stdin */ |
| 190 |
if((fds[2].revents & POLLOUT) != 0){ |
| 191 |
/* We have data we can write */ |
| 192 |
gchar *nextline; |
| 193 |
gint nextlinesize; |
| 194 |
gint nextlinewritesize; |
| 195 |
GIOStatus s; |
| 196 |
|
| 197 |
/* Get the next line, and write it down the stream */ |
| 198 |
s = g_io_channel_read_line(sinfile, &nextline, &nextlinesize, NULL, NULL); |
| 199 |
if (nextlinesize > 0){ |
| 200 |
g_io_channel_write_chars(sin, nextline, nextlinesize, &nextlinewritesize, NULL); |
| 201 |
} |
| 202 |
if (s == G_IO_STATUS_EOF){ |
| 203 |
g_io_channel_shutdown(sin, TRUE, NULL); |
| 204 |
sin = NULL; |
| 205 |
fdssize=2; |
| 206 |
} |
| 207 |
|
| 208 |
} |
| 209 |
/* Even if we did get a hangup - lets make sure there is no more data to read first by looping again */ |
| 210 |
if (readdata) continue; |
| 211 |
|
| 212 |
if(((fds[0].revents & POLLHUP) != 0) && ((fds[1].revents & POLLHUP) != 0)) break; |
| 213 |
} |
| 214 |
|
| 215 |
while((waitpid(proc->pid, &(proc->stat_loc), 0)) != proc->pid); |
| 216 |
|
| 217 |
g_timer_stop(proc->timer); |
| 218 |
|
| 219 |
/* If process exited cleanly */ |
| 220 |
if (WIFEXITED(proc->stat_loc)){ |
| 221 |
/* Get the exit code */ |
| 222 |
if (verbose) g_fprintf(stderr, "Job '%s' exited with code %d. Exec time: %.2f\n", proc->jobname, WEXITSTATUS(proc->stat_loc), g_timer_elapsed(proc->timer, 0)); |
| 223 |
}else{ |
| 224 |
/* Otherwise - find out what it died with */ |
| 225 |
/* TODO - this doesn't work quite right.. Mainly because its looking at the shell process, so the |
| 226 |
* child of the /bin/sh which does get a signal, this isn't passed up. Although, it handly tells |
| 227 |
* us if the /bin/sh gets a SEGV etc ;) |
| 228 |
*/ |
| 229 |
g_fprintf(stderr, "Job %s exited with signal %d. Exec time: %.2f\n", proc->jobname, (WTERMSIG(proc->stat_loc)), g_timer_elapsed(proc->timer, 0)); |
| 230 |
} |
| 231 |
|
| 232 |
|
| 233 |
g_io_channel_shutdown(sout[0], TRUE, NULL); |
| 234 |
g_io_channel_shutdown(sout[1], TRUE, NULL); |
| 235 |
|
| 236 |
if((infile != NULL) && (sin != NULL)){ |
| 237 |
g_io_channel_shutdown(sin, TRUE, NULL); |
| 238 |
} |
| 239 |
if (outdir != NULL){ |
| 240 |
g_io_channel_shutdown(soutfile[0], TRUE, NULL); |
| 241 |
g_io_channel_shutdown(soutfile[1], TRUE, NULL); |
| 242 |
} |
| 243 |
|
| 244 |
g_spawn_close_pid(proc->pid); |
| 245 |
|
| 246 |
|
| 247 |
} |
| 248 |
|
| 249 |
/* Takes a string str, a search string, find, and string to |
| 250 |
* replace all occurs of find with, replace. Returns a new |
| 251 |
* leaving original intact. |
| 252 |
*/ |
| 253 |
gchar *strrep(gchar *str, gchar *find, gchar *replace){ |
| 254 |
gchar *ptr, *oldptr; |
| 255 |
GString *newstr = g_string_new(""); |
| 256 |
gssize len = strlen(str); |
| 257 |
gint findlen = strlen(find); |
| 258 |
|
| 259 |
ptr = g_strstr_len(str, len, find); |
| 260 |
oldptr=str; |
| 261 |
while(ptr != NULL){ |
| 262 |
/* Copy in data up to this point */ |
| 263 |
g_string_append_len (newstr, oldptr, (ptr - oldptr)); |
| 264 |
/* Put in the replacement string */ |
| 265 |
g_string_append(newstr, replace); |
| 266 |
|
| 267 |
oldptr = ptr + findlen; |
| 268 |
/* BUG - len will now be wrong. But, i only wanted a strstr anyway :) */ |
| 269 |
ptr = g_strstr_len(oldptr, len, find); |
| 270 |
} |
| 271 |
|
| 272 |
/* Copy remains */ |
| 273 |
g_string_append_len (newstr, oldptr, (ptr - oldptr)); |
| 274 |
|
| 275 |
ptr = g_string_free(newstr, FALSE); |
| 276 |
|
| 277 |
return ptr; |
| 278 |
} |
| 279 |
|
| 280 |
|
| 281 |
/* Takes a cmd before substitution, takes the characters to be substituted |
| 282 |
* and a line for doign the substitution with. Fills in jobname |
| 283 |
*/ |
| 284 |
gchar *genexeccmd(gchar *cmd, gchar *fmt, gchar *line, gchar **jobname){ |
| 285 |
gchar *newexec, *ptr; |
| 286 |
int x; |
| 287 |
gchar *linesep = " "; |
| 288 |
gchar *fmtsep = " "; |
| 289 |
|
| 290 |
gchar **line_array; |
| 291 |
gchar **fmt_array; |
| 292 |
|
| 293 |
if ( fmt == NULL ){ |
| 294 |
/* No format given - we'll just append the options to the end of the command */ |
| 295 |
if(jobname != NULL) *jobname = g_strdup(line); |
| 296 |
return g_strdup_printf("%s %s", cmd, line); |
| 297 |
} |
| 298 |
|
| 299 |
line_array = g_strsplit(line, linesep, 0); |
| 300 |
fmt_array = g_strsplit(fmt, fmtsep, 0); |
| 301 |
|
| 302 |
if(jobparsename != NULL){ |
| 303 |
if(jobname != NULL){ |
| 304 |
for(x=0; fmt_array[x] != NULL; x++){ |
| 305 |
if (line_array[x] == NULL) break; |
| 306 |
if((strcmp(fmt_array[x], jobparsename) == 0)){ |
| 307 |
*jobname = g_strdup(line_array[x]); |
| 308 |
break; |
| 309 |
} |
| 310 |
} |
| 311 |
} |
| 312 |
}else{ |
| 313 |
/* Not told us what they want.. We'll just use the first one */ |
| 314 |
*jobname = g_strdup(line_array[0]); |
| 315 |
} |
| 316 |
|
| 317 |
newexec = g_strdup(cmd); |
| 318 |
for(x=0; line_array[x] != NULL; x++){ |
| 319 |
if (fmt_array[x] == NULL) break; |
| 320 |
ptr = newexec; |
| 321 |
newexec = strrep(newexec, fmt_array[x], line_array[x]); |
| 322 |
free(ptr); |
| 323 |
} |
| 324 |
|
| 325 |
|
| 326 |
return newexec; |
| 327 |
} |
| 328 |
|
| 329 |
int main(int argc, char **argv){ |
| 330 |
|
| 331 |
GThreadPool *procpool; |
| 332 |
GError *pp_err = NULL, *err = NULL; |
| 333 |
gint x; |
| 334 |
|
| 335 |
struct rlimit rlp; |
| 336 |
|
| 337 |
GOptionContext *optcontext; |
| 338 |
|
| 339 |
optcontext = g_option_context_new(" - parallel job executer"); |
| 340 |
g_option_context_add_main_entries(optcontext, options, NULL); |
| 341 |
g_option_context_parse (optcontext, &argc, &argv, &err); |
| 342 |
|
| 343 |
if(command == NULL){ |
| 344 |
g_printerr("Command required, see --help for more flags\n"); |
| 345 |
exit(1); |
| 346 |
} |
| 347 |
|
| 348 |
if(verbose){ |
| 349 |
g_printerr("Command '%s'\n", command); |
| 350 |
g_printerr("Timeout '%d'\n", timeout); |
| 351 |
g_printerr("Jobs '%d'\n", numthreads); |
| 352 |
} |
| 353 |
|
| 354 |
|
| 355 |
if(argc < 2 && arglist == NULL){ |
| 356 |
/* We have no arguments */ |
| 357 |
g_printerr("Missing arguments, see --help for details\n"); |
| 358 |
exit(1); |
| 359 |
} |
| 360 |
|
| 361 |
if (!g_thread_supported ()){ |
| 362 |
g_thread_init (NULL); |
| 363 |
}else{ |
| 364 |
g_printerr("Threading not supported\n"); |
| 365 |
} |
| 366 |
|
| 367 |
/* Up the number of FD's to the "hard" limit. |
| 368 |
* This is mainly to get around the very small default |
| 369 |
* solaris has, or 256 |
| 370 |
*/ |
| 371 |
getrlimit(RLIMIT_NOFILE, &rlp); |
| 372 |
rlp.rlim_cur = rlp.rlim_max; |
| 373 |
setrlimit(RLIMIT_NOFILE, &rlp); |
| 374 |
|
| 375 |
if(verbose) g_printerr("Creating a threadpool %d in size\n", numthreads); |
| 376 |
procpool = g_thread_pool_new(process_child, NULL, numthreads, FALSE, &pp_err); |
| 377 |
|
| 378 |
/* Generate the commands and push the job onto the thread pool */ |
| 379 |
/* If no substituion is needed */ |
| 380 |
if (arglist != NULL){ |
| 381 |
GIOChannel *f; |
| 382 |
gchar *line; |
| 383 |
GIOStatus status; |
| 384 |
|
| 385 |
f = g_io_channel_new_file(arglist, "r", &err); |
| 386 |
if (f == NULL){ |
| 387 |
g_printerr("Failed to open argfile: %s\n", err->message); |
| 388 |
exit(1); |
| 389 |
} |
| 390 |
status = g_io_channel_read_line(f, &line, NULL, NULL, &err); |
| 391 |
while(status==G_IO_STATUS_NORMAL){ |
| 392 |
process_t *newproc = g_new(process_t, 1); |
| 393 |
newproc->err = NULL; |
| 394 |
|
| 395 |
line = g_strstrip(line); |
| 396 |
|
| 397 |
newproc->exec = genexeccmd(command, parseformat, line, &(newproc->jobname)); |
| 398 |
proclist = g_list_append(proclist, (gpointer) newproc); |
| 399 |
|
| 400 |
if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec); |
| 401 |
g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err)); |
| 402 |
|
| 403 |
status = g_io_channel_read_line(f, &line, NULL, NULL, &err); |
| 404 |
} |
| 405 |
g_io_channel_close(f); |
| 406 |
|
| 407 |
}else{ |
| 408 |
/* substition is needed */ |
| 409 |
for(x=1; x<argc; x++){ |
| 410 |
process_t *newproc = g_new(process_t, 1); |
| 411 |
newproc->err = NULL; |
| 412 |
|
| 413 |
newproc->exec = genexeccmd(command, parseformat, argv[x], &(newproc->jobname)); |
| 414 |
|
| 415 |
proclist = g_list_append(proclist, (gpointer) newproc); |
| 416 |
|
| 417 |
if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec); |
| 418 |
g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err)); |
| 419 |
} |
| 420 |
} |
| 421 |
|
| 422 |
|
| 423 |
/* Wait for the jobs to finish */ |
| 424 |
/* TODO - Kill jobs that don't finish in time */ |
| 425 |
while(g_thread_pool_get_num_threads(procpool) > 0){ |
| 426 |
g_usleep(1000); |
| 427 |
} |
| 428 |
|
| 429 |
return 0; |
| 430 |
} |