From f33168da952e13e4ee99e1531ac41643b78addd4 Mon Sep 17 00:00:00 2001 From: Dan Hipschman Date: Wed, 24 Jan 2007 00:00:21 +0100 Subject: * src/sort.c (MAX_FORK_TRIES_COMPRESS, MAX_FORK_TRIES_DECOMPRESS): In pipe_fork callers, use these named constants, not "2" and "8". (proctab, nprocs): Declare to be "static". (pipe_fork) [lint]: Initialize local, pid, to avoid unwarranted may-be-used-uninitialized warning. (create_temp): Use the active voice. Describe parameters, too. 2007-01-21 James Youngman Centralize all the uses of sigprocmask(). Don't restore an invalid saved mask. * src/sort.c (cs_enter, cs_leave): New functions for protecting code sequences against signal delivery. * (exit_cleanup): Use cs_enter and cs_leave instead of calling sigprocmask directly. (create_temp_file, pipe_fork, zaptemp): Likewise. 2007-01-21 Dan Hipschman Add compression of temp files to sort. * NEWS: Mention this. * bootstrap.conf: Import findprog. * configure.ac: Add AC_FUNC_FORK. * doc/coreutils.texi: Document GNUSORT_COMPRESSOR environment variable. * src/sort.c (compress_program): New global, holds the name of the external compression program. (struct sortfile): New type used by mergefps and friends instead of filenames to hold PIDs of compressor processes. (proctab): New global, holds compressor PIDs on which to wait. (enum procstate, struct procnode): New types used by proctab. (proctab_hasher, proctab_comparator): New functions for proctab. (nprocs): New global, number of forked but unreaped children. (reap, reap_some): New functions, wait for/cleanup forked processes. (register_proc, update_proc, wait_proc): New functions for adding, modifying and removing proctab entries. (create_temp_file): Change parameter type to pointer to file descriptor, and return type to pointer to struct tempnode. (dup2_or_die): New function used in create_temp and open_temp. (pipe_fork): New function, creates a pipe and child process. 
(create_temp): Creates a temp file and possibly a compression program to which we filter output. (open_temp): Opens a compressed temp file and creates a decompression process through which to filter the input. (mergefps): Change FILES parameter type to struct sortfile array and update access accordingly. Use open_temp and reap_some. (avoid_trashing_input, merge): Change FILES parameter like mergefps and call create_temp instead of create_temp_file. (sort): Call create_temp instead of create_temp_file. Use reap_some. (avoid_trashing_input, merge, sort, main): Adapt to mergefps. --- src/sort.c | 515 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 473 insertions(+), 42 deletions(-) (limited to 'src') diff --git a/src/sort.c b/src/sort.c index 8a2279637..31421cff6 100644 --- a/src/sort.c +++ b/src/sort.c @@ -1,5 +1,5 @@ /* sort - sort lines of text (with all kinds of options). - Copyright (C) 1988, 1991-2006 Free Software Foundation, Inc. + Copyright (C) 1988, 1991-2007 Free Software Foundation, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,10 +25,13 @@ #include #include +#include #include #include "system.h" #include "error.h" +#include "findprog.h" #include "hard-locale.h" +#include "hash.h" #include "inttostr.h" #include "md5.h" #include "physmem.h" @@ -63,7 +66,8 @@ struct rlimit { size_t rlim_cur; }; present. */ #ifndef SA_NOCLDSTOP # define SA_NOCLDSTOP 0 -# define sigprocmask(How, Set, Oset) /* empty */ +/* No sigprocmask. Always 'return' zero. */ +# define sigprocmask(How, Set, Oset) (0) # define sigset_t int # if ! HAVE_SIGINTERRUPT # define siginterrupt(sig, flag) /* empty */ @@ -92,6 +96,20 @@ enum SORT_FAILURE = 2 }; +enum + { + /* The number of times we should try to fork a compression process + (we retry if the fork call fails). 
We don't _need_ to compress + temp files, this is just to reduce disk access, so this number + can be small. */ + MAX_FORK_TRIES_COMPRESS = 2, + + /* The number of times we should try to fork a decompression process. + If we can't fork a decompression process, we can't sort, so this + number should be big. */ + MAX_FORK_TRIES_DECOMPRESS = 8 + }; + /* The representation of the decimal point in the current locale. */ static int decimal_point; @@ -261,6 +279,9 @@ static bool have_read_stdin; /* List of key field comparisons to be tried. */ static struct keyfield *keylist; +/* Program used to (de)compress temp files. Must accept -d. */ +static const char *compress_program; + static void sortlines_temp (struct line *, size_t, struct line *); /* Report MESSAGE for FILE, then clean up and exit. @@ -399,15 +420,207 @@ static struct option const long_options[] = /* The set of signals that are caught. */ static sigset_t caught_signals; +/* Critical section status. */ +struct cs_status +{ + bool valid; + sigset_t sigs; +}; + +/* Enter a critical section. */ +static struct cs_status +cs_enter (void) +{ + struct cs_status status; + status.valid = (sigprocmask (SIG_BLOCK, &caught_signals, &status.sigs) == 0); + return status; +} + +/* Leave a critical section. */ +static void +cs_leave (struct cs_status status) +{ + if (status.valid) + { + /* Ignore failure when restoring the signal mask. */ + sigprocmask (SIG_SETMASK, &status.sigs, NULL); + } +} + /* The list of temporary files. */ struct tempnode { struct tempnode *volatile next; + pid_t pid; /* If compressed, the pid of compressor, else zero */ char name[1]; /* Actual size is 1 + file name length. */ }; static struct tempnode *volatile temphead; static struct tempnode *volatile *temptail = &temphead; +struct sortfile +{ + char *name; + pid_t pid; /* If compressed, the pid of compressor, else zero */ +}; + +/* A table where we store compression process states. 
We clean up all + processes in a timely manner so as not to exhaust system resources, + so we store the info on whether the process is still running, or has + been reaped here. */ +static Hash_table *proctab; + +enum { INIT_PROCTAB_SIZE = 47 }; + +enum procstate { ALIVE, ZOMBIE }; + +/* A proctab entry. The COUNT field is there in case we fork a new + compression process that has the same PID as an old zombie process + that is still in the table (because the process to decompress the + temp file it was associated with hasn't started yet). */ +struct procnode +{ + pid_t pid; + enum procstate state; + size_t count; +}; + +static size_t +proctab_hasher (const void *entry, size_t tabsize) +{ + const struct procnode *node = entry; + return node->pid % tabsize; +} + +static bool +proctab_comparator (const void *e1, const void *e2) +{ + const struct procnode *n1 = e1, *n2 = e2; + return n1->pid == n2->pid; +} + +/* The total number of forked processes (compressors and decompressors) + that have not been reaped yet. */ +static size_t nprocs; + +/* The number of child processes we'll allow before we try to reap some. */ +enum { MAX_PROCS_BEFORE_REAP = 2 }; + +/* If 0 < PID, wait for the child process with that PID to exit. + If PID is -1, clean up a random child process which has finished and + return the process ID of that child. If PID is -1 and no processes + have quit yet, return 0 without waiting. */ + +static pid_t +reap (pid_t pid) +{ + int status; + pid_t cpid = waitpid (pid, &status, pid < 0 ? WNOHANG : 0); + + if (cpid < 0) + error (SORT_FAILURE, errno, _("waiting for %s [-d]"), + compress_program); + else if (0 < cpid) + { + if (! WIFEXITED (status) || WEXITSTATUS (status)) + error (SORT_FAILURE, 0, _("%s [-d] terminated abnormally"), + compress_program); + --nprocs; + } + + return cpid; +} + +/* Add the PID of a running compression process to proctab, or update + the entry COUNT and STATE fields if it's already there. 
This also + creates the table for us the first time it's called. */ + +static void +register_proc (pid_t pid) +{ + struct procnode test, *node; + + if (! proctab) + { + proctab = hash_initialize (INIT_PROCTAB_SIZE, NULL, + proctab_hasher, + proctab_comparator, + free); + if (! proctab) + xalloc_die (); + } + + test.pid = pid; + node = hash_lookup (proctab, &test); + if (node) + { + node->state = ALIVE; + ++node->count; + } + else + { + node = xmalloc (sizeof *node); + node->pid = pid; + node->state = ALIVE; + node->count = 1; + hash_insert (proctab, node); + } +} + +/* This is called when we reap a random process. We don't know + whether we have reaped a compression process or a decompression + process until we look in the table. If there's an ALIVE entry for + it, then we have reaped a compression process, so change the state + to ZOMBIE. Otherwise, it's a decompression processes, so ignore it. */ + +static void +update_proc (pid_t pid) +{ + struct procnode test, *node; + + test.pid = pid; + node = hash_lookup (proctab, &test); + if (node) + node->state = ZOMBIE; +} + +/* This is for when we need to wait for a compression process to exit. + If it has a ZOMBIE entry in the table then it's already dead and has + been reaped. Note that if there's an ALIVE entry for it, it still may + already have died and been reaped if a second process was created with + the same PID. This is probably exceedingly rare, but to be on the safe + side we will have to wait for any compression process with this PID. */ + +static void +wait_proc (pid_t pid) +{ + struct procnode test, *node; + + test.pid = pid; + node = hash_lookup (proctab, &test); + if (node->state == ALIVE) + reap (pid); + + node->state = ZOMBIE; + if (! --node->count) + { + hash_delete (proctab, node); + free (node); + } +} + +/* Keep reaping finished children as long as there are more to reap. + This doesn't block waiting for any of them, it only reaps those + that are already dead. 
*/ + +static void +reap_some (void) +{ + pid_t pid; + + while (0 < nprocs && (pid = reap (-1))) + update_proc (pid); +} + /* Clean up any remaining temporary files. */ static void @@ -429,24 +642,22 @@ exit_cleanup (void) { /* Clean up any remaining temporary files in a critical section so that a signal handler does not try to clean them too. */ - sigset_t oldset; - sigprocmask (SIG_BLOCK, &caught_signals, &oldset); + struct cs_status cs = cs_enter (); cleanup (); - sigprocmask (SIG_SETMASK, &oldset, NULL); + cs_leave (cs); } close_stdout (); } -/* Create a new temporary file, returning its newly allocated name. - Store into *PFP a stream open for writing. */ +/* Create a new temporary file, returning its newly allocated tempnode. + Store into *PFD the file descriptor open for writing. */ -static char * -create_temp_file (FILE **pfp) +static struct tempnode * +create_temp_file (int *pfd) { static char const slashbase[] = "/sortXXXXXX"; static size_t temp_dir_index; - sigset_t oldset; int fd; int saved_errno; char const *temp_dir = temp_dirs[temp_dir_index]; @@ -454,15 +665,17 @@ create_temp_file (FILE **pfp) struct tempnode *node = xmalloc (offsetof (struct tempnode, name) + len + sizeof slashbase); char *file = node->name; + struct cs_status cs; memcpy (file, temp_dir, len); memcpy (file + len, slashbase, sizeof slashbase); node->next = NULL; + node->pid = 0; if (++temp_dir_index == temp_dir_count) temp_dir_index = 0; /* Create the temporary file in a critical section, to avoid races. 
*/ - sigprocmask (SIG_BLOCK, &caught_signals, &oldset); + cs = cs_enter (); fd = mkstemp (file); if (0 <= fd) { @@ -470,13 +683,14 @@ create_temp_file (FILE **pfp) temptail = &node->next; } saved_errno = errno; - sigprocmask (SIG_SETMASK, &oldset, NULL); + cs_leave (cs); errno = saved_errno; - if (fd < 0 || (*pfp = fdopen (fd, "w")) == NULL) + if (fd < 0) die (_("cannot create temporary file"), file); - return file; + *pfd = fd; + return node; } /* Return a stream for FILE, opened with mode HOW. A null FILE means @@ -533,6 +747,197 @@ xfclose (FILE *fp, char const *file) } } +static void +dup2_or_die (int oldfd, int newfd) +{ + if (dup2 (oldfd, newfd) < 0) + error (SORT_FAILURE, errno, _("dup2 failed")); +} + +/* Fork a child process for piping to and do common cleanup. The + TRIES parameter tells us how many times to try to fork before + giving up. Return the PID of the child or -1 if fork failed. */ + +static pid_t +pipe_fork (int pipefds[2], size_t tries) +{ +#if HAVE_WORKING_FORK + struct tempnode *saved_temphead; + int saved_errno; + unsigned int wait_retry = 1; + pid_t pid IF_LINT (= -1); + struct cs_status cs; + + if (pipe (pipefds) < 0) + return -1; + + while (tries--) + { + /* This is so the child process won't delete our temp files + if it receives a signal before exec-ing. */ + cs = cs_enter (); + saved_temphead = temphead; + temphead = NULL; + + pid = fork (); + saved_errno = errno; + if (pid) + temphead = saved_temphead; + + cs_leave (cs); + errno = saved_errno; + + if (0 <= pid || errno != EAGAIN) + break; + else + { + sleep (wait_retry); + wait_retry *= 2; + reap_some (); + } + } + + if (pid < 0) + { + close (pipefds[0]); + close (pipefds[1]); + } + else if (pid == 0) + { + close (STDIN_FILENO); + close (STDOUT_FILENO); + } + else + ++nprocs; + + return pid; + +#else /* ! HAVE_WORKING_FORK */ + return -1; +#endif +} + +/* Create a temporary file and start a compression program to filter output + to that file. 
Set *PFP to the file handle and if *PPID is non-NULL, + set it to the PID of the newly-created process. */ + +static char * +create_temp (FILE **pfp, pid_t *ppid) +{ + static bool compress_program_known; + int tempfd; + struct tempnode *node = create_temp_file (&tempfd); + char *name = node->name; + + if (! compress_program_known) + { + compress_program = getenv ("GNUSORT_COMPRESSOR"); + if (compress_program == NULL) + { + static const char *default_program = "gzip"; + const char *path_program = find_in_path (default_program); + + if (path_program != default_program) + { + if (access (path_program, X_OK) == 0) + compress_program = path_program; + else + free ((char *) path_program); + } + } + else if (*compress_program == '\0') + compress_program = NULL; + + compress_program_known = true; + } + + if (compress_program) + { + int pipefds[2]; + + node->pid = pipe_fork (pipefds, MAX_FORK_TRIES_COMPRESS); + if (0 < node->pid) + { + close (tempfd); + close (pipefds[0]); + tempfd = pipefds[1]; + + register_proc (node->pid); + } + else if (node->pid == 0) + { + close (pipefds[1]); + dup2_or_die (tempfd, STDOUT_FILENO); + close (tempfd); + dup2_or_die (pipefds[0], STDIN_FILENO); + close (pipefds[0]); + + if (execlp (compress_program, compress_program, + (char *) NULL) < 0) + error (SORT_FAILURE, errno, _("couldn't execute %s"), + compress_program); + } + else + node->pid = 0; + } + + *pfp = fdopen (tempfd, "w"); + if (! *pfp) + die (_("couldn't create temporary file"), name); + + if (ppid) + *ppid = node->pid; + + return name; +} + +/* Open a compressed temp file and start a decompression process through + which to filter the input. PID must be the valid processes ID of the + process used to compress the file. 
*/ + +static FILE * +open_temp (const char *name, pid_t pid) +{ + int tempfd, pipefds[2]; + pid_t child_pid; + FILE *fp; + + wait_proc (pid); + + tempfd = open (name, O_RDONLY); + if (tempfd < 0) + die (_("couldn't open temporary file"), name); + + child_pid = pipe_fork (pipefds, MAX_FORK_TRIES_DECOMPRESS); + if (0 < child_pid) + { + close (tempfd); + close (pipefds[1]); + } + else if (child_pid == 0) + { + close (pipefds[0]); + dup2_or_die (tempfd, STDIN_FILENO); + close (tempfd); + dup2_or_die (pipefds[1], STDOUT_FILENO); + close (pipefds[1]); + + if (execlp (compress_program, compress_program, + "-d", (char *) NULL) < 0) + error (SORT_FAILURE, errno, _("couldn't execute %s -d"), + compress_program); + } + else + error (SORT_FAILURE, errno, _("couldn't create process for %s -d"), + compress_program); + + fp = fdopen (pipefds[0], "r"); + if (! fp) + die (_("couldn't create temporary file"), name); + + return fp; +} + static void write_bytes (const char *buf, size_t n_bytes, FILE *fp, const char *output_file) { @@ -558,20 +963,20 @@ zaptemp (const char *name) struct tempnode *volatile *pnode; struct tempnode *node; struct tempnode *next; - sigset_t oldset; int unlink_status; int unlink_errno = 0; + struct cs_status cs; for (pnode = &temphead; (node = *pnode)->name != name; pnode = &node->next) continue; /* Unlink the temporary file in a critical section to avoid races. */ next = node->next; - sigprocmask (SIG_BLOCK, &caught_signals, &oldset); + cs = cs_enter (); unlink_status = unlink (name); unlink_errno = errno; *pnode = next; - sigprocmask (SIG_SETMASK, &oldset, NULL); + cs_leave (cs); if (unlink_status != 0) error (0, unlink_errno, _("warning: cannot remove: %s"), name); @@ -1605,7 +2010,7 @@ check (char const *file_name) file has not been opened yet (or written to, if standard output). 
*/ static void -mergefps (char **files, size_t ntemps, size_t nfiles, +mergefps (struct sortfile *files, size_t ntemps, size_t nfiles, FILE *ofp, char const *output_file) { FILE *fps[NMERGE]; /* Input streams for each file. */ @@ -1628,10 +2033,12 @@ mergefps (char **files, size_t ntemps, size_t nfiles, /* Read initial lines from each input file. */ for (i = 0; i < nfiles; ) { - fps[i] = xfopen (files[i], "r"); + fps[i] = (files[i].pid + ? open_temp (files[i].name, files[i].pid) + : xfopen (files[i].name, "r")); initbuf (&buffer[i], sizeof (struct line), MAX (merge_buffer_size, sort_size / nfiles)); - if (fillbuf (&buffer[i], fps[i], files[i])) + if (fillbuf (&buffer[i], fps[i], files[i].name)) { struct line const *linelim = buffer_linelim (&buffer[i]); cur[i] = linelim - 1; @@ -1641,11 +2048,11 @@ mergefps (char **files, size_t ntemps, size_t nfiles, else { /* fps[i] is empty; eliminate it from future consideration. */ - xfclose (fps[i], files[i]); + xfclose (fps[i], files[i].name); if (i < ntemps) { ntemps--; - zaptemp (files[i]); + zaptemp (files[i].name); } free (buffer[i].buf); --nfiles; @@ -1714,7 +2121,7 @@ mergefps (char **files, size_t ntemps, size_t nfiles, cur[ord[0]] = smallest - 1; else { - if (fillbuf (&buffer[ord[0]], fps[ord[0]], files[ord[0]])) + if (fillbuf (&buffer[ord[0]], fps[ord[0]], files[ord[0]].name)) { struct line const *linelim = buffer_linelim (&buffer[ord[0]]); cur[ord[0]] = linelim - 1; @@ -1727,11 +2134,11 @@ mergefps (char **files, size_t ntemps, size_t nfiles, if (ord[i] > ord[0]) --ord[i]; --nfiles; - xfclose (fps[ord[0]], files[ord[0]]); + xfclose (fps[ord[0]], files[ord[0]].name); if (ord[0] < ntemps) { ntemps--; - zaptemp (files[ord[0]]); + zaptemp (files[ord[0]].name); } free (buffer[ord[0]].buf); for (i = ord[0]; i < nfiles; ++i) @@ -1774,6 +2181,10 @@ mergefps (char **files, size_t ntemps, size_t nfiles, ord[j] = ord[j + 1]; ord[count_of_smaller_lines] = ord0; } + + /* Free up some resources every once in a while. 
*/ + if (MAX_PROCS_BEFORE_REAP < nprocs) + reap_some (); } if (unique && savedline) @@ -1912,8 +2323,8 @@ sortlines_temp (struct line *lines, size_t nlines, struct line *temp) common cases. */ static size_t -avoid_trashing_input (char **files, size_t ntemps, size_t nfiles, - char const *outfile) +avoid_trashing_input (struct sortfile *files, size_t ntemps, + size_t nfiles, char const *outfile) { size_t i; bool got_outstat = false; @@ -1921,11 +2332,11 @@ avoid_trashing_input (char **files, size_t ntemps, size_t nfiles, for (i = ntemps; i < nfiles; i++) { - bool is_stdin = STREQ (files[i], "-"); + bool is_stdin = STREQ (files[i].name, "-"); bool same; struct stat instat; - if (outfile && STREQ (outfile, files[i]) && !is_stdin) + if (outfile && STREQ (outfile, files[i].name) && !is_stdin) same = true; else { @@ -1941,7 +2352,7 @@ avoid_trashing_input (char **files, size_t ntemps, size_t nfiles, same = (((is_stdin ? fstat (STDIN_FILENO, &instat) - : stat (files[i], &instat)) + : stat (files[i].name, &instat)) == 0) && SAME_INODE (instat, outstat)); } @@ -1949,9 +2360,11 @@ avoid_trashing_input (char **files, size_t ntemps, size_t nfiles, if (same) { FILE *tftp; - char *temp = create_temp_file (&tftp); - mergefps (&files[i], 0, nfiles - i, tftp, temp); - files[i] = temp; + pid_t pid; + char *temp = create_temp (&tftp, &pid); + mergefps (&files[i],0, nfiles - i, tftp, temp); + files[i].name = temp; + files[i].pid = pid; return i + 1; } } @@ -1965,7 +2378,8 @@ avoid_trashing_input (char **files, size_t ntemps, size_t nfiles, OUTPUT_FILE; a null OUTPUT_FILE stands for standard output. 
*/ static void -merge (char **files, size_t ntemps, size_t nfiles, char const *output_file) +merge (struct sortfile *files, size_t ntemps, size_t nfiles, + char const *output_file) { while (NMERGE < nfiles) { @@ -1986,11 +2400,13 @@ merge (char **files, size_t ntemps, size_t nfiles, char const *output_file) for (out = in = 0; out < nfiles / NMERGE; out++, in += NMERGE) { FILE *tfp; - char *temp = create_temp_file (&tfp); + pid_t pid; + char *temp = create_temp (&tfp, &pid); size_t nt = MIN (ntemps, NMERGE); ntemps -= nt; mergefps (&files[in], nt, NMERGE, tfp, temp); - files[out] = temp; + files[out].name = temp; + files[out].pid = pid; } remainder = nfiles - in; @@ -2003,11 +2419,13 @@ merge (char **files, size_t ntemps, size_t nfiles, char const *output_file) files as possible, to avoid needless I/O. */ size_t nshortmerge = remainder - cheap_slots + 1; FILE *tfp; - char *temp = create_temp_file (&tfp); + pid_t pid; + char *temp = create_temp (&tfp, &pid); size_t nt = MIN (ntemps, nshortmerge); ntemps -= nt; mergefps (&files[in], nt, nshortmerge, tfp, temp); - files[out++] = temp; + files[out].name = temp; + files[out++].pid = pid; in += nshortmerge; } @@ -2079,7 +2497,7 @@ sort (char * const *files, size_t nfiles, char const *output_file) else { ++ntemps; - temp_output = create_temp_file (&tfp); + temp_output = create_temp (&tfp, NULL); } do @@ -2094,6 +2512,10 @@ sort (char * const *files, size_t nfiles, char const *output_file) xfclose (tfp, temp_output); + /* Free up some resources every once in a while. 
*/ + if (MAX_PROCS_BEFORE_REAP < nprocs) + reap_some (); + if (output_file_created) goto finish; } @@ -2107,10 +2529,11 @@ sort (char * const *files, size_t nfiles, char const *output_file) { size_t i; struct tempnode *node = temphead; - char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles); + struct sortfile *tempfiles = xnmalloc (ntemps, sizeof *tempfiles); for (i = 0; node; i++) { - tempfiles[i] = node->name; + tempfiles[i].name = node->name; + tempfiles[i].pid = node->pid; node = node->next; } merge (tempfiles, ntemps, ntemps, output_file); @@ -2717,7 +3140,15 @@ main (int argc, char **argv) } if (mergeonly) - merge (files, 0, nfiles, outfile); + { + struct sortfile *sortfiles = xcalloc (nfiles, sizeof *sortfiles); + size_t i; + + for (i = 0; i < nfiles; ++i) + sortfiles[i].name = files[i]; + + merge (sortfiles, 0, nfiles, outfile); + } else sort (files, nfiles, outfile); -- cgit v1.2.3-54-g00ecf