From 20d7bce0f7e57d9a98f0ee811e31c757e9fedfff Mon Sep 17 00:00:00 2001 From: Assaf Gordon Date: Wed, 6 Mar 2013 18:25:49 -0500 Subject: shuf: use reservoir-sampling for large or unknown sized inputs Reservoir sampling optimizes selecting K random lines from large or unknown-sized input: http://en.wikipedia.org/wiki/Reservoir_sampling Note this also avoids reading any input when -n0 is specified. * src/shuf.c (main): Use reservoir-sampling when the number of output lines is known, and the input size is large or unknown. (input_size): A new function to get the input size for regular files. (read_input_reservoir_sampling): New function to read lines from input, keeping only K lines in memory, replacing lines with decreasing prob. (write_permuted_output_reservoir): New function to output reservoir. * tests/misc/shuf-reservoir.sh: An expensive_ test using valgrind to exercise the reservoir-sampling code. * tests/local.mk: Reference new test. * NEWS: Mention the improvement. --- src/shuf.c | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 170 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/shuf.c b/src/shuf.c index 71ac3e60c..bbf3a86c2 100644 --- a/src/shuf.c +++ b/src/shuf.c @@ -25,6 +25,7 @@ #include "error.h" #include "fadvise.h" #include "getopt.h" +#include "linebuffer.h" #include "quote.h" #include "quotearg.h" #include "randint.h" @@ -38,6 +39,18 @@ #define AUTHORS proper_name ("Paul Eggert") +/* For reservoir-sampling, allocate the reservoir lines in batches. */ +enum { RESERVOIR_LINES_INCREMENT = 1024 }; + +/* reservoir-sampling introduces CPU overhead for small inputs. + So only enable it for inputs >= this limit. + This limit was determined using these commands: + $ for p in $(seq 7); do src/seq $((10**$p)) > 10p$p.in; done + $ for p in $(seq 7); do time shuf-nores -n10 10p$p.in >/dev/null; done + $ for p in $(seq 7); do time shuf -n10 10p$p.in >/dev/null; done .*/ +enum { RESERVOIR_MIN_INPUT = 8192 * 1024 }; + + void usage (int status) { @@ -135,6 +148,114 @@ next_line (char *line, char eolbyte, size_t n) return p + 1; } +/* Return the size of the input if possible or OFF_T_MAX if not. */ + +static off_t +input_size (void) +{ + off_t file_size; + + struct stat stat_buf; + if (fstat (STDIN_FILENO, &stat_buf) != 0) + return OFF_T_MAX; + if (usable_st_size (&stat_buf)) + file_size = stat_buf.st_size; + else + return OFF_T_MAX; + + off_t input_offset = lseek (STDIN_FILENO, 0, SEEK_CUR); + if (input_offset < 0) + return OFF_T_MAX; + + file_size -= input_offset; + + return file_size; +} + +/* Read all lines and store up to K permuted lines in *OUT_RSRV. + Return the number of lines read, up to a maximum of K. */ + +static size_t +read_input_reservoir_sampling (FILE *in, char eolbyte, size_t k, + struct randint_source *s, + struct linebuffer **out_rsrv) +{ + randint n_lines = 0; + size_t n_alloc_lines = MIN (k, RESERVOIR_LINES_INCREMENT); + struct linebuffer *line = NULL; + struct linebuffer *rsrv; + + rsrv = xcalloc (n_alloc_lines, sizeof (struct linebuffer)); + + /* Fill the first K lines, directly into the reservoir. */ + while (n_lines < k + && (line = + readlinebuffer_delim (&rsrv[n_lines], in, eolbyte)) != NULL) + { + n_lines++; + + /* Enlarge reservoir. */ + if (n_lines >= n_alloc_lines) + { + n_alloc_lines += RESERVOIR_LINES_INCREMENT; + rsrv = xnrealloc (rsrv, n_alloc_lines, sizeof (struct linebuffer)); + memset (&rsrv[n_lines], 0, + RESERVOIR_LINES_INCREMENT * sizeof (struct linebuffer)); + } + } + + /* last line wasn't NULL - so there may be more lines to read. */ + if (line != NULL) + { + struct linebuffer dummy; + initbuffer (&dummy); /* space for lines not put in reservoir. */ + + /* Choose the fate of the next line, with decreasing probability (as + n_lines increases in size). + + If the line will be used, store it directly in the reservoir. + Otherwise, store it in dummy space. + + With 'struct linebuffer', storing into existing buffer will reduce + re-allocations (will only re-allocate if the new line is longer than + the currently allocated space). */ + do + { + randint j = randint_choose (s, n_lines + 1); /* 0 .. n_lines. */ + line = (j < k) ? (&rsrv[j]) : (&dummy); + } + while (readlinebuffer_delim (line, in, eolbyte) != NULL && n_lines++); + + if (! n_lines) + error (EXIT_FAILURE, EOVERFLOW, _("too many input lines")); + + freebuffer (&dummy); + } + + /* no more input lines, or an input error. */ + if (ferror (in)) + error (EXIT_FAILURE, errno, _("read error")); + + *out_rsrv = rsrv; + return MIN (k, n_lines); +} + +static int +write_permuted_output_reservoir (size_t n_lines, struct linebuffer *lines, + size_t const *permutation) +{ + size_t i; + + for (i = 0; i < n_lines; i++) + { + const struct linebuffer *p = &lines[permutation[i]]; + if (fwrite (p->buffer, sizeof (char), p->length, stdout) != p->length) + return -1; + } + + return 0; +} + /* Read data from file IN. Input lines are delimited by EOLBYTE; silently append a trailing EOLBYTE if the file ends in some other byte. Store a pointer to the resulting array of lines into *PLINE. @@ -152,6 +273,15 @@ read_input (FILE *in, char eolbyte, char ***pline) size_t i; size_t n_lines; + /* TODO: We should limit the amount of data read here, + to less than RESERVOIR_MIN_INPUT. I.E. adjust fread_file() to support + taking a byte limit. We'd then need to ensure we handle a line spanning + this boundary. With that in place we could set use_reservoir_sampling + when used==RESERVOIR_MIN_INPUT, and have read_input_reservoir_sampling() + call a wrapper function to populate a linebuffer from the internal pline + or if none left, stdin. Doing that would give better performance by + avoiding the reservoir CPU overhead when reading < RESERVOIR_MIN_INPUT + from a pipe, and allow us to dispense with the input_size() function. */ if (!(buf = fread_file (in, &used))) error (EXIT_FAILURE, errno, _("read error")); @@ -174,7 +304,7 @@ read_input (FILE *in, char eolbyte, char ***pline) } static int -write_permuted_output (size_t n_lines, char * const *line, size_t lo_input, +write_permuted_output (size_t n_lines, char *const *line, size_t lo_input, size_t const *permutation, char eolbyte) { size_t i; @@ -182,7 +312,7 @@ write_permuted_output (size_t n_lines, char * const *line, size_t lo_input, if (line) for (i = 0; i < n_lines; i++) { - char * const *p = line + permutation[i]; + char *const *p = line + permutation[i]; size_t len = p[1] - p[0]; if (fwrite (p[0], sizeof *p[0], len, stdout) != len) return -1; @@ -209,14 +339,17 @@ main (int argc, char **argv) char *random_source = NULL; char eolbyte = '\n'; char **input_lines = NULL; + bool use_reservoir_sampling = false; int optc; int n_operands; char **operand; size_t n_lines; - char **line; + char **line = NULL; + struct linebuffer *reservoir = NULL; struct randint_source *randint_source; size_t *permutation; + int i; initialize_main (&argc, &argv); set_program_name (argv[0]); @@ -341,17 +474,35 @@ main (int argc, char **argv) fadvise (stdin, FADVISE_SEQUENTIAL); - n_lines = read_input (stdin, eolbyte, &input_lines); - line = input_lines; + if (head_lines != SIZE_MAX && input_size () > RESERVOIR_MIN_INPUT) + { + use_reservoir_sampling = true; + n_lines = SIZE_MAX; /* unknown number of input lines, for now. */ + } + else + { + n_lines = read_input (stdin, eolbyte, &input_lines); + line = input_lines; + } } head_lines = MIN (head_lines, n_lines); randint_source = randint_all_new (random_source, + use_reservoir_sampling ? SIZE_MAX : randperm_bound (head_lines, n_lines)); if (! randint_source) error (EXIT_FAILURE, errno, "%s", quotearg_colon (random_source)); + if (use_reservoir_sampling) + { + /* Instead of reading the entire file into 'line', + use reservoir-sampling to store just "head_lines" random lines. */ + n_lines = read_input_reservoir_sampling (stdin, eolbyte, head_lines, + randint_source, &reservoir); + head_lines = n_lines; + } + /* Close stdin now, rather than earlier, so that randint_all_new doesn't have to worry about opening something other than stdin. */ @@ -363,8 +514,13 @@ main (int argc, char **argv) if (outfile && ! freopen (outfile, "w", stdout)) error (EXIT_FAILURE, errno, "%s", quotearg_colon (outfile)); - if (write_permuted_output (head_lines, line, lo_input, permutation, eolbyte) - != 0) + + if (use_reservoir_sampling) + i = write_permuted_output_reservoir (n_lines, reservoir, permutation); + else + i = write_permuted_output (head_lines, line, lo_input, + permutation, eolbyte); + if (i != 0) error (EXIT_FAILURE, errno, _("write error")); #ifdef lint @@ -375,6 +531,13 @@ main (int argc, char **argv) free (input_lines[0]); free (input_lines); } + if (reservoir) + { + size_t j; + for (j = 0; j < n_lines; ++j) + freebuffer (&reservoir[j]); + free (reservoir); + } #endif return EXIT_SUCCESS; -- cgit v1.2.3-70-g09d2