1 |
pajs |
1.1 |
/* |
2 |
|
|
* Peter Saunders pjob.c |
3 |
|
|
* Copyright (C) 2000-2005 Peter Saunders |
4 |
|
|
* |
5 |
|
|
* This program is free software; you can redistribute it and/or |
6 |
|
|
* modify it under the terms of the GNU General Public License |
7 |
|
|
* as published by the Free Software Foundation; either version 2 |
8 |
|
|
* of the License, or (at your option) any later version. |
9 |
|
|
* |
10 |
|
|
* This program is distributed in the hope that it will be useful, |
11 |
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 |
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 |
|
|
* GNU General Public License for more details. |
14 |
|
|
* |
15 |
|
|
* You should have received a copy of the GNU General Public License |
16 |
|
|
* along with this program; if not, write to the Free Software |
17 |
|
|
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
18 |
|
|
* |
19 |
|
|
*/ |
20 |
|
|
|
21 |
|
|
#include <stdio.h> |
22 |
|
|
#include <glib.h> |
23 |
|
|
#include <unistd.h> |
24 |
|
|
#include <poll.h> |
25 |
|
|
|
26 |
|
|
#define DEFNUMTHREAD 10 |
27 |
|
|
#define DEFTIMEOUT 60 |
28 |
|
|
|
29 |
|
|
/* Structure for the process to be passed to the executing thread */ |
30 |
|
|
struct _process_t{ |
31 |
|
|
gchar *exec; |
32 |
|
|
gchar *jobname; |
33 |
|
|
GTimer *timer; |
34 |
|
|
GPid pid; |
35 |
|
|
gchar *file_stdout; |
36 |
|
|
gchar *file_stderr; |
37 |
|
|
|
38 |
|
|
GError *err; |
39 |
|
|
}; |
40 |
|
|
|
41 |
|
|
typedef struct _process_t process_t; |
42 |
|
|
|
43 |
|
|
/* Globals for config setup */ |
44 |
|
|
static gint numthreads = DEFNUMTHREAD; |
45 |
|
|
static gboolean verbose = FALSE; |
46 |
|
|
static gboolean quiet = FALSE; |
47 |
|
|
static gint timeout = DEFTIMEOUT; |
48 |
|
|
static gchar *parseformat = NULL; |
49 |
|
|
static gchar *jobparsename = NULL; |
50 |
|
|
static gchar *command = NULL; |
51 |
|
|
static gchar *outdir = NULL; |
52 |
|
|
static gchar *infile = NULL; |
53 |
|
|
static gchar *arglist = NULL; |
54 |
|
|
static gchar *stdindata = NULL; |
55 |
|
|
|
56 |
|
|
/* Command line options */ |
57 |
|
|
static GOptionEntry options[] = |
58 |
|
|
{ |
59 |
|
|
{ "jobs", 'j', 0, G_OPTION_ARG_INT, &numthreads, "Number of jobs to run in parallel [DEFNUMTHREAD]", NULL }, |
60 |
|
|
{ "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL }, |
61 |
|
|
{ "quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet, "Do not print the output of the commands as they are running", NULL }, |
62 |
|
|
{ "timeout", 't', 0, G_OPTION_ARG_INT, &timeout, "Timeout before process is killed [DEFTIMEOUT]", NULL }, |
63 |
|
|
{ "format", 'f', 0, G_OPTION_ARG_STRING, &parseformat, "The order of substitiution to be used in the command", NULL }, |
64 |
|
|
{ "jobname", 'n', 0, G_OPTION_ARG_STRING, &jobparsename, "If format is used, which variable to use for a output dir", NULL }, |
65 |
|
|
{ "command", 'c', 0, G_OPTION_ARG_STRING, &command, "The command to be executed", NULL }, |
66 |
|
|
{ "output", 'o', 0, G_OPTION_ARG_STRING, &outdir, "Directory to put all output into", NULL }, |
67 |
|
|
{ "stdin", 'i', 0, G_OPTION_ARG_FILENAME, &infile, "Pass contents of filename into stdin of the executing process", NULL }, |
68 |
|
|
{ "argsfile", 'a', 0, G_OPTION_ARG_FILENAME, &arglist, "File for list of argumenst if you dont want to use the command line", NULL }, |
69 |
|
|
{ NULL } |
70 |
|
|
}; |
71 |
|
|
|
72 |
|
|
/* Linked list of process jobs */ |
73 |
|
|
GList *proclist = NULL; |
74 |
|
|
|
75 |
|
|
/* Take a process_t and execute it, and doing the "right thing" with the output */ |
76 |
|
|
void process_child(gpointer data, gpointer user_data){ |
77 |
|
|
process_t *proc = (process_t*) data; |
78 |
|
|
GIOChannel *soutfile[2], *sout[2], *sinfile, *sin; |
79 |
|
|
gchar *execargv[4]; |
80 |
|
|
int outpipes[2], inpipes[2]; |
81 |
|
|
GError *err = NULL; |
82 |
|
|
|
83 |
|
|
struct pollfd fds[3]; |
84 |
|
|
gint fdssize=2; |
85 |
|
|
|
86 |
|
|
/*if((pipe(outpipes)) != 0){ |
87 |
|
|
g_printerr("Failed to create pipe\n"); |
88 |
|
|
return ; |
89 |
|
|
}*/ |
90 |
|
|
|
91 |
|
|
/* Setup files in output dir if requested to do so */ |
92 |
|
|
if(outdir != NULL){ |
93 |
|
|
proc->file_stdout = g_strdup_printf("%s/%s-STDOUT", outdir, proc->jobname); |
94 |
|
|
soutfile[0] = g_io_channel_new_file(proc->file_stdout, "w", &err); |
95 |
|
|
|
96 |
|
|
if(soutfile[0] == NULL){ |
97 |
|
|
g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stdout, err->message); |
98 |
|
|
return; |
99 |
|
|
} |
100 |
|
|
|
101 |
|
|
proc->file_stderr = g_strdup_printf("%s/%s-STDERR", outdir, proc->jobname); |
102 |
|
|
soutfile[1] = g_io_channel_new_file(proc->file_stderr, "w", &err); |
103 |
|
|
|
104 |
|
|
if(soutfile[1] == NULL){ |
105 |
|
|
g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stderr, err->message); |
106 |
|
|
return; |
107 |
|
|
} |
108 |
|
|
|
109 |
|
|
} |
110 |
|
|
|
111 |
|
|
|
112 |
|
|
/* Open stdin file to pass to the process */ |
113 |
|
|
if(infile != NULL){ |
114 |
|
|
sinfile = g_io_channel_new_file(infile, "r", NULL); |
115 |
|
|
pipe(inpipes); |
116 |
|
|
} |
117 |
|
|
|
118 |
|
|
/* Setup argv structure for job */ |
119 |
|
|
if (verbose) g_fprintf(stderr, "Starting job '%s'\n", proc->jobname); |
120 |
|
|
execargv[0] = "/bin/sh"; |
121 |
|
|
execargv[1] = "-c"; |
122 |
|
|
execargv[2] = proc->exec; |
123 |
|
|
execargv[3] = NULL; |
124 |
|
|
|
125 |
|
|
/* Exec the job */ |
126 |
|
|
if (infile == NULL){ |
127 |
|
|
if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, 0, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){ |
128 |
|
|
g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message); |
129 |
|
|
return; |
130 |
|
|
} |
131 |
|
|
}else{ |
132 |
|
|
if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, 0, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){ |
133 |
|
|
g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message); |
134 |
|
|
return; |
135 |
|
|
} |
136 |
|
|
close(inpipes[0]); |
137 |
|
|
} |
138 |
|
|
|
139 |
|
|
|
140 |
|
|
/* Make a stream out of the pipes for ease of reading from them */ |
141 |
|
|
sout[0] = g_io_channel_unix_new(outpipes[0]); |
142 |
|
|
sout[1] = g_io_channel_unix_new(outpipes[1]); |
143 |
|
|
if(infile != NULL){ |
144 |
|
|
sin = g_io_channel_unix_new(inpipes[1]); |
145 |
|
|
fds[2].fd = inpipes[1]; |
146 |
|
|
fds[2].events = POLLOUT | POLLHUP; |
147 |
|
|
fdssize = 3; |
148 |
|
|
} |
149 |
|
|
|
150 |
|
|
|
151 |
|
|
/* Setup the poll events */ |
152 |
|
|
fds[0].fd = outpipes[0]; |
153 |
|
|
fds[1].fd = outpipes[1]; |
154 |
|
|
fds[0].events = fds[1].events = POLLIN | POLLPRI | POLLHUP; |
155 |
|
|
fds[0].revents = fds[1].revents = fds[2].revents = 0; |
156 |
|
|
|
157 |
|
|
|
158 |
|
|
for(;;){ |
159 |
|
|
gint x; |
160 |
|
|
gchar *readbuf; |
161 |
|
|
gint rdatasize, wdatasize; |
162 |
|
|
gboolean readdata = FALSE; |
163 |
|
|
fds[0].revents = fds[1].revents = fds[2].revents = 0; |
164 |
|
|
|
165 |
|
|
poll(fds, fdssize, -1); |
166 |
|
|
/* For stdout and stderr see if there is any data, and read it */ |
167 |
|
|
for(x=0; x<2; x++){ |
168 |
|
|
if(fds[x].revents|POLLIN == fds[x].revents){ |
169 |
|
|
/* We have data to read */ |
170 |
|
|
g_io_channel_read_line(sout[x], &readbuf, &rdatasize, NULL, NULL); |
171 |
|
|
if(rdatasize > 0){ |
172 |
|
|
/* Print it if unless told not to */ |
173 |
|
|
if(!quiet){ |
174 |
|
|
g_printf("[%s] [%s] %s", proc->jobname, (x==0) ? "out" : "err", readbuf); |
175 |
|
|
} |
176 |
|
|
if(outdir != NULL){ |
177 |
|
|
g_io_channel_write_chars(soutfile[x], readbuf, rdatasize, &wdatasize, NULL); |
178 |
|
|
} |
179 |
|
|
readdata = TRUE; |
180 |
|
|
free(readbuf); |
181 |
|
|
} |
182 |
|
|
|
183 |
|
|
} |
184 |
|
|
} |
185 |
|
|
/* See if we need to pump more data down stdin */ |
186 |
|
|
if(fds[2].revents|POLLOUT == fds[2].revents){ |
187 |
|
|
/* We have data we can write */ |
188 |
|
|
gchar *nextline; |
189 |
|
|
gint nextlinesize; |
190 |
|
|
gint nextlinewritesize; |
191 |
|
|
GIOStatus s; |
192 |
|
|
|
193 |
|
|
/* Get the next line, and write it down the stream */ |
194 |
|
|
s = g_io_channel_read_line(sinfile, &nextline, &nextlinesize, NULL, NULL); |
195 |
|
|
if (nextlinesize > 0){ |
196 |
|
|
printf("Going to write '%s'\n", nextline); |
197 |
|
|
g_io_channel_write_chars(sin, nextline, nextlinesize, &nextlinewritesize, NULL); |
198 |
|
|
} |
199 |
|
|
if (s == G_IO_STATUS_EOF){ |
200 |
|
|
g_io_channel_shutdown(sin, TRUE, NULL); |
201 |
|
|
sin == NULL; |
202 |
|
|
fdssize=2; |
203 |
|
|
} |
204 |
|
|
|
205 |
|
|
} |
206 |
|
|
/* Even if we did get a hangup - lets make sure there is no more data to read first by looping again */ |
207 |
|
|
if (readdata) continue; |
208 |
|
|
|
209 |
|
|
if((fds[0].revents|POLLHUP == fds[0].revents) && (fds[1].revents|POLLHUP == fds[1].revents)) break; |
210 |
|
|
} |
211 |
|
|
|
212 |
|
|
/* For some batty reason, it starts with a ref count of 1. Lets decrement it so we can shut it */ |
213 |
|
|
/*g_io_channel_unref(sout[0]); |
214 |
|
|
g_io_channel_unref(sout[1]);*/ |
215 |
|
|
|
216 |
|
|
|
217 |
|
|
g_io_channel_shutdown(sout[0], TRUE, NULL); |
218 |
|
|
g_io_channel_shutdown(sout[1], TRUE, NULL); |
219 |
|
|
/*close(outpipes[0]); |
220 |
|
|
close(outpipes[1]);*/ |
221 |
|
|
|
222 |
|
|
/* BUG - Causes glib error saying its already closed */ |
223 |
|
|
if((infile != NULL) && (sin != NULL)){ |
224 |
|
|
g_io_channel_shutdown(sin, TRUE, NULL); |
225 |
|
|
} |
226 |
|
|
if (outdir != NULL){ |
227 |
|
|
g_io_channel_shutdown(soutfile[0], TRUE, NULL); |
228 |
|
|
g_io_channel_shutdown(soutfile[1], TRUE, NULL); |
229 |
|
|
} |
230 |
|
|
|
231 |
|
|
g_spawn_close_pid(proc->pid); |
232 |
|
|
|
233 |
|
|
if (verbose) g_fprintf(stderr, "Ending job '%s'\n", proc->jobname); |
234 |
|
|
|
235 |
|
|
} |
236 |
|
|
|
237 |
|
|
/* Takes a string str, a search string, find, and string to |
238 |
|
|
* replace all occurs of find with, replace. Returns a new |
239 |
|
|
* leaving original intact. |
240 |
|
|
*/ |
241 |
|
|
gchar *strrep(gchar *str, gchar *find, gchar *replace){ |
242 |
|
|
gchar *ptr, *oldptr; |
243 |
|
|
GString *newstr = g_string_new(""); |
244 |
|
|
gssize len = strlen(str); |
245 |
|
|
gint findlen = strlen(find); |
246 |
|
|
|
247 |
|
|
ptr = g_strstr_len(str, len, find); |
248 |
|
|
oldptr=str; |
249 |
|
|
while(ptr != NULL){ |
250 |
|
|
/* Copy in data up to this point */ |
251 |
|
|
g_string_append_len (newstr, oldptr, (ptr - oldptr)); |
252 |
|
|
/* Put in the replacement string */ |
253 |
|
|
g_string_append(newstr, replace); |
254 |
|
|
|
255 |
|
|
oldptr = ptr + findlen; |
256 |
|
|
/* BUG - len will now be wrong. But, i only wanted a strstr anyway :) */ |
257 |
|
|
ptr = g_strstr_len(oldptr, len, find); |
258 |
|
|
} |
259 |
|
|
|
260 |
|
|
/* Copy remains */ |
261 |
|
|
g_string_append_len (newstr, oldptr, (ptr - oldptr)); |
262 |
|
|
|
263 |
|
|
ptr = g_string_free(newstr, FALSE); |
264 |
|
|
|
265 |
|
|
return ptr; |
266 |
|
|
} |
267 |
|
|
|
268 |
|
|
|
269 |
|
|
/* Takes a cmd before substitution, takes the characters to be substituted |
270 |
|
|
* and a line for doign the substitution with. Fills in jobname |
271 |
|
|
*/ |
272 |
|
|
gchar *genexeccmd(gchar *cmd, gchar *fmt, gchar *line, gchar **jobname){ |
273 |
|
|
gchar **fmttok; |
274 |
|
|
gchar *newexec, *ptr; |
275 |
|
|
int x; |
276 |
|
|
gchar *linesep = " "; |
277 |
|
|
gchar *fmtsep = " "; |
278 |
|
|
|
279 |
|
|
gchar **line_array; |
280 |
|
|
gchar **fmt_array; |
281 |
|
|
|
282 |
|
|
if ( fmt == NULL ){ |
283 |
|
|
/* No format given - we'll just append the options to the end of the command */ |
284 |
|
|
if(jobname != NULL) *jobname = g_strdup(line); |
285 |
|
|
return g_strdup_printf("%s %s", cmd, line); |
286 |
|
|
} |
287 |
|
|
|
288 |
|
|
line_array = g_strsplit(line, linesep, 0); |
289 |
|
|
fmt_array = g_strsplit(fmt, fmtsep, 0); |
290 |
|
|
|
291 |
|
|
if(jobparsename != NULL){ |
292 |
|
|
if(jobname != NULL){ |
293 |
|
|
for(x=0; fmt_array[x] != NULL; x++){ |
294 |
|
|
if (line_array[x] == NULL) break; |
295 |
|
|
if((strcmp(fmt_array[x], jobparsename) == 0)){ |
296 |
|
|
*jobname = g_strdup(line_array[x]); |
297 |
|
|
break; |
298 |
|
|
} |
299 |
|
|
} |
300 |
|
|
} |
301 |
|
|
}else{ |
302 |
|
|
/* Not told us what they want.. We'll just use the first one */ |
303 |
|
|
*jobname = g_strdup(line_array[0]); |
304 |
|
|
} |
305 |
|
|
|
306 |
|
|
newexec = g_strdup(cmd); |
307 |
|
|
for(x=0; line_array[x] != NULL; x++){ |
308 |
|
|
if (fmt_array[x] == NULL) break; |
309 |
|
|
ptr = newexec; |
310 |
|
|
newexec = strrep(newexec, fmt_array[x], line_array[x]); |
311 |
|
|
free(ptr); |
312 |
|
|
} |
313 |
|
|
|
314 |
|
|
|
315 |
|
|
return newexec; |
316 |
|
|
} |
317 |
|
|
|
318 |
|
|
int main(int argc, char **argv){ |
319 |
|
|
|
320 |
|
|
GThreadPool *procpool; |
321 |
|
|
GError *pp_err = NULL, *err = NULL; |
322 |
|
|
gint x; |
323 |
|
|
|
324 |
|
|
GOptionContext *optcontext; |
325 |
|
|
|
326 |
|
|
optcontext = g_option_context_new(" - parallel job executer"); |
327 |
|
|
g_option_context_add_main_entries(optcontext, options, NULL); |
328 |
|
|
g_option_context_parse (optcontext, &argc, &argv, &err); |
329 |
|
|
|
330 |
|
|
if(command == NULL){ |
331 |
|
|
g_printerr("Command required, see --help for more flags\n"); |
332 |
|
|
exit(1); |
333 |
|
|
} |
334 |
|
|
|
335 |
|
|
if(verbose){ |
336 |
|
|
g_printerr("Command '%s'\n", command); |
337 |
|
|
g_printerr("Timeout '%d'\n", timeout); |
338 |
|
|
g_printerr("Jobs '%d'\n", numthreads); |
339 |
|
|
} |
340 |
|
|
|
341 |
|
|
|
342 |
|
|
if(argc < 2 && arglist == NULL){ |
343 |
|
|
/* We have no arguments */ |
344 |
|
|
g_printerr("Missing arguments, see --help for details\n"); |
345 |
|
|
exit(1); |
346 |
|
|
} |
347 |
|
|
|
348 |
|
|
if (!g_thread_supported ()){ |
349 |
|
|
g_thread_init (NULL); |
350 |
|
|
}else{ |
351 |
|
|
g_printerr("Threading not supported\n"); |
352 |
|
|
} |
353 |
|
|
|
354 |
|
|
if(verbose) g_printerr("Creating a threadpool %d in size\n", numthreads); |
355 |
|
|
procpool = g_thread_pool_new(process_child, NULL, numthreads, FALSE, &pp_err); |
356 |
|
|
|
357 |
|
|
/* Generate the commands and push the job onto the thread pool */ |
358 |
|
|
/* If no substituion is needed */ |
359 |
|
|
if (arglist != NULL){ |
360 |
|
|
GIOChannel *f; |
361 |
|
|
gchar *line; |
362 |
|
|
GIOStatus status; |
363 |
|
|
|
364 |
|
|
f = g_io_channel_new_file(arglist, "r", &err); |
365 |
|
|
if (f == NULL){ |
366 |
|
|
g_printerr("Failed to open argfile: %s\n", err->message); |
367 |
|
|
exit(1); |
368 |
|
|
} |
369 |
|
|
status = g_io_channel_read_line(f, &line, NULL, NULL, &err); |
370 |
|
|
while(status==G_IO_STATUS_NORMAL){ |
371 |
|
|
process_t *newproc = g_new(process_t, 1); |
372 |
|
|
newproc->err = NULL; |
373 |
|
|
|
374 |
|
|
line = g_strstrip(line); |
375 |
|
|
|
376 |
|
|
newproc->exec = genexeccmd(command, parseformat, line, &(newproc->jobname)); |
377 |
|
|
proclist = g_list_append(proclist, (gpointer) newproc); |
378 |
|
|
|
379 |
|
|
if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec); |
380 |
|
|
g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err)); |
381 |
|
|
|
382 |
|
|
status = g_io_channel_read_line(f, &line, NULL, NULL, &err); |
383 |
|
|
} |
384 |
|
|
g_io_channel_close(f); |
385 |
|
|
|
386 |
|
|
}else{ |
387 |
|
|
/* substition is needed */ |
388 |
|
|
for(x=1; x<argc; x++){ |
389 |
|
|
process_t *newproc = g_new(process_t, 1); |
390 |
|
|
newproc->err = NULL; |
391 |
|
|
|
392 |
|
|
newproc->exec = genexeccmd(command, parseformat, argv[x], &(newproc->jobname)); |
393 |
|
|
|
394 |
|
|
proclist = g_list_append(proclist, (gpointer) newproc); |
395 |
|
|
|
396 |
|
|
if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec); |
397 |
|
|
g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err)); |
398 |
|
|
} |
399 |
|
|
} |
400 |
|
|
|
401 |
|
|
|
402 |
|
|
/* Wait for the jobs to finish */ |
403 |
|
|
/* TODO - Kill jobs that don't finish in time */ |
404 |
|
|
while(g_thread_pool_get_num_threads(procpool) > 0){ |
405 |
|
|
sleep(1); |
406 |
|
|
} |
407 |
|
|
|
408 |
|
|
return 0; |
409 |
|
|
} |