diff options
-rw-r--r-- | NEWS | 5 | ||||
-rw-r--r-- | bootstrap.conf | 2 | ||||
-rw-r--r-- | doc/coreutils.texi | 15 | ||||
-rw-r--r-- | gl/lib/heap.c | 159 | ||||
-rw-r--r-- | gl/lib/heap.h | 34 | ||||
-rw-r--r-- | gl/modules/heap | 24 | ||||
-rw-r--r-- | src/Makefile.am | 3 | ||||
-rw-r--r-- | src/sort.c | 567 | ||||
-rw-r--r-- | tests/Makefile.am | 1 | ||||
-rwxr-xr-x | tests/misc/sort-benchmark-random | 52 |
10 files changed, 805 insertions, 57 deletions
@@ -30,6 +30,11 @@ GNU coreutils NEWS -*- outline -*- sort -g now uses long doubles for greater range and precision. + sort now uses the number of available processors to parallelize + the sorting operation. The number of sorts run concurrently can be + limited with the --parallel option or with external process + control like taskset for example. + stat no longer accepts the --context (-Z) option. Initially it was merely accepted and ignored, for compatibility. Starting two years ago, with coreutils-7.0, its use evoked a warning. diff --git a/bootstrap.conf b/bootstrap.conf index 644c18b45..fe3974ae1 100644 --- a/bootstrap.conf +++ b/bootstrap.conf @@ -124,6 +124,7 @@ gnulib_modules=" hard-locale hash hash-pjw + heap host-os human idcache @@ -173,6 +174,7 @@ gnulib_modules=" priv-set progname propername + pthread putenv quote quotearg diff --git a/doc/coreutils.texi b/doc/coreutils.texi index 21cf36d1d..942978f33 100644 --- a/doc/coreutils.texi +++ b/doc/coreutils.texi @@ -4068,6 +4068,14 @@ have a large sort or merge that is I/O-bound, you can often improve performance by using this option to specify directories on different disks and controllers. +@item --parallel=@var{n} +@opindex --parallel +@cindex multithreaded sort +Limit the number of sorts run in parallel to @var{n}. By default, +@var{n} is set to the number of available processors, and values +greater than that are reduced to that limit. Also see +@ref{nproc invocation}. + @item -u @itemx --unique @opindex -u @@ -4163,6 +4171,13 @@ sort -n -r @end example @item +Run no more than 4 sorts concurrently, using a buffer size of 10M. + +@example +sort --parallel=4 -S 10M +@end example + +@item Sort alphabetically, omitting the first and second fields and the blanks at the start of the third field. 
This uses a single key composed of the characters beginning diff --git a/gl/lib/heap.c b/gl/lib/heap.c new file mode 100644 index 000000000..a37224fa0 --- /dev/null +++ b/gl/lib/heap.c @@ -0,0 +1,159 @@ +/* Barebones heap implementation supporting only insert and pop. + + Copyright (C) 2010 Free Software Foundation, Inc. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. */ + +/* Full implementation: GDSL (http://gna.org/projects/gdsl/) by Nicolas + Darnis <ndarnis@free.fr>. */ + +#include <config.h> + +#include "heap.h" +#include "stdlib--.h" +#include "xalloc.h" + +static int heap_default_compare (const void *, const void *); +static size_t heapify_down (void **, size_t, size_t, + int (*)(const void *, const void *)); +static void heapify_up (void **, size_t, + int (*)(const void *, const void *)); + + +/* Allocate memory for the heap. */ + +struct heap * +heap_alloc (int (*compare)(const void *, const void *), size_t n_reserve) +{ + struct heap *heap; + void *xmalloc_ret = xmalloc (sizeof *heap); + heap = (struct heap *) xmalloc_ret; + if (!heap) + return NULL; + + if (n_reserve <= 0) + n_reserve = 1; + + xmalloc_ret = xmalloc (n_reserve * sizeof *(heap->array)); + heap->array = (void **) xmalloc_ret; + if (!heap->array) + { + free (heap); + return NULL; + } + + heap->array[0] = NULL; + heap->capacity = n_reserve; + heap->count = 0; + heap->compare = compare ? 
compare : heap_default_compare; + + return heap; +} + + +static int +heap_default_compare (const void *a, const void *b) +{ + return 0; +} + + +void +heap_free (struct heap *heap) +{ + free (heap->array); + free (heap); +} + +/* Insert element into heap. */ + +int +heap_insert (struct heap *heap, void *item) +{ + if (heap->capacity - 1 <= heap->count) + { + size_t new_size = (2 + heap->count) * sizeof *(heap->array); + void *realloc_ret = xrealloc (heap->array, new_size); + heap->array = (void **) realloc_ret; + heap->capacity = (2 + heap->count); + + if (!heap->array) + return -1; + } + + heap->array[++heap->count] = item; + heapify_up (heap->array, heap->count, heap->compare); + + return 0; +} + +/* Pop top element off heap. */ + +void * +heap_remove_top (struct heap *heap) +{ + if (heap->count == 0) + return NULL; + + void *top = heap->array[1]; + heap->array[1] = heap->array[heap->count--]; + heapify_down (heap->array, heap->count, 1, heap->compare); + + return top; +} + +/* Move element down into appropriate position in heap. */ + +static size_t +heapify_down (void **array, size_t count, size_t initial, + int (*compare)(const void *, const void *)) +{ + void *element = array[initial]; + + size_t parent = initial; + while (parent <= count / 2) + { + size_t child = 2 * parent; + + if (child < count && compare (array[child], array[child+1]) < 0) + child++; + + if (compare (array[child], element) <= 0) + break; + + array[parent] = array[child]; + parent = child; + } + + array[parent] = element; + return parent; +} + +/* Move element up into appropriate position in heap. 
*/ + +static void +heapify_up (void **array, size_t count, + int (*compare)(const void *, const void *)) +{ + size_t k = count; + void *new_element = array[k]; + + while (k != 1 && compare (array[k/2], new_element) <= 0) + { + array[k] = array[k/2]; + k /= 2; + } + + array[k] = new_element; +} diff --git a/gl/lib/heap.h b/gl/lib/heap.h new file mode 100644 index 000000000..0ea516a79 --- /dev/null +++ b/gl/lib/heap.h @@ -0,0 +1,34 @@ +/* Barebones heap implementation supporting only insert and pop. + + Copyright (C) 2010 Free Software Foundation, Inc. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. */ + +/* Full implementation: GDSL (http://gna.org/projects/gdsl/) by Nicolas + Darnis <ndarnis@free.fr>. Adapted by Gene Auyeung. */ + +#include <stddef.h> + +struct heap +{ + void **array; /* array[0] is not used */ + size_t capacity; /* Array size */ + size_t count; /* Used as index to last element. Also is num of items. */ + int (*compare)(const void *, const void *); +}; + +struct heap *heap_alloc (int (*)(const void *, const void *), size_t); +void heap_free (struct heap *); +int heap_insert (struct heap *heap, void *item); +void *heap_remove_top (struct heap *heap); diff --git a/gl/modules/heap b/gl/modules/heap new file mode 100644 index 000000000..cd97e2965 --- /dev/null +++ b/gl/modules/heap @@ -0,0 +1,24 @@ +Description: +Binary heap with minimal number of methods. Used in sort. 
+ +Files: +lib/heap.c +lib/heap.h + +Depends-on: +stdlib-safer +xalloc + +configure.ac: + +Makefile.am: +lib_SOURCES += heap.c heap.h + +Include: +"heap.h" + +License +GPL + +Maintainer: +Gene Auyeung diff --git a/src/Makefile.am b/src/Makefile.am index 0630a069d..2e0e32088 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -397,6 +397,9 @@ uname_LDADD += $(GETHOSTNAME_LIB) # for strsignal kill_LDADD += $(LIBTHREAD) +# for pthread +sort_LDADD += $(LIB_PTHREAD) + $(PROGRAMS): ../lib/libcoreutils.a # Get the release year from ../lib/version-etc.c. diff --git a/src/sort.c b/src/sort.c index ff8a97a99..5ea1b3476 100644 --- a/src/sort.c +++ b/src/sort.c @@ -23,6 +23,7 @@ #include <config.h> #include <getopt.h> +#include <pthread.h> #include <sys/types.h> #include <sys/wait.h> #include <signal.h> @@ -32,9 +33,11 @@ #include "filevercmp.h" #include "hard-locale.h" #include "hash.h" +#include "heap.h" #include "ignore-value.h" #include "md5.h" #include "mbswidth.h" +#include "nproc.h" #include "physmem.h" #include "posixver.h" #include "quote.h" @@ -93,6 +96,18 @@ struct rlimit { size_t rlim_cur; }; # define DEFAULT_TMPDIR "/tmp" #endif +/* Maximum number of lines to merge every time a NODE is taken from + the MERGE_QUEUE. Node is at LEVEL in the binary merge tree, + and is responsible for merging TOTAL lines. */ +#define MAX_MERGE(total, level) ((total) / ((2 << level) * (2 << level)) + 1) + +/* Heuristic value for the number of lines for which it is worth + creating a subthread, during an internal merge sort, on a machine + that has processors galore. Currently this number is just a guess. + This value must be at least 4. We don't know of any machine where + this number has any practical effect. */ +enum { SUBTHREAD_LINES_HEURISTIC = 4 }; + /* Exit statuses. */ enum { @@ -119,6 +134,15 @@ enum MAX_FORK_TRIES_DECOMPRESS = 9 }; +enum + { + /* Level of the end-of-merge node, one level above the root. */ + MERGE_END = 0, + + /* Level of the root node in merge tree. 
*/ + MERGE_ROOT = 1 + }; + /* The representation of the decimal point in the current locale. */ static int decimal_point; @@ -196,6 +220,31 @@ struct month int val; }; +/* Binary merge tree node. */ +struct merge_node +{ + struct line *lo; /* Lines to merge from LO child node. */ + struct line *hi; /* Lines to merge from HI child node. */ + struct line *end_lo; /* End of available lines from LO. */ + struct line *end_hi; /* End of available lines from HI. */ + struct line **dest; /* Pointer to destination of merge. */ + size_t nlo; /* Total lines remaining from LO. */ + size_t nhi; /* Total lines remaining from HI. */ + size_t level; /* Level in merge tree. */ + struct merge_node *parent; /* Parent node. */ + bool queued; /* Node is already in heap. */ + pthread_spinlock_t *lock; /* Lock for node operations. */ +}; + +/* Priority queue of merge nodes. */ +struct merge_node_queue +{ + struct heap *priority_queue; /* Priority queue of merge tree nodes. */ + pthread_mutex_t mutex; /* Lock for queue operations. */ + pthread_cond_t cond; /* Conditional wait for empty queue to populate + when popping. */ +}; + /* FIXME: None of these tables work with multibyte character sets. Also, there are many other bugs when handling multibyte characters. One way to fix this is to rewrite `sort' to use wide characters @@ -300,8 +349,6 @@ static bool debug; number are present, temp files will be used. */ static unsigned int nmerge = NMERGE_DEFAULT; -static void sortlines_temp (struct line *, size_t, struct line *); - /* Report MESSAGE for FILE, then clean up and exit. If FILE is null, it represents standard output. 
*/ @@ -398,6 +445,7 @@ Other options:\n\ -t, --field-separator=SEP use SEP instead of non-blank to blank transition\n\ -T, --temporary-directory=DIR use DIR for temporaries, not $TMPDIR or %s;\n\ multiple options specify multiple directories\n\ + --parallel=N limit the number of sorts run concurrently to N\n\ -u, --unique with -c, check for strict ordering;\n\ without -c, output only the first of an equal run\n\ "), DEFAULT_TMPDIR); @@ -442,7 +490,8 @@ enum FILES0_FROM_OPTION, NMERGE_OPTION, RANDOM_SOURCE_OPTION, - SORT_OPTION + SORT_OPTION, + PARALLEL_OPTION }; static char const short_options[] = "-bcCdfghik:mMno:rRsS:t:T:uVy:z"; @@ -476,6 +525,7 @@ static struct option const long_options[] = {"temporary-directory", required_argument, NULL, 'T'}, {"unique", no_argument, NULL, 'u'}, {"zero-terminated", no_argument, NULL, 'z'}, + {"parallel", required_argument, NULL, PARALLEL_OPTION}, {GETOPT_HELP_OPTION_DECL}, {GETOPT_VERSION_OPTION_DECL}, {NULL, 0, NULL, 0}, @@ -1333,6 +1383,22 @@ specify_sort_size (int oi, char c, char const *s) xstrtol_fatal (e, oi, c, long_options, s); } +/* Specify the number of threads to spawn during internal sort. */ +static unsigned long int +specify_nthreads (int oi, char c, char const *s) +{ + unsigned long int nthreads; + enum strtol_error e = xstrtoul (s, NULL, 10, &nthreads, ""); + if (e == LONGINT_OVERFLOW) + return ULONG_MAX; + if (e != LONGINT_OK) + xstrtol_fatal (e, oi, c, long_options, s); + if (nthreads == 0) + error (SORT_FAILURE, 0, _("number in parallel must be nonzero")); + return nthreads; +} + + /* Return the default sort size. */ static size_t default_sort_size (void) @@ -2951,25 +3017,28 @@ mergefiles (struct sortfile *files, size_t ntemps, size_t nfiles, return nopened; } -/* Merge into T the two sorted arrays of lines LO (with NLO members) - and HI (with NHI members). T, LO, and HI point just past their - respective arrays, and the arrays are in reverse order. 
NLO and - NHI must be positive, and HI - NHI must equal T - (NLO + NHI). */ +/* Merge into T (of size NLINES) the two sorted arrays of lines + LO (with NLINES / 2 members), and + T - (NLINES / 2) (with NLINES - NLINES / 2 members). + T and LO point just past their respective arrays, and the arrays + are in reverse order. NLINES must be at least 2. */ static inline void -mergelines (struct line *t, - struct line const *lo, size_t nlo, - struct line const *hi, size_t nhi) +mergelines (struct line *restrict t, size_t nlines, + struct line const *restrict lo) { + size_t nlo = nlines / 2; + size_t nhi = nlines - nlo; + struct line *hi = t - nlo; + while (true) if (compare (lo - 1, hi - 1, false) <= 0) { *--t = *--lo; if (! --nlo) { - /* HI - NHI equalled T - (NLO + NHI) when this function - began. Therefore HI must equal T now, and there is no - need to copy from HI to T. */ + /* HI must equal T now, and there is no need to copy from + HI to T. */ return; } } @@ -3000,15 +3069,25 @@ mergelines (struct line *t, D. A. Bell, Comp J. 1 (1958), 75. */ static void -sortlines (struct line *lines, size_t nlines, struct line *temp) +sequential_sort (struct line *restrict lines, size_t nlines, + struct line *restrict temp, bool to_temp) { if (nlines == 2) { - if (0 < compare (&lines[-1], &lines[-2], false)) + /* Declare `swap' as int, not bool, to work around a bug + <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html> + in the IBM xlc 6.0.0.0 compiler in 64-bit mode. 
*/ + int swap = (0 < compare (&lines[-1], &lines[-2], false)); + if (to_temp) { - struct line tmp = lines[-1]; + temp[-1] = lines[-1 - swap]; + temp[-2] = lines[-2 + swap]; + } + else if (swap) + { + temp[-1] = lines[-1]; lines[-1] = lines[-2]; - lines[-2] = tmp; + lines[-2] = temp[-1]; } } else @@ -3017,46 +3096,386 @@ sortlines (struct line *lines, size_t nlines, struct line *temp) size_t nhi = nlines - nlo; struct line *lo = lines; struct line *hi = lines - nlo; - struct line *sorted_lo = temp; - sortlines (hi, nhi, temp); + sequential_sort (hi, nhi, temp - (to_temp ? nlo : 0), to_temp); if (1 < nlo) - sortlines_temp (lo, nlo, sorted_lo); + sequential_sort (lo, nlo, temp, !to_temp); + else if (!to_temp) + temp[-1] = lo[-1]; + + struct line *dest; + struct line const *sorted_lo; + if (to_temp) + { + dest = temp; + sorted_lo = lines; + } else - sorted_lo[-1] = lo[-1]; + { + dest = lines; + sorted_lo = temp; + } + mergelines (dest, nlines, sorted_lo); + } +} + +/* Compare two NODEs for priority. The NODE with the higher (numerically + lower) level has priority. If tie, the NODE with the most remaining + lines has priority. */ + +static int +compare_nodes (const void *a, const void *b) +{ + const struct merge_node *nodea = (const struct merge_node *) a; + const struct merge_node *nodeb = (const struct merge_node *) b; + if (nodea->level == nodeb->level) + return (nodea->nlo + nodea->nhi) < (nodeb->nlo + nodeb->nhi); + return nodea->level < nodeb->level; +} + +/* Lock a merge tree NODE. + Note spin locks were seen to perform better than mutexes + as long as the number of threads is limited to the + number of processors. */ - mergelines (lines, sorted_lo, nlo, hi, nhi); +static inline void +lock_node (struct merge_node *const restrict node) +{ + pthread_spin_lock (node->lock); +} + +/* Unlock a merge tree NODE. */ + +static inline void +unlock_node (struct merge_node *const restrict node) +{ + pthread_spin_unlock (node->lock); +} + +/* Destroy merge QUEUE. 
*/ + +static inline void +queue_destroy (struct merge_node_queue *const restrict queue) +{ + heap_free (queue->priority_queue); + pthread_cond_destroy (&queue->cond); + pthread_mutex_destroy (&queue->mutex); +} + +/* Initialize merge QUEUE, allocating space for a maximum of RESERVE nodes. + Though it's highly unlikely all nodes are in the heap at the same time, + RESERVE should accommodate all of them. Counting a NULL dummy head for the + heap, RESERVE should be 2 * NTHREADS. */ + +static inline void +queue_init (struct merge_node_queue *const restrict queue, size_t reserve) +{ + queue->priority_queue = (struct heap *) heap_alloc (compare_nodes, reserve); + pthread_mutex_init (&queue->mutex, NULL); + pthread_cond_init (&queue->cond, NULL); +} + +/* Insert NODE into priority QUEUE. Assume caller either holds lock on NODE + or does not need to lock NODE. */ + +static inline void +queue_insert (struct merge_node_queue *const restrict queue, + struct merge_node *const restrict node) +{ + pthread_mutex_lock (&queue->mutex); + heap_insert (queue->priority_queue, node); + node->queued = true; + pthread_mutex_unlock (&queue->mutex); + pthread_cond_signal (&queue->cond); +} + +/* Pop NODE off priority QUEUE. Guarantee a non-null, spinlocked NODE. */ + +static inline struct merge_node * +queue_pop (struct merge_node_queue *const restrict queue) +{ + struct merge_node *node = NULL; + + while (!node) + { + pthread_mutex_lock (&queue->mutex); + if (queue->priority_queue->count) + node = (struct merge_node *) heap_remove_top (queue->priority_queue); + else + { + /* Go into conditional wait if no NODE is immediately available. */ + pthread_cond_wait (&queue->cond, &queue->mutex); + } + pthread_mutex_unlock (&queue->mutex); } + lock_node (node); + node->queued = false; + return node; } -/* Like sortlines (LINES, NLINES, TEMP), except output into TEMP - rather than sorting in place. */ +/* If UNIQUE is set, checks to make sure line isn't a duplicate before + outputting. 
If UNIQUE is not set, output the passed in line. Note that + this function does not actually save the line, nor any key information, + thus is only appropriate for internal sort. */ -static void -sortlines_temp (struct line *lines, size_t nlines, struct line *temp) +static inline void +write_unique (struct line *const restrict line, FILE *tfp, + const char *temp_output) { - if (nlines == 2) + static struct line *saved = NULL; + + if (!unique) + write_bytes (line, tfp, temp_output); + else if (!saved || compare (line, saved, false)) { - /* Declare `swap' as int, not bool, to work around a bug - <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html> - in the IBM xlc 6.0.0.0 compiler in 64-bit mode. */ - int swap = (0 < compare (&lines[-1], &lines[-2], false)); - temp[-1] = lines[-1 - swap]; - temp[-2] = lines[-2 + swap]; + saved = line; + write_bytes (line, tfp, temp_output); + } +} + +/* Merge the lines currently available to a NODE in the binary + merge tree, up to a maximum specified by MAX_MERGE. */ + +static inline size_t +mergelines_node (struct merge_node *const restrict node, size_t total_lines, + FILE *tfp, const char *temp_output) +{ + struct line *lo_orig = node->lo; + struct line *hi_orig = node->hi; + size_t to_merge = MAX_MERGE (total_lines, node->level); + size_t merged_lo; + size_t merged_hi; + + if (node->level > MERGE_ROOT) + { + /* Merge to destination buffer. 
*/ + struct line *dest = *node->dest; + while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--) + if (compare (node->lo - 1, node->hi - 1, false) <= 0) + *--dest = *--node->lo; + else + *--dest = *--node->hi; + + merged_lo = lo_orig - node->lo; + merged_hi = hi_orig - node->hi; + + if (node->nhi == merged_hi) + while (node->lo != node->end_lo && to_merge--) + *--dest = *--node->lo; + else if (node->nlo == merged_lo) + while (node->hi != node->end_hi && to_merge--) + *--dest = *--node->hi; } else { - size_t nlo = nlines / 2; - size_t nhi = nlines - nlo; - struct line *lo = lines; - struct line *hi = lines - nlo; - struct line *sorted_hi = temp - nlo; + /* Merge directly to output. */ + while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--) + { + if (compare (node->lo - 1, node->hi - 1, false) <= 0) + write_unique (--node->lo, tfp, temp_output); + else + write_unique (--node->hi, tfp, temp_output); + } + + merged_lo = lo_orig - node->lo; + merged_hi = hi_orig - node->hi; + + if (node->nhi == merged_hi) + { + while (node->lo != node->end_lo && to_merge--) + write_unique (--node->lo, tfp, temp_output); + } + else if (node->nlo == merged_lo) + { + while (node->hi != node->end_hi && to_merge--) + write_unique (--node->hi, tfp, temp_output); + } + node->dest -= lo_orig - node->lo + hi_orig - node->hi; + } + + /* Update NODE. */ + merged_lo = lo_orig - node->lo; + merged_hi = hi_orig - node->hi; + node->nlo -= merged_lo; + node->nhi -= merged_hi; + return merged_lo + merged_hi; +} + +/* Insert NODE into QUEUE if it passes insertion checks. */ + +static inline void +check_insert (struct merge_node *node, + struct merge_node_queue *const restrict queue) +{ + size_t lo_avail = node->lo - node->end_lo; + size_t hi_avail = node->hi - node->end_hi; + + /* Conditions for insertion: + 1. NODE is not already in heap. + 2. 
NODE has available lines from both its children, OR one child has + available lines, but the other has exhausted all its lines. */ + if ((!node->queued) + && ((lo_avail && (hi_avail || !(node->nhi))) + || (hi_avail && !(node->nlo)))) + { + queue_insert (queue, node); + } +} + +/* Update parent merge tree NODE. */ + +static inline void +update_parent (struct merge_node *const restrict node, size_t merged, + struct merge_node_queue *const restrict queue) +{ + if (node->level > MERGE_ROOT) + { + lock_node (node->parent); + *node->dest -= merged; + check_insert (node->parent, queue); + unlock_node (node->parent); + } + else if (node->nlo + node->nhi == 0) + { + /* If the MERGE_ROOT NODE has finished merging, insert the + MERGE_END node. */ + queue_insert (queue, node->parent); + } +} + +/* Repeatedly pop QUEUE for a NODE with lines to merge, and merge at least + some of those lines, until the MERGE_END node is popped. */ + +static void +merge_loop (struct merge_node_queue *const restrict queue, + const size_t total_lines, FILE *tfp, const char *temp_output) +{ + while (1) + { + struct merge_node *node = queue_pop (queue); + + if (node->level == MERGE_END) + { + unlock_node (node); + /* Reinsert so other threads can pop it. */ + queue_insert (queue, node); + break; + } + size_t merged_lines = mergelines_node (node, total_lines, tfp, + temp_output); + check_insert (node, queue); + update_parent (node, merged_lines, queue); + + unlock_node (node); + } +} + + +static void sortlines (struct line *restrict, struct line *restrict, + unsigned long int, const size_t, + struct merge_node *const restrict, bool, + struct merge_node_queue *const restrict, + FILE *, const char *); + +/* Thread arguments for sortlines_thread. 
*/ + +struct thread_args +{ + struct line *lines; + struct line *dest; + unsigned long int nthreads; + const size_t total_lines; + struct merge_node *const restrict parent; + bool lo_child; + struct merge_node_queue *const restrict merge_queue; + FILE *tfp; + const char *output_temp; +}; + +/* Like sortlines, except with a signature acceptable to pthread_create. */ - sortlines_temp (hi, nhi, sorted_hi); +static void * +sortlines_thread (void *data) +{ + struct thread_args const *args = data; + sortlines (args->lines, args->dest, args->nthreads, args->total_lines, + args->parent, args->lo_child, args->merge_queue, + args->tfp, args->output_temp); + return NULL; +} + +/* There are three phases to the algorithm: node creation, sequential sort, + and binary merge. + + During node creation, sortlines recursively visits each node in the + binary merge tree and creates a NODE structure corresponding to all the + future line merging NODE is responsible for. For each call to + sortlines, half the available threads are assigned to each recursive + call, until a leaf node having only 1 available thread is reached. + + Each leaf node then performs two sequential sorts, one on each half of + the lines it is responsible for. It records in its NODE structure that + there are two sorted sublists available to merge from, and inserts its + NODE into the priority queue. + + The binary merge phase then begins. Each thread drops into a loop + where the thread retrieves a NODE from the priority queue, merges lines + available to that NODE, and potentially inserts NODE or its parent back + into the queue if there are sufficient available lines for them to + merge. This continues until all lines at all nodes of the merge tree + have been merged. 
*/ + +static void +sortlines (struct line *restrict lines, struct line *restrict dest, + unsigned long int nthreads, const size_t total_lines, + struct merge_node *const restrict parent, bool lo_child, + struct merge_node_queue *const restrict merge_queue, + FILE *tfp, const char *temp_output) +{ + /* Create merge tree NODE. */ + size_t nlines = (lo_child)? parent->nlo : parent->nhi; + size_t nlo = nlines / 2; + size_t nhi = nlines - nlo; + struct line *lo = dest - total_lines; + struct line *hi = lo - nlo; + struct line **parent_end = (lo_child)? &parent->end_lo : &parent->end_hi; + pthread_spinlock_t lock; + pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE); + struct merge_node node = {lo, hi, lo, hi, parent_end, nlo, nhi, + parent->level + 1, parent, false, &lock}; + + /* Calculate thread arguments. */ + unsigned long int lo_threads = nthreads / 2; + unsigned long int hi_threads = nthreads - lo_threads; + pthread_t thread; + struct thread_args args = {lines, lo, lo_threads, total_lines, &node, + true, merge_queue, tfp, temp_output}; + + if (nthreads > 1 && SUBTHREAD_LINES_HEURISTIC <= nlines + && pthread_create (&thread, NULL, sortlines_thread, &args) == 0) + { + sortlines (lines - nlo, hi, hi_threads, total_lines, &node, false, + merge_queue, tfp, temp_output); + pthread_join (thread, NULL); + } + else + { + /* Nthreads = 1, this is a leaf NODE, or pthread_create failed. + Sort with 1 thread. */ + struct line *temp = lines - total_lines; + if (1 < nhi) + sequential_sort (lines - nlo, nhi, temp - nlo / 2, false); if (1 < nlo) - sortlines (lo, nlo, temp); + sequential_sort (lines, nlo, temp, false); - mergelines (temp, lo, nlo, sorted_hi, nhi); + /* Update merge NODE. No need to lock yet. 
*/ + node.lo = lines; + node.hi = lines - nlo; + node.end_lo = lines - nlo; + node.end_hi = lines - nlo - nhi; + + queue_insert (merge_queue, &node); + merge_loop (merge_queue, total_lines, tfp, temp_output); } } @@ -3262,7 +3681,8 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles, /* Sort NFILES FILES onto OUTPUT_FILE. */ static void -sort (char * const *files, size_t nfiles, char const *output_file) +sort (char * const *files, size_t nfiles, char const *output_file, + unsigned long int nthreads) { struct buffer buf; size_t ntemps = 0; @@ -3276,8 +3696,22 @@ sort (char * const *files, size_t nfiles, char const *output_file) char const *file = *files; FILE *fp = xfopen (file, "r"); FILE *tfp; - size_t bytes_per_line = (2 * sizeof (struct line) - - sizeof (struct line) / 2); + + size_t bytes_per_line; + if (nthreads > 1) + { + /* Get log P. */ + unsigned long int tmp = 1; + size_t mult = 1; + while (tmp < nthreads) + { + tmp *= 2; + mult++; + } + bytes_per_line = (mult * sizeof (struct line)); + } + else + bytes_per_line = sizeof (struct line) * 3 / 2; if (! 
buf.alloc) initbuf (&buf, bytes_per_line, @@ -3289,7 +3723,6 @@ sort (char * const *files, size_t nfiles, char const *output_file) while (fillbuf (&buf, fp, file)) { struct line *line; - struct line *linebase; if (buf.eof && nfiles && (bytes_per_line + 1 @@ -3303,9 +3736,6 @@ sort (char * const *files, size_t nfiles, char const *output_file) } line = buffer_linelim (&buf); - linebase = line - buf.nlines; - if (1 < buf.nlines) - sortlines (line, buf.nlines, linebase); if (buf.eof && !nfiles && !ntemps && !buf.left) { xfclose (fp, file); @@ -3318,16 +3748,23 @@ sort (char * const *files, size_t nfiles, char const *output_file) ++ntemps; temp_output = create_temp (&tfp, NULL); } - - do + if (1 < buf.nlines) { - line--; - write_bytes (line, tfp, temp_output); - if (unique) - while (linebase < line && compare (line, line - 1, false) == 0) - line--; + struct merge_node_queue merge_queue; + queue_init (&merge_queue, 2 * nthreads); + + pthread_spinlock_t lock; + pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE); + struct merge_node node = + {NULL, NULL, NULL, NULL, NULL, buf.nlines, + buf.nlines, MERGE_END, NULL, false, &lock}; + + sortlines (line, line, nthreads, buf.nlines, &node, true, + &merge_queue, tfp, temp_output); + queue_destroy (&merge_queue); } - while (linebase < line); + else + write_unique (line - 1, tfp, temp_output); xfclose (tfp, temp_output); @@ -3547,6 +3984,7 @@ main (int argc, char **argv) bool mergeonly = false; char *random_source = NULL; bool need_random = false; + unsigned long int nthreads = 0; size_t nfiles = 0; bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL); bool obsolete_usage = (posix2_version () < 200112); @@ -3882,6 +4320,10 @@ main (int argc, char **argv) add_temp_dir (optarg); break; + case PARALLEL_OPTION: + nthreads = specify_nthreads (oi, c, optarg); + break; + case 'u': unique = true; break; @@ -4030,6 +4472,9 @@ main (int argc, char **argv) if (need_random) { + /* Threading does not lock the randread_source structure, 
so + downgrade to one thread to avoid race conditions. */ + nthreads = 1; randread_source = randread_new (random_source, MD5_DIGEST_SIZE); if (! randread_source) die (_("open failed"), random_source); @@ -4084,7 +4529,15 @@ main (int argc, char **argv) IF_LINT (free (sortfiles)); } else - sort (files, nfiles, outfile); + { + /* If NTHREADS > number of cores on the machine, spinlocking + could be wasteful. */ + unsigned long int np2 = num_processors (NPROC_CURRENT_OVERRIDABLE); + if (!nthreads || nthreads > np2) + nthreads = np2; + + sort (files, nfiles, outfile, nthreads); + } if (have_read_stdin && fclose (stdin) == EOF) die (_("close failed"), "-"); diff --git a/tests/Makefile.am b/tests/Makefile.am index a993e8288..fccd00013 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -223,6 +223,7 @@ TESTS = \ misc/shred-remove \ misc/shuf \ misc/sort \ + misc/sort-benchmark-random \ misc/sort-compress \ misc/sort-continue \ misc/sort-debug-keys \ diff --git a/tests/misc/sort-benchmark-random b/tests/misc/sort-benchmark-random new file mode 100755 index 000000000..332538115 --- /dev/null +++ b/tests/misc/sort-benchmark-random @@ -0,0 +1,52 @@ +#!/bin/sh +# Benchmark sort on randomly generated data. + +# Copyright (C) 2010 Free Software Foundation, Inc. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# Written by Glen Lenker. + +. 
"${srcdir=.}/init.sh"; path_prepend_ ../src + +very_expensive_ + +perl -e ' +my $num_lines = 500000; +my $length = 100; + +for (my $i=0; $i < $num_lines; $i++) +{ + for (my $j=0; $j < $length; $j++) + { + printf "%c", 32 + rand(94); + } + print "\n"; +}' > in || framework_failure + +# We need to generate a lot of data for sort to show a noticeable +# improvement in performance. Sorting it in Perl may take a while. + +perl -e ' +open (FILE, "<in"); +my @list = <FILE>; +print sort(@list); +close (FILE); +' > exp || framework_failure + +time sort in > out || fail=1 + +compare out exp || fail=1 + +Exit $fail |