ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/i-scream/projects/pjob/pjob.c
Revision: 1.6
Committed: Mon Jan 30 21:46:59 2006 UTC (18 years, 8 months ago) by pajs
Content type: text/plain
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +11 -0 lines
Error occurred while calculating annotation data.
Log Message:
Implemented the use of hard/soft limits. Need to start autoconfing around
stuff like that

File Contents

# Content
1 /*
2 * Peter Saunders pjob.c
3 * Copyright (C) 2000-2005 Peter Saunders
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 *
19 */
20
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <poll.h>
#include <unistd.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <glib.h>
#include <glib/gprintf.h>
31
/* Initial value of the --jobs setting (number of jobs run in parallel). */
#define DEFNUMTHREAD 10
/* Initial value of the --timeout setting. */
#define DEFTIMEOUT 60
34
35 /* Structure for the process to be passed to the executing thread */
36 struct _process_t{
37 gchar *exec;
38 gchar *jobname;
39 GTimer *timer;
40 GPid pid;
41 gint stat_loc;
42 gchar *file_stdout;
43 gchar *file_stderr;
44
45 GError *err;
46 };
47
48 typedef struct _process_t process_t;
49
50 /* Globals for config setup */
51 static gint numthreads = DEFNUMTHREAD;
52 static gboolean verbose = FALSE;
53 static gboolean quiet = FALSE;
54 static gint timeout = DEFTIMEOUT;
55 static gchar *parseformat = NULL;
56 static gchar *jobparsename = NULL;
57 static gchar *command = NULL;
58 static gchar *outdir = NULL;
59 static gchar *infile = NULL;
60 static gchar *arglist = NULL;
61
62 /* Command line options */
63 static GOptionEntry options[] =
64 {
65 { "jobs", 'j', 0, G_OPTION_ARG_INT, &numthreads, "Number of jobs to run in parallel [DEFNUMTHREAD]", NULL },
66 { "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Be verbose", NULL },
67 { "quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet, "Do not print the output of the commands as they are running", NULL },
68 { "timeout", 't', 0, G_OPTION_ARG_INT, &timeout, "Timeout before process is killed [DEFTIMEOUT]", NULL },
69 { "format", 'f', 0, G_OPTION_ARG_STRING, &parseformat, "The order of substitiution to be used in the command", NULL },
70 { "jobname", 'n', 0, G_OPTION_ARG_STRING, &jobparsename, "If format is used, which variable to use for a output dir", NULL },
71 { "command", 'c', 0, G_OPTION_ARG_STRING, &command, "The command to be executed", NULL },
72 { "output", 'o', 0, G_OPTION_ARG_STRING, &outdir, "Directory to put all output into", NULL },
73 { "stdin", 'i', 0, G_OPTION_ARG_FILENAME, &infile, "Pass contents of filename into stdin of the executing process", NULL },
74 { "argsfile", 'a', 0, G_OPTION_ARG_FILENAME, &arglist, "File for list of argumenst if you dont want to use the command line", NULL },
75 { NULL }
76 };
77
78 /* Linked list of process jobs */
79 GList *proclist = NULL;
80
81 /* Take a process_t and execute it, and doing the "right thing" with the output */
82 void process_child(gpointer data, gpointer user_data){
83 process_t *proc = (process_t*) data;
84 GIOChannel *soutfile[2], *sout[2], *sinfile, *sin;
85 gchar *execargv[4];
86 int outpipes[2], inpipes[2];
87 GError *err = NULL;
88
89 struct pollfd fds[3];
90 gint fdssize=2;
91
92 proc->timer = g_timer_new();
93 g_timer_start(proc->timer);
94
95 /* Setup files in output dir if requested to do so */
96 if(outdir != NULL){
97 proc->file_stdout = g_strdup_printf("%s/%s-STDOUT", outdir, proc->jobname);
98 soutfile[0] = g_io_channel_new_file(proc->file_stdout, "w", &err);
99
100 if(soutfile[0] == NULL){
101 g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stdout, err->message);
102 return;
103 }
104
105 proc->file_stderr = g_strdup_printf("%s/%s-STDERR", outdir, proc->jobname);
106 soutfile[1] = g_io_channel_new_file(proc->file_stderr, "w", &err);
107
108 if(soutfile[1] == NULL){
109 g_printerr("Failed to open %s for writing: %s. Skipping job\n", proc->file_stderr, err->message);
110 return;
111 }
112
113 }
114
115
116 /* Open stdin file to pass to the process */
117 if(infile != NULL){
118 sinfile = g_io_channel_new_file(infile, "r", NULL);
119 pipe(inpipes);
120 }
121
122 /* Setup argv structure for job */
123 if (verbose) g_fprintf(stderr, "Starting job '%s'\n", proc->jobname);
124 execargv[0] = "/bin/sh";
125 execargv[1] = "-c";
126 execargv[2] = proc->exec;
127 execargv[3] = NULL;
128
129 /* Exec the job */
130 if (infile == NULL){
131 if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), NULL, &(outpipes[0]), &(outpipes[1]), &err)){
132 g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
133 return;
134 }
135 }else{
136 if( ! g_spawn_async_with_pipes(NULL, execargv, NULL, G_SPAWN_DO_NOT_REAP_CHILD, NULL, NULL, &(proc->pid), &(inpipes[1]), &(outpipes[0]), &(outpipes[1]), &err)){
137 g_printerr("Failed to execute job %s: %s\n", proc->jobname, err->message);
138 return;
139 }
140 close(inpipes[0]);
141 }
142
143
144 /* Make a stream out of the pipes for ease of reading from them */
145 sout[0] = g_io_channel_unix_new(outpipes[0]);
146 sout[1] = g_io_channel_unix_new(outpipes[1]);
147 if(infile != NULL){
148 sin = g_io_channel_unix_new(inpipes[1]);
149 fds[2].fd = inpipes[1];
150 fds[2].events = POLLOUT | POLLHUP;
151 fdssize = 3;
152 }
153
154
155 /* Setup the poll events */
156 fds[0].fd = outpipes[0];
157 fds[1].fd = outpipes[1];
158 fds[0].events = fds[1].events = POLLIN | POLLPRI | POLLHUP;
159 fds[0].revents = fds[1].revents = fds[2].revents = 0;
160
161
162 for(;;){
163 gint x;
164 gchar *readbuf;
165 gint rdatasize, wdatasize;
166 gboolean readdata = FALSE;
167 fds[0].revents = fds[1].revents = fds[2].revents = 0;
168
169 poll(fds, fdssize, -1);
170 /* For stdout and stderr see if there is any data, and read it */
171 for(x=0; x<2; x++){
172 if((fds[x].revents & POLLIN) != 0){
173 /* We have data to read */
174 g_io_channel_read_line(sout[x], &readbuf, &rdatasize, NULL, NULL);
175 if(rdatasize > 0){
176 /* Print it if unless told not to */
177 if(!quiet){
178 g_printf("[%s] [%s] %s", proc->jobname, (x==0) ? "out" : "err", readbuf);
179 }
180 if(outdir != NULL){
181 g_io_channel_write_chars(soutfile[x], readbuf, rdatasize, &wdatasize, NULL);
182 }
183 readdata = TRUE;
184 free(readbuf);
185 }
186
187 }
188 }
189 /* See if we need to pump more data down stdin */
190 if((fds[2].revents & POLLOUT) != 0){
191 /* We have data we can write */
192 gchar *nextline;
193 gint nextlinesize;
194 gint nextlinewritesize;
195 GIOStatus s;
196
197 /* Get the next line, and write it down the stream */
198 s = g_io_channel_read_line(sinfile, &nextline, &nextlinesize, NULL, NULL);
199 if (nextlinesize > 0){
200 g_io_channel_write_chars(sin, nextline, nextlinesize, &nextlinewritesize, NULL);
201 }
202 if (s == G_IO_STATUS_EOF){
203 g_io_channel_shutdown(sin, TRUE, NULL);
204 sin = NULL;
205 fdssize=2;
206 }
207
208 }
209 /* Even if we did get a hangup - lets make sure there is no more data to read first by looping again */
210 if (readdata) continue;
211
212 if(((fds[0].revents & POLLHUP) != 0) && ((fds[1].revents & POLLHUP) != 0)) break;
213 }
214
215 while((waitpid(proc->pid, &(proc->stat_loc), 0)) != proc->pid);
216
217 g_timer_stop(proc->timer);
218
219 /* If process exited cleanly */
220 if (WIFEXITED(proc->stat_loc)){
221 /* Get the exit code */
222 if (verbose) g_fprintf(stderr, "Job '%s' exited with code %d. Exec time: %.2f\n", proc->jobname, WEXITSTATUS(proc->stat_loc), g_timer_elapsed(proc->timer, 0));
223 }else{
224 /* Otherwise - find out what it died with */
225 /* TODO - this doesn't work quite right.. Mainly because its looking at the shell process, so the
226 * child of the /bin/sh which does get a signal, this isn't passed up. Although, it handly tells
227 * us if the /bin/sh gets a SEGV etc ;)
228 */
229 g_fprintf(stderr, "Job %s exited with signal %d. Exec time: %.2f\n", proc->jobname, (WTERMSIG(proc->stat_loc)), g_timer_elapsed(proc->timer, 0));
230 }
231
232
233 g_io_channel_shutdown(sout[0], TRUE, NULL);
234 g_io_channel_shutdown(sout[1], TRUE, NULL);
235
236 if((infile != NULL) && (sin != NULL)){
237 g_io_channel_shutdown(sin, TRUE, NULL);
238 }
239 if (outdir != NULL){
240 g_io_channel_shutdown(soutfile[0], TRUE, NULL);
241 g_io_channel_shutdown(soutfile[1], TRUE, NULL);
242 }
243
244 g_spawn_close_pid(proc->pid);
245
246
247 }
248
249 /* Takes a string str, a search string, find, and string to
250 * replace all occurs of find with, replace. Returns a new
251 * leaving original intact.
252 */
253 gchar *strrep(gchar *str, gchar *find, gchar *replace){
254 gchar *ptr, *oldptr;
255 GString *newstr = g_string_new("");
256 gssize len = strlen(str);
257 gint findlen = strlen(find);
258
259 ptr = g_strstr_len(str, len, find);
260 oldptr=str;
261 while(ptr != NULL){
262 /* Copy in data up to this point */
263 g_string_append_len (newstr, oldptr, (ptr - oldptr));
264 /* Put in the replacement string */
265 g_string_append(newstr, replace);
266
267 oldptr = ptr + findlen;
268 /* BUG - len will now be wrong. But, i only wanted a strstr anyway :) */
269 ptr = g_strstr_len(oldptr, len, find);
270 }
271
272 /* Copy remains */
273 g_string_append_len (newstr, oldptr, (ptr - oldptr));
274
275 ptr = g_string_free(newstr, FALSE);
276
277 return ptr;
278 }
279
280
281 /* Takes a cmd before substitution, takes the characters to be substituted
282 * and a line for doign the substitution with. Fills in jobname
283 */
284 gchar *genexeccmd(gchar *cmd, gchar *fmt, gchar *line, gchar **jobname){
285 gchar *newexec, *ptr;
286 int x;
287 gchar *linesep = " ";
288 gchar *fmtsep = " ";
289
290 gchar **line_array;
291 gchar **fmt_array;
292
293 if ( fmt == NULL ){
294 /* No format given - we'll just append the options to the end of the command */
295 if(jobname != NULL) *jobname = g_strdup(line);
296 return g_strdup_printf("%s %s", cmd, line);
297 }
298
299 line_array = g_strsplit(line, linesep, 0);
300 fmt_array = g_strsplit(fmt, fmtsep, 0);
301
302 if(jobparsename != NULL){
303 if(jobname != NULL){
304 for(x=0; fmt_array[x] != NULL; x++){
305 if (line_array[x] == NULL) break;
306 if((strcmp(fmt_array[x], jobparsename) == 0)){
307 *jobname = g_strdup(line_array[x]);
308 break;
309 }
310 }
311 }
312 }else{
313 /* Not told us what they want.. We'll just use the first one */
314 *jobname = g_strdup(line_array[0]);
315 }
316
317 newexec = g_strdup(cmd);
318 for(x=0; line_array[x] != NULL; x++){
319 if (fmt_array[x] == NULL) break;
320 ptr = newexec;
321 newexec = strrep(newexec, fmt_array[x], line_array[x]);
322 free(ptr);
323 }
324
325
326 return newexec;
327 }
328
329 int main(int argc, char **argv){
330
331 GThreadPool *procpool;
332 GError *pp_err = NULL, *err = NULL;
333 gint x;
334
335 struct rlimit rlp;
336
337 GOptionContext *optcontext;
338
339 optcontext = g_option_context_new(" - parallel job executer");
340 g_option_context_add_main_entries(optcontext, options, NULL);
341 g_option_context_parse (optcontext, &argc, &argv, &err);
342
343 if(command == NULL){
344 g_printerr("Command required, see --help for more flags\n");
345 exit(1);
346 }
347
348 if(verbose){
349 g_printerr("Command '%s'\n", command);
350 g_printerr("Timeout '%d'\n", timeout);
351 g_printerr("Jobs '%d'\n", numthreads);
352 }
353
354
355 if(argc < 2 && arglist == NULL){
356 /* We have no arguments */
357 g_printerr("Missing arguments, see --help for details\n");
358 exit(1);
359 }
360
361 if (!g_thread_supported ()){
362 g_thread_init (NULL);
363 }else{
364 g_printerr("Threading not supported\n");
365 }
366
367 /* Up the number of FD's to the "hard" limit.
368 * This is mainly to get around the very small default
369 * solaris has, or 256
370 */
371 getrlimit(RLIMIT_NOFILE, &rlp);
372 rlp.rlim_cur = rlp.rlim_max;
373 setrlimit(RLIMIT_NOFILE, &rlp);
374
375 if(verbose) g_printerr("Creating a threadpool %d in size\n", numthreads);
376 procpool = g_thread_pool_new(process_child, NULL, numthreads, FALSE, &pp_err);
377
378 /* Generate the commands and push the job onto the thread pool */
379 /* If no substituion is needed */
380 if (arglist != NULL){
381 GIOChannel *f;
382 gchar *line;
383 GIOStatus status;
384
385 f = g_io_channel_new_file(arglist, "r", &err);
386 if (f == NULL){
387 g_printerr("Failed to open argfile: %s\n", err->message);
388 exit(1);
389 }
390 status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
391 while(status==G_IO_STATUS_NORMAL){
392 process_t *newproc = g_new(process_t, 1);
393 newproc->err = NULL;
394
395 line = g_strstrip(line);
396
397 newproc->exec = genexeccmd(command, parseformat, line, &(newproc->jobname));
398 proclist = g_list_append(proclist, (gpointer) newproc);
399
400 if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
401 g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
402
403 status = g_io_channel_read_line(f, &line, NULL, NULL, &err);
404 }
405 g_io_channel_close(f);
406
407 }else{
408 /* substition is needed */
409 for(x=1; x<argc; x++){
410 process_t *newproc = g_new(process_t, 1);
411 newproc->err = NULL;
412
413 newproc->exec = genexeccmd(command, parseformat, argv[x], &(newproc->jobname));
414
415 proclist = g_list_append(proclist, (gpointer) newproc);
416
417 if(verbose) g_printerr("Pushing command '%s' into thread pool queue\n", newproc->exec);
418 g_thread_pool_push(procpool, (gpointer) newproc, &(newproc->err));
419 }
420 }
421
422
423 /* Wait for the jobs to finish */
424 /* TODO - Kill jobs that don't finish in time */
425 while(g_thread_pool_get_num_threads(procpool) > 0){
426 g_usleep(1000);
427 }
428
429 return 0;
430 }