diff options
author | Paul Eggert <eggert@CS.UCLA.EDU> | 2009-03-13 15:48:30 -0700 |
---|---|---|
committer | Jim Meyering <meyering@redhat.com> | 2009-03-18 21:44:37 +0100 |
commit | 8f7fae59727e0a050a4833705a6f7849ba2c4531 (patch) | |
tree | 7af8e87ff4424402ac12314a1d8f429a0dd23cba /src | |
parent | e6d2d9479495dff8520e577c221d5195eb9bb48b (diff) | |
download | coreutils-8f7fae59727e0a050a4833705a6f7849ba2c4531.tar.xz |
sort: handle fd exhaustion better when merging
This is an alternative to my 9 March patch labeled "Silently lower
nmerge; don't (sometimes incorrectly) range-check"
<http://lists.gnu.org/archive/html/bug-coreutils/2009-03/msg00070.html>.
It differs by not using 'dup' to probe for extra file descriptors;
instead, it simply calls 'open' (and 'pipe') to open files and pipes,
until one of these calls fails due to file descriptor exhaustion; it
then backs off by 1, does a merge with the files that it has opened,
and then retries with the (now-smaller) number of files.
This patch requires quite a few more changes to the source code than
the earlier patch, but it is in some sense "better" because it doesn't
need to call "dup" ahead of time in order to decide whether "open" or
"pipe" will fail. Also, it's more robust in the case where "open" or
"pipe" fails with errno==EMFILE because some system-wide limit is
exhausted.
* src/sort.c (create_temp_file): New arg SURVIVE_FD_EXHAUSTION.
(stream_open): New function, containing guts of xfopen.
(xfopen): Use it.
(pipe_fork): Set errno on failure.
(maybe_create_temp): New function, containing guts of create_temp.
(create_temp): Use it.
(open_temp): Distinguish failures due to file descriptor exhaustion
from other failures, and on fd exhaustion return a notice to caller
rather than dying. Don't test execlp's return value; when it returns,
it *always* returns -1.
(open_input_files): New function.
(mergefps): New arg FPS. It's now the caller's responsibility to open
the input and output files. All callers changed.
(mergefiles): New function.
(avoid_trashing_input, merge): Handle the case where a single merge
can't merge as much as we wanted due to file descriptor exhaustion, by
merging as much as we can and then retrying.
* tests/Makefile.am (TESTS): Add misc/sort-continue.
* tests/misc/sort-continue: New file.
* THANKS: Add Glen Lenker and Matt Pham who coauthored this patch.
Diffstat (limited to 'src')
-rw-r--r-- | src/sort.c | 292 |
1 file changed, 218 insertions, 74 deletions
diff --git a/src/sort.c b/src/sort.c index 7b0b06476..ced0f2ddf 100644 --- a/src/sort.c +++ b/src/sort.c @@ -732,10 +732,13 @@ exit_cleanup (void) } /* Create a new temporary file, returning its newly allocated tempnode. - Store into *PFD the file descriptor open for writing. */ + Store into *PFD the file descriptor open for writing. + If the creation fails, return NULL and store -1 into *PFD if the + failure is due to file descriptor exhaustion and + SURVIVE_FD_EXHAUSTION; otherwise, die. */ static struct tempnode * -create_temp_file (int *pfd) +create_temp_file (int *pfd, bool survive_fd_exhaustion) { static char const slashbase[] = "/sortXXXXXX"; static size_t temp_dir_index; @@ -768,8 +771,13 @@ create_temp_file (int *pfd) errno = saved_errno; if (fd < 0) - error (SORT_FAILURE, errno, _("cannot create temporary file in %s"), - quote (temp_dir)); + { + if (! (survive_fd_exhaustion && errno == EMFILE)) + error (SORT_FAILURE, errno, _("cannot create temporary file in %s"), + quote (temp_dir)); + free (node); + node = NULL; + } *pfd = fd; return node; @@ -779,27 +787,30 @@ create_temp_file (int *pfd) standard output; HOW should be "w". When opening for input, "-" means standard input. To avoid confusion, do not return file descriptors STDIN_FILENO, STDOUT_FILENO, or STDERR_FILENO when - opening an ordinary FILE. */ + opening an ordinary FILE. Return NULL if unsuccessful. */ static FILE * -xfopen (const char *file, const char *how) +stream_open (const char *file, const char *how) { - FILE *fp; - if (!file) - fp = stdout; - else if (STREQ (file, "-") && *how == 'r') + return stdout; + if (STREQ (file, "-") && *how == 'r') { have_read_stdin = true; - fp = stdin; - } - else - { - fp = fopen (file, how); - if (! fp) - die (_("open failed"), file); + return stdin; } + return fopen (file, how); +} + +/* Same as stream_open, except always return a non-null value; die on + failure. 
*/ +static FILE * +xfopen (const char *file, const char *how) + { + FILE *fp = stream_open (file, how); + if (!fp) + die (_("open failed"), file); return fp; } @@ -838,7 +849,8 @@ dup2_or_die (int oldfd, int newfd) /* Fork a child process for piping to and do common cleanup. The TRIES parameter tells us how many times to try to fork before - giving up. Return the PID of the child or -1 if fork failed. */ + giving up. Return the PID of the child, or -1 (setting errno) + on failure. */ static pid_t pipe_fork (int pipefds[2], size_t tries) @@ -881,8 +893,10 @@ pipe_fork (int pipefds[2], size_t tries) if (pid < 0) { + saved_errno = errno; close (pipefds[0]); close (pipefds[1]); + errno = saved_errno; } else if (pid == 0) { @@ -900,15 +914,22 @@ pipe_fork (int pipefds[2], size_t tries) } /* Create a temporary file and start a compression program to filter output - to that file. Set *PFP to the file handle and if *PPID is non-NULL, - set it to the PID of the newly-created process. */ + to that file. Set *PFP to the file handle and if PPID is non-NULL, + set *PPID to the PID of the newly-created process. If the creation + fails, return NULL if the failure is due to file descriptor + exhaustion and SURVIVE_FD_EXHAUSTION; otherwise, die. */ static char * -create_temp (FILE **pfp, pid_t *ppid) +maybe_create_temp (FILE **pfp, pid_t *ppid, bool survive_fd_exhaustion) { int tempfd; - struct tempnode *node = create_temp_file (&tempfd); - char *name = node->name; + struct tempnode *node = create_temp_file (&tempfd, survive_fd_exhaustion); + char *name; + + if (! node) + return NULL; + + name = node->name; if (compress_program) { @@ -949,48 +970,68 @@ create_temp (FILE **pfp, pid_t *ppid) return name; } +/* Create a temporary file and start a compression program to filter output + to that file. Set *PFP to the file handle and if *PPID is non-NULL, + set it to the PID of the newly-created process. Die on failure. 
*/ + +static char * +create_temp (FILE **pfp, pid_t *ppid) +{ + return maybe_create_temp (pfp, ppid, false); +} + /* Open a compressed temp file and start a decompression process through which to filter the input. PID must be the valid processes ID of the - process used to compress the file. */ + process used to compress the file. Return NULL (setting errno to + EMFILE) if we ran out of file descriptors, and die on any other + kind of failure. */ static FILE * open_temp (const char *name, pid_t pid) { int tempfd, pipefds[2]; - pid_t child_pid; - FILE *fp; + FILE *fp = NULL; wait_proc (pid); tempfd = open (name, O_RDONLY); if (tempfd < 0) - die (_("couldn't open temporary file"), name); + return NULL; - child_pid = pipe_fork (pipefds, MAX_FORK_TRIES_DECOMPRESS); - if (0 < child_pid) + switch (pipe_fork (pipefds, MAX_FORK_TRIES_DECOMPRESS)) { + case -1: + if (errno != EMFILE) + error (SORT_FAILURE, errno, _("couldn't create process for %s -d"), + compress_program); close (tempfd); - close (pipefds[1]); - } - else if (child_pid == 0) - { + errno = EMFILE; + break; + + case 0: close (pipefds[0]); dup2_or_die (tempfd, STDIN_FILENO); close (tempfd); dup2_or_die (pipefds[1], STDOUT_FILENO); close (pipefds[1]); - if (execlp (compress_program, compress_program, "-d", (char *) NULL) < 0) - error (SORT_FAILURE, errno, _("couldn't execute %s -d"), - compress_program); - } - else - error (SORT_FAILURE, errno, _("couldn't create process for %s -d"), - compress_program); + execlp (compress_program, compress_program, "-d", (char *) NULL); + error (SORT_FAILURE, errno, _("couldn't execute %s -d"), + compress_program); - fp = fdopen (pipefds[0], "r"); - if (! fp) - die (_("couldn't create temporary file"), name); + default: + close (tempfd); + close (pipefds[1]); + + fp = fdopen (pipefds[0], "r"); + if (! 
fp) + { + int saved_errno = errno; + close (pipefds[0]); + errno = saved_errno; + } + break; + } return fp; } @@ -2148,31 +2189,53 @@ check (char const *file_name, char checkonly) return ordered; } +/* Open FILES (there are NFILES of them) and store the resulting array + of stream pointers into (*PFPS). Allocate the array. Return the + number of successfully opened files, setting errno if this value is + less than NFILES. */ + +static size_t +open_input_files (struct sortfile *files, size_t nfiles, FILE ***pfps) +{ + FILE **fps = *pfps = xnmalloc (nfiles, sizeof *fps); + int i; + + /* Open as many input files as we can. */ + for (i = 0; i < nfiles; i++) + { + fps[i] = (files[i].pid + ? open_temp (files[i].name, files[i].pid) + : stream_open (files[i].name, "r")); + if (!fps[i]) + break; + } + + return i; +} + /* Merge lines from FILES onto OFP. NTEMPS is the number of temporary files (all of which are at the start of the FILES array), and NFILES is the number of files; 0 <= NTEMPS <= NFILES <= NMERGE. - Close input and output files before returning. + FPS is the vector of open stream corresponding to the files. + Close input and output streams before returning. OUTPUT_FILE gives the name of the output file. If it is NULL, - the output file is standard output. If OFP is NULL, the output - file has not been opened yet (or written to, if standard output). */ + the output file is standard output. */ static void mergefps (struct sortfile *files, size_t ntemps, size_t nfiles, - FILE *ofp, char const *output_file) + FILE *ofp, char const *output_file, FILE **fps) { - FILE **fps = xnmalloc (nmerge, sizeof *fps); - /* Input streams for each file. */ - struct buffer *buffer = xnmalloc (nmerge, sizeof *buffer); + struct buffer *buffer = xnmalloc (nfiles, sizeof *buffer); /* Input buffers for each file. */ struct line saved; /* Saved line storage for unique check. */ struct line const *savedline = NULL; /* &saved if there is a saved line. 
*/ size_t savealloc = 0; /* Size allocated for the saved line. */ - struct line const **cur = xnmalloc (nmerge, sizeof *cur); + struct line const **cur = xnmalloc (nfiles, sizeof *cur); /* Current line in each line table. */ - struct line const **base = xnmalloc (nmerge, sizeof *base); + struct line const **base = xnmalloc (nfiles, sizeof *base); /* Base of each line table. */ - size_t *ord = xnmalloc (nmerge, sizeof *ord); + size_t *ord = xnmalloc (nfiles, sizeof *ord); /* Table representing a permutation of fps, such that cur[ord[0]] is the smallest line and will be next output. */ @@ -2185,9 +2248,6 @@ mergefps (struct sortfile *files, size_t ntemps, size_t nfiles, /* Read initial lines from each input file. */ for (i = 0; i < nfiles; ) { - fps[i] = (files[i].pid - ? open_temp (files[i].name, files[i].pid) - : xfopen (files[i].name, "r")); initbuf (&buffer[i], sizeof (struct line), MAX (merge_buffer_size, sort_size / nfiles)); if (fillbuf (&buffer[i], fps[i], files[i].name)) @@ -2209,13 +2269,13 @@ mergefps (struct sortfile *files, size_t ntemps, size_t nfiles, free (buffer[i].buf); --nfiles; for (j = i; j < nfiles; ++j) - files[j] = files[j + 1]; + { + files[j] = files[j + 1]; + fps[j] = fps[j + 1]; + } } } - if (! ofp) - ofp = xfopen (output_file, "w"); - /* Set up the ord table according to comparisons among input lines. Since this only reorders two items if one is strictly greater than the other, it is stable. */ @@ -2353,6 +2413,28 @@ mergefps (struct sortfile *files, size_t ntemps, size_t nfiles, free(cur); } +/* Merge lines from FILES onto OFP. NTEMPS is the number of temporary + files (all of which are at the start of the FILES array), and + NFILES is the number of files; 0 <= NTEMPS <= NFILES <= NMERGE. + Close input and output files before returning. + OUTPUT_FILE gives the name of the output file. + + Return the number of files successfully merged. 
This number can be + less than NFILES if we ran low on file descriptors, but in this + case it is never less than 2. */ + +static size_t +mergefiles (struct sortfile *files, size_t ntemps, size_t nfiles, + FILE *ofp, char const *output_file) +{ + FILE **fps; + size_t nopened = open_input_files (files, nfiles, &fps); + if (nopened < nfiles && nopened < 2) + die (_("open failed"), files[nopened].name); + mergefps (files, ntemps, nopened, ofp, output_file, fps); + return nopened; +} + /* Merge into T the two sorted arrays of lines LO (with NLO members) and HI (with NHI members). T, LO, and HI point just past their respective arrays, and the arrays are in reverse order. NLO and @@ -2519,10 +2601,19 @@ avoid_trashing_input (struct sortfile *files, size_t ntemps, FILE *tftp; pid_t pid; char *temp = create_temp (&tftp, &pid); - mergefps (&files[i],0, nfiles - i, tftp, temp); - files[i].name = temp; - files[i].pid = pid; - return i + 1; + size_t num_merged = 0; + while (i + num_merged < nfiles) + { + num_merged += mergefiles (&files[i], 0, nfiles - i, tftp, temp); + files[i].name = temp; + files[i].pid = pid; + + memmove(&files[i], &files[i + num_merged], + num_merged * sizeof *files); + ntemps += 1; + nfiles -= num_merged - 1;; + i += num_merged; + } } } @@ -2553,17 +2644,20 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles, /* Number of easily-available slots at the next loop iteration. */ size_t cheap_slots; - /* Do as many NMERGE-size merges as possible. */ - for (out = in = 0; out < nfiles / nmerge; out++, in += nmerge) + /* Do as many NMERGE-size merges as possible. In the case that + nmerge is bogus, increment by the maximum number of file + descriptors allowed. 
*/ + for (out = in = 0; nmerge <= nfiles - in; out++) { FILE *tfp; pid_t pid; char *temp = create_temp (&tfp, &pid); - size_t nt = MIN (ntemps, nmerge); - ntemps -= nt; - mergefps (&files[in], nt, nmerge, tfp, temp); + size_t num_merged = mergefiles (&files[in], MIN (ntemps, nmerge), + nmerge, tfp, temp); + ntemps -= MIN (ntemps, num_merged); files[out].name = temp; files[out].pid = pid; + in += num_merged; } remainder = nfiles - in; @@ -2578,12 +2672,12 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles, FILE *tfp; pid_t pid; char *temp = create_temp (&tfp, &pid); - size_t nt = MIN (ntemps, nshortmerge); - ntemps -= nt; - mergefps (&files[in], nt, nshortmerge, tfp, temp); + size_t num_merged = mergefiles (&files[in], MIN (ntemps, nshortmerge), + nshortmerge, tfp, temp); + ntemps -= MIN (ntemps, num_merged); files[out].name = temp; files[out++].pid = pid; - in += nshortmerge; + in += num_merged; } /* Put the remaining input files into the last NMERGE-sized output @@ -2594,7 +2688,57 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles, } nfiles = avoid_trashing_input (files, ntemps, nfiles, output_file); - mergefps (files, ntemps, nfiles, NULL, output_file); + + /* We aren't guaranteed that this final mergefiles will work, therefore we + try to merge into the output, and then merge as much as we can into a + temp file if we can't. Repeat. */ + + for (;;) + { + /* Merge directly into the output file if possible. */ + FILE **fps; + size_t nopened = open_input_files (files, nfiles, &fps); + + if (nopened == nfiles) + { + FILE *ofp = stream_open (output_file, "w"); + if (ofp) + { + mergefps (files, ntemps, nfiles, ofp, output_file, fps); + break; + } + if (errno != EMFILE || nopened <= 2) + die (_("open failed"), output_file); + } + else if (nopened <= 2) + die (_("open failed"), files[nopened].name); + + /* We ran out of file descriptors. Close one of the input + files, to gain a file descriptor. 
Then create a temporary + file with our spare file descriptor. Retry if that failed + (e.g., some other process could open a file between the time + we closed and tried to create). */ + FILE *tfp; + pid_t pid; + char *temp; + do + { + nopened--; + xfclose (fps[nopened], files[nopened].name); + temp = maybe_create_temp (&tfp, &pid, ! (nopened <= 2)); + } + while (!temp); + + /* Merge into the newly allocated temporary. */ + mergefps (&files[0], MIN (ntemps, nopened), nopened, tfp, temp, fps); + ntemps -= MIN (ntemps, nopened); + files[0].name = temp; + files[0].pid = pid; + + memmove (&files[1], &files[nopened], (nfiles - nopened) * sizeof *files); + ntemps++; + nfiles -= nopened - 1; + } } /* Sort NFILES FILES onto OUTPUT_FILE. */ |