summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAssaf Gordon <assafgordon@gmail.com>2013-03-06 18:25:49 -0500
committerPádraig Brady <P@draigBrady.com>2013-03-25 20:07:14 +0000
commit20d7bce0f7e57d9a98f0ee811e31c757e9fedfff (patch)
tree0c3a560e7bbd16762807e0b2fc28d32670549eff /src
parent4c49dc823ff7da589ae58d8d8313d38a75fc8f64 (diff)
downloadcoreutils-20d7bce0f7e57d9a98f0ee811e31c757e9fedfff.tar.xz
shuf: use reservoir-sampling for large or unknown sized inputs
Reservoir sampling optimizes selecting K random lines from large or unknown-sized input: http://en.wikipedia.org/wiki/Reservoir_sampling Note this also avoids reading any input when -n0 is specified. * src/shuf.c (main): Use reservoir-sampling when the number of output lines is known, and the input size is large or unknown. (input_size): A new function to get the input size for regular files. (read_input_reservoir_sampling): New function to read lines from input, keeping only K lines in memory, replacing lines with decreasing prob. (write_permuted_output_reservoir): New function to output reservoir. * tests/misc/shuf-reservoir.sh: An expensive_ test using valgrind to exercise the reservoir-sampling code. * tests/local.mk: Reference new test. * NEWS: Mention the improvement.
Diffstat (limited to 'src')
-rw-r--r--src/shuf.c177
1 files changed, 170 insertions, 7 deletions
diff --git a/src/shuf.c b/src/shuf.c
index 71ac3e60c..bbf3a86c2 100644
--- a/src/shuf.c
+++ b/src/shuf.c
@@ -25,6 +25,7 @@
#include "error.h"
#include "fadvise.h"
#include "getopt.h"
+#include "linebuffer.h"
#include "quote.h"
#include "quotearg.h"
#include "randint.h"
@@ -38,6 +39,18 @@
#define AUTHORS proper_name ("Paul Eggert")
+/* For reservoir-sampling, allocate the reservoir lines in batches. */
+enum { RESERVOIR_LINES_INCREMENT = 1024 };
+
+/* reservoir-sampling introduces CPU overhead for small inputs.
+ So only enable it for inputs >= this limit.
+ This limit was determined using these commands:
+ $ for p in $(seq 7); do src/seq $((10**$p)) > 10p$p.in; done
+ $ for p in $(seq 7); do time shuf-nores -n10 10p$p.in >/dev/null; done
+ $ for p in $(seq 7); do time shuf -n10 10p$p.in >/dev/null; done .*/
+enum { RESERVOIR_MIN_INPUT = 8192 * 1024 };
+
+
void
usage (int status)
{
@@ -135,6 +148,114 @@ next_line (char *line, char eolbyte, size_t n)
return p + 1;
}
+/* Return the size of the input if possible or OFF_T_MAX if not. */
+
+static off_t
+input_size (void)
+{
+ off_t file_size;
+
+ struct stat stat_buf;
+ if (fstat (STDIN_FILENO, &stat_buf) != 0)
+ return OFF_T_MAX;
+ if (usable_st_size (&stat_buf))
+ file_size = stat_buf.st_size;
+ else
+ return OFF_T_MAX;
+
+ off_t input_offset = lseek (STDIN_FILENO, 0, SEEK_CUR);
+ if (input_offset < 0)
+ return OFF_T_MAX;
+
+ file_size -= input_offset;
+
+ return file_size;
+}
+
+/* Read all lines and store up to K permuted lines in *OUT_RSRV.
+ Return the number of lines read, up to a maximum of K. */
+
+static size_t
+read_input_reservoir_sampling (FILE *in, char eolbyte, size_t k,
+ struct randint_source *s,
+ struct linebuffer **out_rsrv)
+{
+ randint n_lines = 0;
+ size_t n_alloc_lines = MIN (k, RESERVOIR_LINES_INCREMENT);
+ struct linebuffer *line = NULL;
+ struct linebuffer *rsrv;
+
+ rsrv = xcalloc (n_alloc_lines, sizeof (struct linebuffer));
+
+ /* Fill the first K lines, directly into the reservoir. */
+ while (n_lines < k
+ && (line =
+ readlinebuffer_delim (&rsrv[n_lines], in, eolbyte)) != NULL)
+ {
+ n_lines++;
+
+ /* Enlarge reservoir. */
+ if (n_lines >= n_alloc_lines)
+ {
+ n_alloc_lines += RESERVOIR_LINES_INCREMENT;
+ rsrv = xnrealloc (rsrv, n_alloc_lines, sizeof (struct linebuffer));
+ memset (&rsrv[n_lines], 0,
+ RESERVOIR_LINES_INCREMENT * sizeof (struct linebuffer));
+ }
+ }
+
+ /* last line wasn't NULL - so there may be more lines to read. */
+ if (line != NULL)
+ {
+ struct linebuffer dummy;
+ initbuffer (&dummy); /* space for lines not put in reservoir. */
+
+ /* Choose the fate of the next line, with decreasing probability (as
+ n_lines increases in size).
+
+ If the line will be used, store it directly in the reservoir.
+ Otherwise, store it in dummy space.
+
+ With 'struct linebuffer', storing into existing buffer will reduce
+ re-allocations (will only re-allocate if the new line is longer than
+ the currently allocated space). */
+ do
+ {
+ randint j = randint_choose (s, n_lines + 1); /* 0 .. n_lines. */
+ line = (j < k) ? (&rsrv[j]) : (&dummy);
+ }
+ while (readlinebuffer_delim (line, in, eolbyte) != NULL && n_lines++);
+
+ if (! n_lines)
+ error (EXIT_FAILURE, EOVERFLOW, _("too many input lines"));
+
+ freebuffer (&dummy);
+ }
+
+ /* no more input lines, or an input error. */
+ if (ferror (in))
+ error (EXIT_FAILURE, errno, _("read error"));
+
+ *out_rsrv = rsrv;
+ return MIN (k, n_lines);
+}
+
+static int
+write_permuted_output_reservoir (size_t n_lines, struct linebuffer *lines,
+ size_t const *permutation)
+{
+ size_t i;
+
+ for (i = 0; i < n_lines; i++)
+ {
+ const struct linebuffer *p = &lines[permutation[i]];
+ if (fwrite (p->buffer, sizeof (char), p->length, stdout) != p->length)
+ return -1;
+ }
+
+ return 0;
+}
+
/* Read data from file IN. Input lines are delimited by EOLBYTE;
silently append a trailing EOLBYTE if the file ends in some other
byte. Store a pointer to the resulting array of lines into *PLINE.
@@ -152,6 +273,15 @@ read_input (FILE *in, char eolbyte, char ***pline)
size_t i;
size_t n_lines;
+ /* TODO: We should limit the amount of data read here,
+ to less than RESERVOIR_MIN_INPUT. I.E. adjust fread_file() to support
+ taking a byte limit. We'd then need to ensure we handle a line spanning
+ this boundary. With that in place we could set use_reservoir_sampling
+ when used==RESERVOIR_MIN_INPUT, and have read_input_reservoir_sampling()
+ call a wrapper function to populate a linebuffer from the internal pline
+ or if none left, stdin. Doing that would give better performance by
+ avoiding the reservoir CPU overhead when reading < RESERVOIR_MIN_INPUT
+ from a pipe, and allow us to dispense with the input_size() function. */
if (!(buf = fread_file (in, &used)))
error (EXIT_FAILURE, errno, _("read error"));
@@ -174,7 +304,7 @@ read_input (FILE *in, char eolbyte, char ***pline)
}
static int
-write_permuted_output (size_t n_lines, char * const *line, size_t lo_input,
+write_permuted_output (size_t n_lines, char *const *line, size_t lo_input,
size_t const *permutation, char eolbyte)
{
size_t i;
@@ -182,7 +312,7 @@ write_permuted_output (size_t n_lines, char * const *line, size_t lo_input,
if (line)
for (i = 0; i < n_lines; i++)
{
- char * const *p = line + permutation[i];
+ char *const *p = line + permutation[i];
size_t len = p[1] - p[0];
if (fwrite (p[0], sizeof *p[0], len, stdout) != len)
return -1;
@@ -209,14 +339,17 @@ main (int argc, char **argv)
char *random_source = NULL;
char eolbyte = '\n';
char **input_lines = NULL;
+ bool use_reservoir_sampling = false;
int optc;
int n_operands;
char **operand;
size_t n_lines;
- char **line;
+ char **line = NULL;
+ struct linebuffer *reservoir = NULL;
struct randint_source *randint_source;
size_t *permutation;
+ int i;
initialize_main (&argc, &argv);
set_program_name (argv[0]);
@@ -341,17 +474,35 @@ main (int argc, char **argv)
fadvise (stdin, FADVISE_SEQUENTIAL);
- n_lines = read_input (stdin, eolbyte, &input_lines);
- line = input_lines;
+ if (head_lines != SIZE_MAX && input_size () > RESERVOIR_MIN_INPUT)
+ {
+ use_reservoir_sampling = true;
+ n_lines = SIZE_MAX; /* unknown number of input lines, for now. */
+ }
+ else
+ {
+ n_lines = read_input (stdin, eolbyte, &input_lines);
+ line = input_lines;
+ }
}
head_lines = MIN (head_lines, n_lines);
randint_source = randint_all_new (random_source,
+ use_reservoir_sampling ? SIZE_MAX :
randperm_bound (head_lines, n_lines));
if (! randint_source)
error (EXIT_FAILURE, errno, "%s", quotearg_colon (random_source));
+ if (use_reservoir_sampling)
+ {
+ /* Instead of reading the entire file into 'line',
+ use reservoir-sampling to store just "head_lines" random lines. */
+ n_lines = read_input_reservoir_sampling (stdin, eolbyte, head_lines,
+ randint_source, &reservoir);
+ head_lines = n_lines;
+ }
+
/* Close stdin now, rather than earlier, so that randint_all_new
doesn't have to worry about opening something other than
stdin. */
@@ -363,8 +514,13 @@ main (int argc, char **argv)
if (outfile && ! freopen (outfile, "w", stdout))
error (EXIT_FAILURE, errno, "%s", quotearg_colon (outfile));
- if (write_permuted_output (head_lines, line, lo_input, permutation, eolbyte)
- != 0)
+
+ if (use_reservoir_sampling)
+ i = write_permuted_output_reservoir (n_lines, reservoir, permutation);
+ else
+ i = write_permuted_output (head_lines, line, lo_input,
+ permutation, eolbyte);
+ if (i != 0)
error (EXIT_FAILURE, errno, _("write error"));
#ifdef lint
@@ -375,6 +531,13 @@ main (int argc, char **argv)
free (input_lines[0]);
free (input_lines);
}
+ if (reservoir)
+ {
+ size_t j;
+ for (j = 0; j < n_lines; ++j)
+ freebuffer (&reservoir[j]);
+ free (reservoir);
+ }
#endif
return EXIT_SUCCESS;