summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--NEWS5
-rw-r--r--bootstrap.conf2
-rw-r--r--doc/coreutils.texi15
-rw-r--r--gl/lib/heap.c159
-rw-r--r--gl/lib/heap.h34
-rw-r--r--gl/modules/heap24
-rw-r--r--src/Makefile.am3
-rw-r--r--src/sort.c567
-rw-r--r--tests/Makefile.am1
-rwxr-xr-xtests/misc/sort-benchmark-random52
10 files changed, 805 insertions, 57 deletions
diff --git a/NEWS b/NEWS
index 82190d9ce..6ed23fb00 100644
--- a/NEWS
+++ b/NEWS
@@ -30,6 +30,11 @@ GNU coreutils NEWS -*- outline -*-
sort -g now uses long doubles for greater range and precision.
+ sort now uses the number of available processors to parallelize
+ the sorting operation. The number of sorts run concurrently can be
+ limited with the --parallel option or with external process
+ control like taskset for example.
+
stat no longer accepts the --context (-Z) option. Initially it was
merely accepted and ignored, for compatibility. Starting two years
ago, with coreutils-7.0, its use evoked a warning.
diff --git a/bootstrap.conf b/bootstrap.conf
index 644c18b45..fe3974ae1 100644
--- a/bootstrap.conf
+++ b/bootstrap.conf
@@ -124,6 +124,7 @@ gnulib_modules="
hard-locale
hash
hash-pjw
+ heap
host-os
human
idcache
@@ -173,6 +174,7 @@ gnulib_modules="
priv-set
progname
propername
+ pthread
putenv
quote
quotearg
diff --git a/doc/coreutils.texi b/doc/coreutils.texi
index 21cf36d1d..942978f33 100644
--- a/doc/coreutils.texi
+++ b/doc/coreutils.texi
@@ -4068,6 +4068,14 @@ have a large sort or merge that is I/O-bound, you can often improve
performance by using this option to specify directories on different
disks and controllers.
+@item --parallel=@var{n}
+@opindex --parallel
+@cindex multithreaded sort
+Limit the number of sorts run in parallel to @var{n}. By default,
+@var{n} is set to the number of available processors, and values
+greater than that are reduced to that limit. Also see
+@ref{nproc invocation}.
+
@item -u
@itemx --unique
@opindex -u
@@ -4163,6 +4171,13 @@ sort -n -r
@end example
@item
+Run no more than 4 sorts concurrently, using a buffer size of 10M.
+
+@example
+sort --parallel=4 -S 10M
+@end example
+
+@item
Sort alphabetically, omitting the first and second fields
and the blanks at the start of the third field.
This uses a single key composed of the characters beginning
diff --git a/gl/lib/heap.c b/gl/lib/heap.c
new file mode 100644
index 000000000..a37224fa0
--- /dev/null
+++ b/gl/lib/heap.c
@@ -0,0 +1,159 @@
+/* Barebones heap implementation supporting only insert and pop.
+
+ Copyright (C) 2010 Free Software Foundation, Inc.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>. */
+
+/* Full implementation: GDSL (http://gna.org/projects/gdsl/) by Nicolas
+ Darnis <ndarnis@free.fr>. */
+
+#include <config.h>
+
+#include "heap.h"
+#include "stdlib--.h"
+#include "xalloc.h"
+
+static int heap_default_compare (const void *, const void *);
+static size_t heapify_down (void **, size_t, size_t,
+ int (*)(const void *, const void *));
+static void heapify_up (void **, size_t,
+ int (*)(const void *, const void *));
+
+
+/* Allocate and initialize an empty heap.
+
+   COMPARE orders the elements; if it is NULL, a comparison that treats
+   every pair of elements as equal is installed instead.  N_RESERVE is
+   the initial number of array slots, counting the unused slot 0; a
+   value of 0 is treated as 1.
+
+   Memory comes from the xalloc allocators, which do not return on
+   allocation failure, so the result is never NULL.  Release it with
+   heap_free.  */
+
+struct heap *
+heap_alloc (int (*compare)(const void *, const void *), size_t n_reserve)
+{
+  struct heap *heap = xmalloc (sizeof *heap);
+
+  /* N_RESERVE is unsigned, so zero is the only degenerate request.  */
+  if (n_reserve == 0)
+    n_reserve = 1;
+
+  /* xnmalloc guards the N_RESERVE * element-size product against
+     overflow, which a plain multiplication would not.  */
+  heap->array = xnmalloc (n_reserve, sizeof *heap->array);
+
+  heap->array[0] = NULL;
+  heap->capacity = n_reserve;
+  heap->count = 0;
+  heap->compare = compare ? compare : heap_default_compare;
+
+  return heap;
+}
+
+
+/* Fallback comparison installed by heap_alloc when the caller passes
+   no comparator: every pair of elements compares equal.  */
+
+static int
+heap_default_compare (const void *a, const void *b)
+{
+  /* Neither argument is examined.  */
+  (void) a;
+  (void) b;
+
+  return 0;
+}
+
+
+/* Release HEAP together with its slot array.  The stored elements
+   themselves are not freed.  */
+
+void
+heap_free (struct heap *heap)
+{
+  void **slots = heap->array;
+
+  free (slots);
+  free (heap);
+}
+
+/* Insert ITEM into HEAP, enlarging the slot array when it is full.
+
+   Always return 0: growth goes through the xalloc allocator, which
+   does not return on allocation failure.  The int return type is
+   kept for the benefit of existing callers.  */
+
+int
+heap_insert (struct heap *heap, void *item)
+{
+  /* Slot 0 is never used, so the array is full as soon as COUNT
+     reaches CAPACITY - 1.  Grow geometrically rather than by a single
+     slot so that a run of inserts does not reallocate every time;
+     x2nrealloc updates CAPACITY in step with the new allocation and
+     checks the size computation for overflow.  */
+  if (heap->capacity - 1 <= heap->count)
+    heap->array = x2nrealloc (heap->array, &heap->capacity,
+                              sizeof *heap->array);
+
+  heap->array[++heap->count] = item;
+  heapify_up (heap->array, heap->count, heap->compare);
+
+  return 0;
+}
+
+/* Remove and return the element at the top of HEAP, or return NULL
+   when HEAP holds no elements.  */
+
+void *
+heap_remove_top (struct heap *heap)
+{
+  void **slots = heap->array;
+  size_t last = heap->count;
+
+  if (last == 0)
+    return NULL;
+
+  /* Move the last element into the root slot, shrink the heap by one,
+     then let that element sink to where it belongs.  */
+  void *result = slots[1];
+  slots[1] = slots[last];
+  heap->count = last - 1;
+  heapify_down (slots, heap->count, 1, heap->compare);
+
+  return result;
+}
+
+/* Sink the element in slot INITIAL of ARRAY (slots 1..COUNT in use)
+   until neither child compares greater than it under COMPARE.
+   Return the slot in which the element ends up.  */
+
+static size_t
+heapify_down (void **array, size_t count, size_t initial,
+              int (*compare)(const void *, const void *))
+{
+  void *sinking = array[initial];
+  size_t hole = initial;
+
+  for (;;)
+    {
+      size_t pick;
+
+      /* Slots beyond COUNT / 2 have no children.  */
+      if (count / 2 < hole)
+        break;
+
+      /* Choose the child that compares higher; the right child exists
+         only when the left one is not the last slot.  */
+      pick = 2 * hole;
+      if (pick < count && compare (array[pick], array[pick + 1]) < 0)
+        pick++;
+
+      if (compare (array[pick], sinking) <= 0)
+        break;
+
+      array[hole] = array[pick];
+      hole = pick;
+    }
+
+  array[hole] = sinking;
+  return hole;
+}
+
+/* Raise the element in slot COUNT (the last slot in use) toward the
+   root of ARRAY, moving each parent that compares less than or equal
+   to it down one level.  */
+
+static void
+heapify_up (void **array, size_t count,
+            int (*compare)(const void *, const void *))
+{
+  void *rising = array[count];
+  size_t slot;
+
+  for (slot = count; slot != 1; slot /= 2)
+    {
+      void *above = array[slot / 2];
+
+      /* Stop once the parent compares strictly greater.  */
+      if (0 < compare (above, rising))
+        break;
+
+      array[slot] = above;
+    }
+
+  array[slot] = rising;
+}
diff --git a/gl/lib/heap.h b/gl/lib/heap.h
new file mode 100644
index 000000000..0ea516a79
--- /dev/null
+++ b/gl/lib/heap.h
@@ -0,0 +1,34 @@
+/* Barebones heap implementation supporting only insert and pop.
+
+ Copyright (C) 2010 Free Software Foundation, Inc.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>. */
+
+/* Full implementation: GDSL (http://gna.org/projects/gdsl/) by Nicolas
+ Darnis <ndarnis@free.fr>. Adapted by Gene Auyeung. */
+
+#include <stddef.h>
+
+struct heap
+{
+ void **array; /* array[0] is not used */
+ size_t capacity; /* Array size */
+ size_t count; /* Used as index to last element. Also is num of items. */
+ int (*compare)(const void *, const void *);
+};
+
+struct heap *heap_alloc (int (*)(const void *, const void *), size_t);
+void heap_free (struct heap *);
+int heap_insert (struct heap *heap, void *item);
+void *heap_remove_top (struct heap *heap);
diff --git a/gl/modules/heap b/gl/modules/heap
new file mode 100644
index 000000000..cd97e2965
--- /dev/null
+++ b/gl/modules/heap
@@ -0,0 +1,24 @@
+Description:
+Binary heap with minimal number of methods. Used in sort.
+
+Files:
+lib/heap.c
+lib/heap.h
+
+Depends-on:
+stdlib-safer
+xalloc
+
+configure.ac:
+
+Makefile.am:
+lib_SOURCES += heap.c heap.h
+
+Include:
+"heap.h"
+
+License:
+GPL
+
+Maintainer:
+Gene Auyeung
diff --git a/src/Makefile.am b/src/Makefile.am
index 0630a069d..2e0e32088 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -397,6 +397,9 @@ uname_LDADD += $(GETHOSTNAME_LIB)
# for strsignal
kill_LDADD += $(LIBTHREAD)
+# for pthread
+sort_LDADD += $(LIB_PTHREAD)
+
$(PROGRAMS): ../lib/libcoreutils.a
# Get the release year from ../lib/version-etc.c.
diff --git a/src/sort.c b/src/sort.c
index ff8a97a99..5ea1b3476 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -23,6 +23,7 @@
#include <config.h>
#include <getopt.h>
+#include <pthread.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>
@@ -32,9 +33,11 @@
#include "filevercmp.h"
#include "hard-locale.h"
#include "hash.h"
+#include "heap.h"
#include "ignore-value.h"
#include "md5.h"
#include "mbswidth.h"
+#include "nproc.h"
#include "physmem.h"
#include "posixver.h"
#include "quote.h"
@@ -93,6 +96,18 @@ struct rlimit { size_t rlim_cur; };
# define DEFAULT_TMPDIR "/tmp"
#endif
+/* Maximum number of lines to merge every time a NODE is taken from
+   the MERGE_QUEUE.  Node is at LEVEL in the binary merge tree,
+   and is responsible for merging TOTAL lines.
+
+   The value is TOTAL / 4**(LEVEL + 1) + 1, written as a shift so that
+   each argument is fully parenthesized and evaluated exactly once,
+   and so that no intermediate product is computed in int.  */
+#define MAX_MERGE(total, level) (((total) >> (2 * ((level) + 1))) + 1)
+
+/* Heuristic value for the number of lines for which it is worth
+ creating a subthread, during an internal merge sort, on a machine
+ that has processors galore. Currently this number is just a guess.
+ This value must be at least 4. We don't know of any machine where
+ this number has any practical effect. */
+enum { SUBTHREAD_LINES_HEURISTIC = 4 };
+
/* Exit statuses. */
enum
{
@@ -119,6 +134,15 @@ enum
MAX_FORK_TRIES_DECOMPRESS = 9
};
+enum
+ {
+ /* Level of the end-of-merge node, one level above the root. */
+ MERGE_END = 0,
+
+ /* Level of the root node in merge tree. */
+ MERGE_ROOT = 1
+ };
+
/* The representation of the decimal point in the current locale. */
static int decimal_point;
@@ -196,6 +220,31 @@ struct month
int val;
};
+/* Binary merge tree node. LO and HI are walked downward toward
+ END_LO and END_HI as lines are consumed by a merge; the number of
+ lines currently available from a child is therefore LO - END_LO
+ (respectively HI - END_HI). */
+struct merge_node
+{
+ struct line *lo; /* Lines to merge from LO child node. */
+ struct line *hi; /* Lines to merge from HI child node. */
+ struct line *end_lo; /* End of available lines from LO. */
+ struct line *end_hi; /* End of available lines from HI. */
+ struct line **dest; /* Pointer to destination of merge. */
+ size_t nlo; /* Total lines remaining from LO. */
+ size_t nhi; /* Total lines remaining from HI. */
+ size_t level; /* Level in merge tree. */
+ struct merge_node *parent; /* Parent node. */
+ bool queued; /* Node is already in heap. */
+ pthread_spinlock_t *lock; /* Lock for node operations. */
+};
+
+/* Priority queue of merge nodes. */
+struct merge_node_queue
+{
+ struct heap *priority_queue; /* Priority queue of merge tree nodes. */
+ pthread_mutex_t mutex; /* Lock for queue operations. */
+ pthread_cond_t cond; /* Conditional wait for empty queue to populate
+ when popping. */
+};
+
/* FIXME: None of these tables work with multibyte character sets.
Also, there are many other bugs when handling multibyte characters.
One way to fix this is to rewrite `sort' to use wide characters
@@ -300,8 +349,6 @@ static bool debug;
number are present, temp files will be used. */
static unsigned int nmerge = NMERGE_DEFAULT;
-static void sortlines_temp (struct line *, size_t, struct line *);
-
/* Report MESSAGE for FILE, then clean up and exit.
If FILE is null, it represents standard output. */
@@ -398,6 +445,7 @@ Other options:\n\
-t, --field-separator=SEP use SEP instead of non-blank to blank transition\n\
-T, --temporary-directory=DIR use DIR for temporaries, not $TMPDIR or %s;\n\
multiple options specify multiple directories\n\
+ --parallel=N limit the number of sorts run concurrently to N\n\
-u, --unique with -c, check for strict ordering;\n\
without -c, output only the first of an equal run\n\
"), DEFAULT_TMPDIR);
@@ -442,7 +490,8 @@ enum
FILES0_FROM_OPTION,
NMERGE_OPTION,
RANDOM_SOURCE_OPTION,
- SORT_OPTION
+ SORT_OPTION,
+ PARALLEL_OPTION
};
static char const short_options[] = "-bcCdfghik:mMno:rRsS:t:T:uVy:z";
@@ -476,6 +525,7 @@ static struct option const long_options[] =
{"temporary-directory", required_argument, NULL, 'T'},
{"unique", no_argument, NULL, 'u'},
{"zero-terminated", no_argument, NULL, 'z'},
+ {"parallel", required_argument, NULL, PARALLEL_OPTION},
{GETOPT_HELP_OPTION_DECL},
{GETOPT_VERSION_OPTION_DECL},
{NULL, 0, NULL, 0},
@@ -1333,6 +1383,22 @@ specify_sort_size (int oi, char c, char const *s)
xstrtol_fatal (e, oi, c, long_options, s);
}
+/* Specify the number of threads to spawn during internal sort. */
+static unsigned long int
+specify_nthreads (int oi, char c, char const *s)
+{
+ unsigned long int nthreads;
+ enum strtol_error e = xstrtoul (s, NULL, 10, &nthreads, "");
+ if (e == LONGINT_OVERFLOW)
+ return ULONG_MAX;
+ if (e != LONGINT_OK)
+ xstrtol_fatal (e, oi, c, long_options, s);
+ if (nthreads == 0)
+ error (SORT_FAILURE, 0, _("number in parallel must be nonzero"));
+ return nthreads;
+}
+
+
/* Return the default sort size. */
static size_t
default_sort_size (void)
@@ -2951,25 +3017,28 @@ mergefiles (struct sortfile *files, size_t ntemps, size_t nfiles,
return nopened;
}
-/* Merge into T the two sorted arrays of lines LO (with NLO members)
- and HI (with NHI members). T, LO, and HI point just past their
- respective arrays, and the arrays are in reverse order. NLO and
- NHI must be positive, and HI - NHI must equal T - (NLO + NHI). */
+/* Merge into T (of size NLINES) the two sorted arrays of lines
+ LO (with NLINES / 2 members), and
+ T - (NLINES / 2) (with NLINES - NLINES / 2 members).
+ T and LO point just past their respective arrays, and the arrays
+ are in reverse order. NLINES must be at least 2. */
static inline void
-mergelines (struct line *t,
- struct line const *lo, size_t nlo,
- struct line const *hi, size_t nhi)
+mergelines (struct line *restrict t, size_t nlines,
+ struct line const *restrict lo)
{
+ size_t nlo = nlines / 2;
+ size_t nhi = nlines - nlo;
+ struct line *hi = t - nlo;
+
while (true)
if (compare (lo - 1, hi - 1, false) <= 0)
{
*--t = *--lo;
if (! --nlo)
{
- /* HI - NHI equalled T - (NLO + NHI) when this function
- began. Therefore HI must equal T now, and there is no
- need to copy from HI to T. */
+ /* HI must equal T now, and there is no need to copy from
+ HI to T. */
return;
}
}
@@ -3000,15 +3069,25 @@ mergelines (struct line *t,
D. A. Bell, Comp J. 1 (1958), 75. */
static void
-sortlines (struct line *lines, size_t nlines, struct line *temp)
+sequential_sort (struct line *restrict lines, size_t nlines,
+ struct line *restrict temp, bool to_temp)
{
if (nlines == 2)
{
- if (0 < compare (&lines[-1], &lines[-2], false))
+ /* Declare `swap' as int, not bool, to work around a bug
+ <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
+ in the IBM xlc 6.0.0.0 compiler in 64-bit mode. */
+ int swap = (0 < compare (&lines[-1], &lines[-2], false));
+ if (to_temp)
{
- struct line tmp = lines[-1];
+ temp[-1] = lines[-1 - swap];
+ temp[-2] = lines[-2 + swap];
+ }
+ else if (swap)
+ {
+ temp[-1] = lines[-1];
lines[-1] = lines[-2];
- lines[-2] = tmp;
+ lines[-2] = temp[-1];
}
}
else
@@ -3017,46 +3096,386 @@ sortlines (struct line *lines, size_t nlines, struct line *temp)
size_t nhi = nlines - nlo;
struct line *lo = lines;
struct line *hi = lines - nlo;
- struct line *sorted_lo = temp;
- sortlines (hi, nhi, temp);
+ sequential_sort (hi, nhi, temp - (to_temp ? nlo : 0), to_temp);
if (1 < nlo)
- sortlines_temp (lo, nlo, sorted_lo);
+ sequential_sort (lo, nlo, temp, !to_temp);
+ else if (!to_temp)
+ temp[-1] = lo[-1];
+
+ struct line *dest;
+ struct line const *sorted_lo;
+ if (to_temp)
+ {
+ dest = temp;
+ sorted_lo = lines;
+ }
else
- sorted_lo[-1] = lo[-1];
+ {
+ dest = lines;
+ sorted_lo = temp;
+ }
+ mergelines (dest, nlines, sorted_lo);
+ }
+}
+
+/* Compare two NODEs for priority. The NODE with the higher (numerically
+ lower) level has priority. If tie, the NODE with the most remaining
+ lines has priority. */
+
+static int
+compare_nodes (const void *a, const void *b)
+{
+ const struct merge_node *nodea = (const struct merge_node *) a;
+ const struct merge_node *nodeb = (const struct merge_node *) b;
+ if (nodea->level == nodeb->level)
+ return (nodea->nlo + nodea->nhi) < (nodeb->nlo + nodeb->nhi);
+ return nodea->level < nodeb->level;
+}
+
+/* Lock a merge tree NODE.
+ Note spin locks were seen to perform better than mutexes
+ as long as the number of threads is limited to the
+ number of processors. */
- mergelines (lines, sorted_lo, nlo, hi, nhi);
+static inline void
+lock_node (struct merge_node *const restrict node)
+{
+ pthread_spin_lock (node->lock);
+}
+
+/* Unlock a merge tree NODE. */
+
+static inline void
+unlock_node (struct merge_node *const restrict node)
+{
+ pthread_spin_unlock (node->lock);
+}
+
+/* Destroy merge QUEUE. */
+
+static inline void
+queue_destroy (struct merge_node_queue *const restrict queue)
+{
+ heap_free (queue->priority_queue);
+ pthread_cond_destroy (&queue->cond);
+ pthread_mutex_destroy (&queue->mutex);
+}
+
+/* Initialize merge QUEUE, allocating space for a maximum of RESERVE nodes.
+ Though it's highly unlikely all nodes are in the heap at the same time,
+ RESERVE should accommodate all of them. Counting a NULL dummy head for the
+ heap, RESERVE should be 2 * NTHREADS. */
+
+static inline void
+queue_init (struct merge_node_queue *const restrict queue, size_t reserve)
+{
+ queue->priority_queue = (struct heap *) heap_alloc (compare_nodes, reserve);
+ pthread_mutex_init (&queue->mutex, NULL);
+ pthread_cond_init (&queue->cond, NULL);
+}
+
+/* Insert NODE into priority QUEUE. Assume caller either holds lock on NODE
+ or does not need to lock NODE. */
+
+static inline void
+queue_insert (struct merge_node_queue *const restrict queue,
+ struct merge_node *const restrict node)
+{
+ pthread_mutex_lock (&queue->mutex);
+ heap_insert (queue->priority_queue, node);
+ node->queued = true;
+ pthread_mutex_unlock (&queue->mutex);
+ pthread_cond_signal (&queue->cond);
+}
+
+/* Pop NODE off priority QUEUE. Guarantee a non-null, spinlocked NODE. */
+
+static inline struct merge_node *
+queue_pop (struct merge_node_queue *const restrict queue)
+{
+ struct merge_node *node = NULL;
+
+ while (!node)
+ {
+ pthread_mutex_lock (&queue->mutex);
+ if (queue->priority_queue->count)
+ node = (struct merge_node *) heap_remove_top (queue->priority_queue);
+ else
+ {
+ /* Go into conditional wait if no NODE is immediately available. */
+ pthread_cond_wait (&queue->cond, &queue->mutex);
+ }
+ pthread_mutex_unlock (&queue->mutex);
}
+ lock_node (node);
+ node->queued = false;
+ return node;
}
-/* Like sortlines (LINES, NLINES, TEMP), except output into TEMP
- rather than sorting in place. */
+/* If UNIQUE is set, check that LINE is not a duplicate of the previously
+ output line before outputting it. If UNIQUE is not set, output the
+ passed-in line unconditionally. Note that this function does not actually
+ save the line, nor any key information, thus is only appropriate for internal sort. */
-static void
-sortlines_temp (struct line *lines, size_t nlines, struct line *temp)
+static inline void
+write_unique (struct line *const restrict line, FILE *tfp,
+ const char *temp_output)
{
- if (nlines == 2)
+ static struct line *saved = NULL;
+
+ if (!unique)
+ write_bytes (line, tfp, temp_output);
+ else if (!saved || compare (line, saved, false))
{
- /* Declare `swap' as int, not bool, to work around a bug
- <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
- in the IBM xlc 6.0.0.0 compiler in 64-bit mode. */
- int swap = (0 < compare (&lines[-1], &lines[-2], false));
- temp[-1] = lines[-1 - swap];
- temp[-2] = lines[-2 + swap];
+ saved = line;
+ write_bytes (line, tfp, temp_output);
+ }
+}
+
+/* Merge the lines currently available to a NODE in the binary
+ merge tree, up to a maximum specified by MAX_MERGE. */
+
+static inline size_t
+mergelines_node (struct merge_node *const restrict node, size_t total_lines,
+ FILE *tfp, const char *temp_output)
+{
+ struct line *lo_orig = node->lo;
+ struct line *hi_orig = node->hi;
+ size_t to_merge = MAX_MERGE (total_lines, node->level);
+ size_t merged_lo;
+ size_t merged_hi;
+
+ if (node->level > MERGE_ROOT)
+ {
+ /* Merge to destination buffer. */
+ struct line *dest = *node->dest;
+ while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
+ if (compare (node->lo - 1, node->hi - 1, false) <= 0)
+ *--dest = *--node->lo;
+ else
+ *--dest = *--node->hi;
+
+ merged_lo = lo_orig - node->lo;
+ merged_hi = hi_orig - node->hi;
+
+ if (node->nhi == merged_hi)
+ while (node->lo != node->end_lo && to_merge--)
+ *--dest = *--node->lo;
+ else if (node->nlo == merged_lo)
+ while (node->hi != node->end_hi && to_merge--)
+ *--dest = *--node->hi;
}
else
{
- size_t nlo = nlines / 2;
- size_t nhi = nlines - nlo;
- struct line *lo = lines;
- struct line *hi = lines - nlo;
- struct line *sorted_hi = temp - nlo;
+ /* Merge directly to output. */
+ while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
+ {
+ if (compare (node->lo - 1, node->hi - 1, false) <= 0)
+ write_unique (--node->lo, tfp, temp_output);
+ else
+ write_unique (--node->hi, tfp, temp_output);
+ }
+
+ merged_lo = lo_orig - node->lo;
+ merged_hi = hi_orig - node->hi;
+
+ if (node->nhi == merged_hi)
+ {
+ while (node->lo != node->end_lo && to_merge--)
+ write_unique (--node->lo, tfp, temp_output);
+ }
+ else if (node->nlo == merged_lo)
+ {
+ while (node->hi != node->end_hi && to_merge--)
+ write_unique (--node->hi, tfp, temp_output);
+ }
+ node->dest -= lo_orig - node->lo + hi_orig - node->hi;
+ }
+
+ /* Update NODE. */
+ merged_lo = lo_orig - node->lo;
+ merged_hi = hi_orig - node->hi;
+ node->nlo -= merged_lo;
+ node->nhi -= merged_hi;
+ return merged_lo + merged_hi;
+}
+
+/* Insert NODE into QUEUE if it passes insertion checks. NODE's fields
+ are read without taking its lock here, so the caller is expected to
+ hold the lock on NODE (or otherwise have exclusive access to it). */
+
+static inline void
+check_insert (struct merge_node *node,
+ struct merge_node_queue *const restrict queue)
+{
+ size_t lo_avail = node->lo - node->end_lo;
+ size_t hi_avail = node->hi - node->end_hi;
+
+ /* Conditions for insertion:
+ 1. NODE is not already in heap.
+ 2. NODE has available lines from both its children, OR one child has
+ available lines, but the other has exhausted all its lines. */
+ if ((!node->queued)
+ && ((lo_avail && (hi_avail || !(node->nhi)))
+ || (hi_avail && !(node->nlo))))
+ {
+ queue_insert (queue, node);
+ }
+}
+
+/* Update parent merge tree NODE. */
+
+static inline void
+update_parent (struct merge_node *const restrict node, size_t merged,
+ struct merge_node_queue *const restrict queue)
+{
+ if (node->level > MERGE_ROOT)
+ {
+ lock_node (node->parent);
+ *node->dest -= merged;
+ check_insert (node->parent, queue);
+ unlock_node (node->parent);
+ }
+ else if (node->nlo + node->nhi == 0)
+ {
+ /* If the MERGE_ROOT NODE has finished merging, insert the
+ MERGE_END node. */
+ queue_insert (queue, node->parent);
+ }
+}
+
+/* Repeatedly pop QUEUE for a NODE with lines to merge, and merge at least
+ some of those lines, until the MERGE_END node is popped. */
+
+static void
+merge_loop (struct merge_node_queue *const restrict queue,
+ const size_t total_lines, FILE *tfp, const char *temp_output)
+{
+ while (1)
+ {
+ struct merge_node *node = queue_pop (queue);
+
+ if (node->level == MERGE_END)
+ {
+ unlock_node (node);
+ /* Reinsert so other threads can pop it. */
+ queue_insert (queue, node);
+ break;
+ }
+ size_t merged_lines = mergelines_node (node, total_lines, tfp,
+ temp_output);
+ check_insert (node, queue);
+ update_parent (node, merged_lines, queue);
+
+ unlock_node (node);
+ }
+}
+
+
+static void sortlines (struct line *restrict, struct line *restrict,
+ unsigned long int, const size_t,
+ struct merge_node *const restrict, bool,
+ struct merge_node_queue *const restrict,
+ FILE *, const char *);
+
+/* Thread arguments for sortlines_thread. */
+
+struct thread_args
+{
+ struct line *lines;
+ struct line *dest;
+ unsigned long int nthreads;
+ const size_t total_lines;
+ struct merge_node *const restrict parent;
+ bool lo_child;
+ struct merge_node_queue *const restrict merge_queue;
+ FILE *tfp;
+ const char *output_temp;
+};
+
+/* Like sortlines, except with a signature acceptable to pthread_create. */
- sortlines_temp (hi, nhi, sorted_hi);
+static void *
+sortlines_thread (void *data)
+{
+ struct thread_args const *args = data;
+ sortlines (args->lines, args->dest, args->nthreads, args->total_lines,
+ args->parent, args->lo_child, args->merge_queue,
+ args->tfp, args->output_temp);
+ return NULL;
+}
+
+/* There are three phases to the algorithm: node creation, sequential sort,
+ and binary merge.
+
+ During node creation, sortlines recursively visits each node in the
+ binary merge tree and creates a NODE structure corresponding to all the
+ future line merging NODE is responsible for. For each call to
+ sortlines, half the available threads are assigned to each recursive
+ call, until a leaf node having only 1 available thread is reached.
+
+ Each leaf node then performs two sequential sorts, one on each half of
+ the lines it is responsible for. It records in its NODE structure that
+ there are two sorted sublists available to merge from, and inserts its
+ NODE into the priority queue.
+
+ The binary merge phase then begins. Each thread drops into a loop
+ where the thread retrieves a NODE from the priority queue, merges lines
+ available to that NODE, and potentially inserts NODE or its parent back
+ into the queue if there are sufficient available lines for them to
+ merge. This continues until all lines at all nodes of the merge tree
+ have been merged. */
+
+static void
+sortlines (struct line *restrict lines, struct line *restrict dest,
+ unsigned long int nthreads, const size_t total_lines,
+ struct merge_node *const restrict parent, bool lo_child,
+ struct merge_node_queue *const restrict merge_queue,
+ FILE *tfp, const char *temp_output)
+{
+ /* Create merge tree NODE. */
+ size_t nlines = (lo_child)? parent->nlo : parent->nhi;
+ size_t nlo = nlines / 2;
+ size_t nhi = nlines - nlo;
+ struct line *lo = dest - total_lines;
+ struct line *hi = lo - nlo;
+ struct line **parent_end = (lo_child)? &parent->end_lo : &parent->end_hi;
+ pthread_spinlock_t lock;
+ pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE);
+ struct merge_node node = {lo, hi, lo, hi, parent_end, nlo, nhi,
+ parent->level + 1, parent, false, &lock};
+
+ /* Calculate thread arguments. */
+ unsigned long int lo_threads = nthreads / 2;
+ unsigned long int hi_threads = nthreads - lo_threads;
+ pthread_t thread;
+ struct thread_args args = {lines, lo, lo_threads, total_lines, &node,
+ true, merge_queue, tfp, temp_output};
+
+ if (nthreads > 1 && SUBTHREAD_LINES_HEURISTIC <= nlines
+ && pthread_create (&thread, NULL, sortlines_thread, &args) == 0)
+ {
+ sortlines (lines - nlo, hi, hi_threads, total_lines, &node, false,
+ merge_queue, tfp, temp_output);
+ pthread_join (thread, NULL);
+ }
+ else
+ {
+ /* Nthreads = 1, this is a leaf NODE, or pthread_create failed.
+ Sort with 1 thread. */
+ struct line *temp = lines - total_lines;
+ if (1 < nhi)
+ sequential_sort (lines - nlo, nhi, temp - nlo / 2, false);
if (1 < nlo)
- sortlines (lo, nlo, temp);
+ sequential_sort (lines, nlo, temp, false);
- mergelines (temp, lo, nlo, sorted_hi, nhi);
+ /* Update merge NODE. No need to lock yet. */
+ node.lo = lines;
+ node.hi = lines - nlo;
+ node.end_lo = lines - nlo;
+ node.end_hi = lines - nlo - nhi;
+
+ queue_insert (merge_queue, &node);
+ merge_loop (merge_queue, total_lines, tfp, temp_output);
}
}
@@ -3262,7 +3681,8 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
/* Sort NFILES FILES onto OUTPUT_FILE. */
static void
-sort (char * const *files, size_t nfiles, char const *output_file)
+sort (char * const *files, size_t nfiles, char const *output_file,
+ unsigned long int nthreads)
{
struct buffer buf;
size_t ntemps = 0;
@@ -3276,8 +3696,22 @@ sort (char * const *files, size_t nfiles, char const *output_file)
char const *file = *files;
FILE *fp = xfopen (file, "r");
FILE *tfp;
- size_t bytes_per_line = (2 * sizeof (struct line)
- - sizeof (struct line) / 2);
+
+ size_t bytes_per_line;
+ if (nthreads > 1)
+ {
+ /* Get log P. */
+ unsigned long int tmp = 1;
+ size_t mult = 1;
+ while (tmp < nthreads)
+ {
+ tmp *= 2;
+ mult++;
+ }
+ bytes_per_line = (mult * sizeof (struct line));
+ }
+ else
+ bytes_per_line = sizeof (struct line) * 3 / 2;
if (! buf.alloc)
initbuf (&buf, bytes_per_line,
@@ -3289,7 +3723,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
while (fillbuf (&buf, fp, file))
{
struct line *line;
- struct line *linebase;
if (buf.eof && nfiles
&& (bytes_per_line + 1
@@ -3303,9 +3736,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
}
line = buffer_linelim (&buf);
- linebase = line - buf.nlines;
- if (1 < buf.nlines)
- sortlines (line, buf.nlines, linebase);
if (buf.eof && !nfiles && !ntemps && !buf.left)
{
xfclose (fp, file);
@@ -3318,16 +3748,23 @@ sort (char * const *files, size_t nfiles, char const *output_file)
++ntemps;
temp_output = create_temp (&tfp, NULL);
}
-
- do
+ if (1 < buf.nlines)
{
- line--;
- write_bytes (line, tfp, temp_output);
- if (unique)
- while (linebase < line && compare (line, line - 1, false) == 0)
- line--;
+ struct merge_node_queue merge_queue;
+ queue_init (&merge_queue, 2 * nthreads);
+
+ pthread_spinlock_t lock;
+ pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE);
+ struct merge_node node =
+ {NULL, NULL, NULL, NULL, NULL, buf.nlines,
+ buf.nlines, MERGE_END, NULL, false, &lock};
+
+ sortlines (line, line, nthreads, buf.nlines, &node, true,
+ &merge_queue, tfp, temp_output);
+ queue_destroy (&merge_queue);
}
- while (linebase < line);
+ else
+ write_unique (line - 1, tfp, temp_output);
xfclose (tfp, temp_output);
@@ -3547,6 +3984,7 @@ main (int argc, char **argv)
bool mergeonly = false;
char *random_source = NULL;
bool need_random = false;
+ unsigned long int nthreads = 0;
size_t nfiles = 0;
bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL);
bool obsolete_usage = (posix2_version () < 200112);
@@ -3882,6 +4320,10 @@ main (int argc, char **argv)
add_temp_dir (optarg);
break;
+ case PARALLEL_OPTION:
+ nthreads = specify_nthreads (oi, c, optarg);
+ break;
+
case 'u':
unique = true;
break;
@@ -4030,6 +4472,9 @@ main (int argc, char **argv)
if (need_random)
{
+ /* Threading does not lock the randread_source structure, so
+ downgrade to one thread to avoid race conditions. */
+ nthreads = 1;
randread_source = randread_new (random_source, MD5_DIGEST_SIZE);
if (! randread_source)
die (_("open failed"), random_source);
@@ -4084,7 +4529,15 @@ main (int argc, char **argv)
IF_LINT (free (sortfiles));
}
else
- sort (files, nfiles, outfile);
+ {
+ /* If NTHREADS > number of cores on the machine, spinlocking
+ could be wasteful. */
+ unsigned long int np2 = num_processors (NPROC_CURRENT_OVERRIDABLE);
+ if (!nthreads || nthreads > np2)
+ nthreads = np2;
+
+ sort (files, nfiles, outfile, nthreads);
+ }
if (have_read_stdin && fclose (stdin) == EOF)
die (_("close failed"), "-");
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a993e8288..fccd00013 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -223,6 +223,7 @@ TESTS = \
misc/shred-remove \
misc/shuf \
misc/sort \
+ misc/sort-benchmark-random \
misc/sort-compress \
misc/sort-continue \
misc/sort-debug-keys \
diff --git a/tests/misc/sort-benchmark-random b/tests/misc/sort-benchmark-random
new file mode 100755
index 000000000..332538115
--- /dev/null
+++ b/tests/misc/sort-benchmark-random
@@ -0,0 +1,52 @@
+#!/bin/sh
+# Benchmark sort on randomly generated data.
+
+# Copyright (C) 2010 Free Software Foundation, Inc.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Written by Glen Lenker.
+
+. "${srcdir=.}/init.sh"; path_prepend_ ../src
+
+very_expensive_
+
+perl -e '
+my $num_lines = 500000;
+my $length = 100;
+
+for (my $i=0; $i < $num_lines; $i++)
+{
+ for (my $j=0; $j < $length; $j++)
+ {
+ printf "%c", 32 + rand(94);
+ }
+ print "\n";
+}' > in || framework_failure
+
+# We need to generate a lot of data for sort to show a noticeable
+# improvement in performance. Sorting it in Perl may take a while.
+
+perl -e '
+open (FILE, "<in");
+my @list = <FILE>;
+print sort(@list);
+close (FILE);
+' > exp || framework_failure
+
+time sort in > out || fail=1
+
+compare out exp || fail=1
+
+Exit $fail