From a09d9e5badacd08058a23dea97bec4e93c0e689a Mon Sep 17 00:00:00 2001 From: Karl Heuer Date: Fri, 29 Apr 2011 12:23:27 +0200 Subject: split: accept new output --filter=CMD option * src/split.c: Include <signal.h>, <sys/wait.h> and "sig2str.h". (FILTER_OPTION): New anonymous enum member. (filter_command, filter_pid): New globals. (open_pipes, open_pipes_alloc, n_open_pipes): Likewise. (oldblocked, newblocked): Likewise. (longopts): Add "filter". (usage): Document --filter. (create): Extend to create a pipe and fork "sh -c CMD". (closeout): Adapt to close a pipe and wait for child process. (cwrite): Call closeout, not just close. (lines_chunk_split): FIXME (bytes_chunk_extract): FIXME (opid, ofile_open, lines_rr, main): FIXME (ignorable): New function, to encapsulate EPIPE test. --- src/split.c | 227 ++++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 204 insertions(+), 23 deletions(-) (limited to 'src/split.c') diff --git a/src/split.c b/src/split.c index 65e44ddb6..05315e6fb 100644 --- a/src/split.c +++ b/src/split.c @@ -25,7 +25,9 @@ #include #include #include +#include <signal.h> #include +#include <sys/wait.h> #include "system.h" #include "error.h" @@ -35,6 +37,7 @@ #include "full-write.h" #include "quote.h" #include "safe-read.h" +#include "sig2str.h" #include "xfreopen.h" #include "xstrtol.h" @@ -45,6 +48,21 @@ proper_name_utf8 ("Torbjorn Granlund", "Torbj\303\266rn Granlund"), \ proper_name ("Richard M. Stallman") +/* Shell command to filter through, instead of creating files. */ +static char const *filter_command; + +/* Process ID of the filter. */ +static int filter_pid; + +/* Array of open pipes. */ +static int *open_pipes; +static size_t open_pipes_alloc; +static size_t n_open_pipes; + +/* Blocked signals. */ +static sigset_t oldblocked; +static sigset_t newblocked; + /* Base name of output files. 
*/ static char const *outbase; @@ -90,6 +108,7 @@ enum Split_type enum { VERBOSE_OPTION = CHAR_MAX + 1, + FILTER_OPTION, IO_BLKSIZE_OPTION }; @@ -103,6 +122,7 @@ static struct option const longopts[] = {"unbuffered", no_argument, NULL, 'u'}, {"suffix-length", required_argument, NULL, 'a'}, {"numeric-suffixes", no_argument, NULL, 'd'}, + {"filter", required_argument, NULL, FILTER_OPTION}, {"verbose", no_argument, NULL, VERBOSE_OPTION}, {"-io-blksize", required_argument, NULL, IO_BLKSIZE_OPTION}, /* do not document */ @@ -111,6 +131,13 @@ static struct option const longopts[] = {NULL, 0, NULL, 0} }; +/* Return true if the errno value, ERR, is ignorable. */ +static inline bool +ignorable (int err) +{ + return filter_command && err == EPIPE; +} + static void set_suffix_length (uintmax_t n_units, enum Split_type split_type) { @@ -170,6 +197,7 @@ Mandatory arguments to long options are mandatory for short options too.\n\ -C, --line-bytes=SIZE put at most SIZE bytes of lines per output file\n\ -d, --numeric-suffixes use numeric suffixes instead of alphabetic\n\ -e, --elide-empty-files do not generate empty output files with `-n'\n\ + --filter=COMMAND write to shell COMMAND; file name is $FILE\n\ -l, --lines=NUMBER put NUMBER lines per output file\n\ -n, --number=CHUNKS generate CHUNKS output files. 
See below\n\ -u, --unbuffered immediately copy input to output with `-n r/...'\n\ @@ -256,10 +284,123 @@ next_file_name (void) static int create (const char *name) { - if (verbose) - fprintf (stdout, _("creating file %s\n"), quote (name)); - return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, - (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)); + if (!filter_command) + { + if (verbose) + fprintf (stdout, _("creating file %s\n"), quote (name)); + return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, + (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)); + } + else + { + int fd_pair[2]; + pid_t child_pid; + char const *shell_prog = getenv ("SHELL"); + if (shell_prog == NULL) + shell_prog = "/bin/sh"; + if (setenv ("FILE", name, 1) != 0) + error (EXIT_FAILURE, errno, + _("failed to set FILE environment variable")); + if (verbose) + fprintf (stdout, _("executing with FILE=%s\n"), quote (name)); + if (pipe (fd_pair) != 0) + error (EXIT_FAILURE, errno, _("failed to create pipe")); + child_pid = fork (); + if (child_pid == 0) + { + /* This is the child process. If an error occurs here, the + parent will eventually learn about it after doing a wait, + at which time it will emit its own error message. */ + int j; + /* We have to close any pipes that were opened during an + earlier call, otherwise this process will be holding a + write-pipe that will prevent the earlier process from + reading an EOF on the corresponding read-pipe. 
*/ + for (j = 0; j < n_open_pipes; ++j) + if (close (open_pipes[j]) != 0) + error (EXIT_FAILURE, errno, _("closing prior pipe")); + if (close (fd_pair[1])) + error (EXIT_FAILURE, errno, _("closing output pipe")); + if (fd_pair[0] != STDIN_FILENO) + { + if (dup2 (fd_pair[0], STDIN_FILENO) != STDIN_FILENO) + error (EXIT_FAILURE, errno, _("moving input pipe")); + if (close (fd_pair[0]) != 0) + error (EXIT_FAILURE, errno, _("closing input pipe")); + } + sigprocmask (SIG_SETMASK, &oldblocked, NULL); + execl (shell_prog, last_component (shell_prog), "-c", + filter_command, (char *) NULL); + error (EXIT_FAILURE, errno, _("failed to run command: \"%s -c %s\""), + shell_prog, filter_command); + } + if (child_pid == -1) + error (EXIT_FAILURE, errno, _("fork system call failed")); + if (close (fd_pair[0]) != 0) + error (EXIT_FAILURE, errno, _("failed to close input pipe")); + filter_pid = child_pid; + if (n_open_pipes == open_pipes_alloc) + open_pipes = x2nrealloc (open_pipes, &open_pipes_alloc, + sizeof *open_pipes); + open_pipes[n_open_pipes++] = fd_pair[1]; + return fd_pair[1]; + } +} + +/* Close the output file, and do any associated cleanup. + If FP and FD are both specified, they refer to the same open file; + in this case FP is closed, but FD is still used in cleanup. */ +static void +closeout (FILE *fp, int fd, pid_t pid, char const *name) +{ + if (fp != NULL && fclose (fp) != 0 && ! 
ignorable (errno)) + error (EXIT_FAILURE, errno, "%s", name); + if (fd >= 0) + { + if (fp == NULL && close (fd) < 0) + error (EXIT_FAILURE, errno, "%s", name); + int j; + for (j = 0; j < n_open_pipes; ++j) + { + if (open_pipes[j] == fd) + { + open_pipes[j] = open_pipes[--n_open_pipes]; + break; + } + } + } + if (pid > 0) + { + int wstatus = 0; + if (waitpid (pid, &wstatus, 0) == -1 && errno != ECHILD) + error (EXIT_FAILURE, errno, _("waiting for child process")); + if (WIFSIGNALED (wstatus)) + { + int sig = WTERMSIG (wstatus); + if (sig != SIGPIPE) + { + char signame[MAX (SIG2STR_MAX, INT_BUFSIZE_BOUND (int))]; + if (sig2str (sig, signame) != 0) + sprintf (signame, "%d", sig); + error (sig + 128, 0, + _("with FILE=%s, signal %s (%s) from command: %s"), + name, signame, strsignal (sig), filter_command); + } + } + else if (WIFEXITED (wstatus)) + { + int ex = WEXITSTATUS (wstatus); + if (ex != 0) + error (ex, 0, _("with FILE=%s, exit %d from command: %s"), + name, ex, filter_command); + } + else + { + /* shouldn't happen. */ + error (EXIT_FAILURE, 0, + _("unknown status from command (0x%X)"), wstatus); + } + } } /* Write BYTES bytes at BP to an output file. @@ -273,13 +414,12 @@ cwrite (bool new_file_flag, const char *bp, size_t bytes) { if (!bp && bytes == 0 && elide_empty_files) return; - if (output_desc >= 0 && close (output_desc) < 0) - error (EXIT_FAILURE, errno, "%s", outfile); + closeout (NULL, output_desc, filter_pid, outfile); next_file_name (); if ((output_desc = create (outfile)) < 0) error (EXIT_FAILURE, errno, "%s", outfile); } - if (full_write (output_desc, bp, bytes) != bytes) + if (full_write (output_desc, bp, bytes) != bytes && ! ignorable (errno)) error (EXIT_FAILURE, errno, "%s", outfile); } @@ -501,7 +641,8 @@ lines_chunk_split (uintmax_t k, uintmax_t n, char *buf, size_t bufsize, /* We don't use the stdout buffer here since we're writing large chunks from an existing file, so it's more efficient to write out directly. 
*/ - if (full_write (STDOUT_FILENO, bp, to_write) != to_write) + if (full_write (STDOUT_FILENO, bp, to_write) != to_write + && ! ignorable (errno)) error (EXIT_FAILURE, errno, "%s", _("write error")); } else @@ -564,7 +705,8 @@ bytes_chunk_extract (uintmax_t k, uintmax_t n, char *buf, size_t bufsize, error (EXIT_FAILURE, errno, "%s", infile); else if (n_read == 0) break; /* eof. */ - if (full_write (STDOUT_FILENO, buf, n_read) != n_read) + if (full_write (STDOUT_FILENO, buf, n_read) != n_read + && ! ignorable (errno)) error (EXIT_FAILURE, errno, "%s", quote ("-")); start += n_read; } @@ -575,6 +717,7 @@ typedef struct of_info char *of_name; int ofd; FILE *ofile; + int opid; } of_t; enum @@ -637,14 +780,17 @@ ofile_open (of_t *files, size_t i_check, size_t nfiles) error (EXIT_FAILURE, errno, "%s", files[i_check].of_name); } - if (fclose (files[i_reopen].ofile) != 0) + if (fclose (files[i_reopen].ofile) != 0 && ! ignorable (errno)) error (EXIT_FAILURE, errno, "%s", files[i_reopen].of_name); + files[i_reopen].ofile = NULL; files[i_reopen].ofd = OFD_APPEND; } files[i_check].ofd = fd; if (!(files[i_check].ofile = fdopen (fd, "a"))) error (EXIT_FAILURE, errno, "%s", files[i_check].of_name); + files[i_check].opid = filter_pid; + filter_pid = 0; } return file_limit; @@ -658,6 +804,7 @@ ofile_open (of_t *files, size_t i_check, size_t nfiles) static void lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize) { + bool wrapped = false; bool file_limit; size_t i_file; of_t *files IF_LINT (= NULL); @@ -678,6 +825,7 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize) files[i_file].of_name = xstrdup (outfile); files[i_file].ofd = OFD_NEW; files[i_file].ofile = NULL; + files[i_file].opid = 0; } i_file = 0; file_limit = false; @@ -715,10 +863,12 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize) { if (line_no == k && unbuffered) { - if (full_write (STDOUT_FILENO, bp, to_write) != to_write) + if (full_write (STDOUT_FILENO, bp, to_write) != 
to_write + && ! ignorable (errno)) error (EXIT_FAILURE, errno, "%s", _("write error")); } - else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1) + else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1 + && ! ignorable (errno)) { clearerr (stdout); /* To silence close_stdout(). */ error (EXIT_FAILURE, errno, "%s", _("write error")); @@ -734,19 +884,25 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize) { /* Note writing to fd, rather than flushing the FILE gives an 8% performance benefit, due to reduced data copying. */ - if (full_write (files[i_file].ofd, bp, to_write) != to_write) + if (full_write (files[i_file].ofd, bp, to_write) != to_write + && ! ignorable (errno)) error (EXIT_FAILURE, errno, "%s", files[i_file].of_name); } - else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1) + else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1 + && ! ignorable (errno)) error (EXIT_FAILURE, errno, "%s", files[i_file].of_name); if (file_limit) { - if (fclose (files[i_file].ofile) != 0) + if (fclose (files[i_file].ofile) != 0 && ! ignorable (errno)) error (EXIT_FAILURE, errno, "%s", files[i_file].of_name); + files[i_file].ofile = NULL; files[i_file].ofd = OFD_APPEND; } if (next && ++i_file == n) - i_file = 0; + { + wrapped = true; + i_file = 0; + } } bp = bp_out; @@ -757,11 +913,18 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize) and to signal any waiting fifo consumers. Also, close any open file descriptors. FIXME: Should we do this before EXIT_FAILURE? */ - for (i_file = 0; !k && !elide_empty_files && i_file < n; i_file++) + if (!k) { - file_limit |= ofile_open (files, i_file, n); - if (fclose (files[i_file].ofile) != 0) - error (EXIT_FAILURE, errno, "%s", files[i_file].of_name); + int ceiling = (wrapped ? 
n : i_file); + for (i_file = 0; i_file < n; i_file++) + { + if (i_file >= ceiling && !elide_empty_files) + file_limit |= ofile_open (files, i_file, n); + if (files[i_file].ofd >= 0) + closeout (files[i_file].ofile, files[i_file].ofd, + files[i_file].opid, files[i_file].of_name); + files[i_file].ofd = OFD_APPEND; + } } } @@ -824,7 +987,8 @@ main (int argc, char **argv) int this_optind = optind ? optind : 1; char *slash; - c = getopt_long (argc, argv, "0123456789C:a:b:del:n:u", longopts, NULL); + c = getopt_long (argc, argv, "0123456789C:a:b:del:n:u", + longopts, NULL); if (c == -1) break; @@ -955,6 +1119,10 @@ main (int argc, char **argv) elide_empty_files = true; break; + case FILTER_OPTION: + filter_command = optarg; + break; + case IO_BLKSIZE_OPTION: { uintmax_t tmp_blk_size; @@ -1048,6 +1216,18 @@ main (int argc, char **argv) buf = ptr_align (xmalloc (in_blk_size + 1 + page_size - 1), page_size); + /* When filtering, closure of one pipe must not terminate the process, + as there may still be other streams expecting input from us. */ + sigemptyset (&newblocked); + if (filter_command) + { + struct sigaction act; + sigaction (SIGPIPE, NULL, &act); + if (act.sa_handler != SIG_IGN) + sigaddset (&newblocked, SIGPIPE); + } + sigprocmask (SIG_BLOCK, &newblocked, &oldblocked); + switch (split_type) { case type_digits: @@ -1084,10 +1264,11 @@ main (int argc, char **argv) abort (); } + sigprocmask (SIG_SETMASK, &oldblocked, NULL); + if (close (STDIN_FILENO) != 0) error (EXIT_FAILURE, errno, "%s", infile); - if (output_desc >= 0 && close (output_desc) < 0) - error (EXIT_FAILURE, errno, "%s", outfile); + closeout (NULL, output_desc, filter_pid, outfile); exit (EXIT_SUCCESS); } -- cgit v1.2.3-54-g00ecf