summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Chen Guo <chenguo4@yahoo.com> 2010-07-09 08:03:50 +0100
committer: Pádraig Brady <P@draigBrady.com> 2010-07-13 01:44:46 +0100
commit: 9face836f36c507f01a7d7a33138c5a303e3b1df (patch)
tree: e71865d7bb7882ba3b8d06a8cff2c38ad9511baf /src
parent: f6e8c18f8d2077ea84aef679852622c8bd73789c (diff)
download: coreutils-9face836f36c507f01a7d7a33138c5a303e3b1df.tar.xz
sort: parallelize internal sort
This patch is by Gene Auyeung, Chris Dickens, Chen Guo, and Mike Nichols, based off of a patch by Paul Eggert, Glen Lenker, et al., with a basic heap implementation based off of the GDSL heap, originally by Nicolas Darnis. The number of sorts done in parallel is limited to the number of available processors by default, or can be further restricted with the --parallel option. On a dual-die, 8-core Intel Xeon, results show sorting with 8 threads is almost 4 times faster than using a single thread. Timings when sorting a 96MB file (THREADS: TIME (s)): 1: 5.10; 2: 2.87; 4: 1.75; 8: 1.31. Single-threaded sorting has also been improved, especially for cheaper comparison operations (COMMAND: BEFORE (s), AFTER (s)): sort: 8.822, 8.716; sort -g: 10.336, 10.222; sort -n: 3.077, 2.961; LANG=C sort: 2.169, 2.066. * bootstrap.conf: Add heap, pthread. * coreutils.texi (sort): Describe the new --parallel option. * gl/lib/heap.c: New file. Very basic heap implementation. * gl/lib/heap.h: New file. * gl/modules/heap: New file. * src/Makefile.am: Add LIB_PTHREAD. * src/sort.c: Include heap.h, nproc.h, pthread.h. (MAX_MERGE): New macro. (SUBTHREAD_LINES_HEURISTIC, PARALLEL_OPTION): New constants. (MERGE_END, MERGE_ROOT): New constants. (struct merge_node): New struct. (struct merge_node_queue): New struct. (sortlines temp): Remove declaration. (usage, long_options, main): New option, --parallel. (specify_nthreads): New function. (mergelines): New signature, to emphasize the fact that the HI area must be part of the destination. All callers changed. (sequential_sort): New function, renamed from sortlines. Merge in the functionality of sortlines_temp. (compare_nodes): New function. (lock_node, unlock_node): New functions. (queue_destroy): New function. (queue_init): New function. (queue_insert): New function. (queue_pop): New function. (write_unique): New function. (mergelines_node): New function. (check_insert): New function. (update_parent): New function. (merge_loop): New function. 
(sortlines): Rewrite to support and use parallelism, with a new signature. All callers changed. (struct thread_args): New struct. (sortlines_thread): New function. (sortlines_temp): Remove. (sort): New argument NTHREADS. All uses changed. Output moved to mergelines_node. (main): disable threading if we are sorting at random. * tests/Makefile.am (TESTS): Add misc/sort-benchmark-random. * tests/misc/sort-benchmark-random: New file. Signed-off-by: Pádraig Brady <P@draigBrady.com>
Diffstat (limited to 'src')
-rw-r--r-- src/Makefile.am 3
-rw-r--r-- src/sort.c 567
2 files changed, 513 insertions, 57 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 0630a069d..2e0e32088 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -397,6 +397,9 @@ uname_LDADD += $(GETHOSTNAME_LIB)
# for strsignal
kill_LDADD += $(LIBTHREAD)
+# for pthread
+sort_LDADD += $(LIB_PTHREAD)
+
$(PROGRAMS): ../lib/libcoreutils.a
# Get the release year from ../lib/version-etc.c.
diff --git a/src/sort.c b/src/sort.c
index ff8a97a99..5ea1b3476 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -23,6 +23,7 @@
#include <config.h>
#include <getopt.h>
+#include <pthread.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>
@@ -32,9 +33,11 @@
#include "filevercmp.h"
#include "hard-locale.h"
#include "hash.h"
+#include "heap.h"
#include "ignore-value.h"
#include "md5.h"
#include "mbswidth.h"
+#include "nproc.h"
#include "physmem.h"
#include "posixver.h"
#include "quote.h"
@@ -93,6 +96,18 @@ struct rlimit { size_t rlim_cur; };
# define DEFAULT_TMPDIR "/tmp"
#endif
+/* Maximum number of lines to merge every time a NODE is taken from
+ the MERGE_QUEUE. Node is at LEVEL in the binary merge tree,
+ and is responsible for merging TOTAL lines. */
+#define MAX_MERGE(total, level) ((total) / ((2 << level) * (2 << level)) + 1)
+
+/* Heuristic value for the number of lines for which it is worth
+ creating a subthread, during an internal merge sort, on a machine
+ that has processors galore. Currently this number is just a guess.
+ This value must be at least 4. We don't know of any machine where
+ this number has any practical effect. */
+enum { SUBTHREAD_LINES_HEURISTIC = 4 };
+
/* Exit statuses. */
enum
{
@@ -119,6 +134,15 @@ enum
MAX_FORK_TRIES_DECOMPRESS = 9
};
+enum
+ {
+ /* Level of the end-of-merge node, one level above the root. */
+ MERGE_END = 0,
+
+ /* Level of the root node in merge tree. */
+ MERGE_ROOT = 1
+ };
+
/* The representation of the decimal point in the current locale. */
static int decimal_point;
@@ -196,6 +220,31 @@ struct month
int val;
};
+/* Binary merge tree node. */
+struct merge_node
+{
+ struct line *lo; /* Lines to merge from LO child node. */
+ struct line *hi; /* Lines to merge from HI child node. */
+ struct line *end_lo; /* End of available lines from LO. */
+ struct line *end_hi; /* End of available lines from HI. */
+ struct line **dest; /* Pointer to destination of merge. */
+ size_t nlo; /* Total Lines remaining from LO. */
+ size_t nhi; /* Total lines remaining from HI. */
+ size_t level; /* Level in merge tree. */
+ struct merge_node *parent; /* Parent node. */
+ bool queued; /* Node is already in heap. */
+ pthread_spinlock_t *lock; /* Lock for node operations. */
+};
+
+/* Priority queue of merge nodes. */
+struct merge_node_queue
+{
+ struct heap *priority_queue; /* Priority queue of merge tree nodes. */
+ pthread_mutex_t mutex; /* Lock for queue operations. */
+ pthread_cond_t cond; /* Conditional wait for empty queue to populate
+ when popping. */
+};
+
/* FIXME: None of these tables work with multibyte character sets.
Also, there are many other bugs when handling multibyte characters.
One way to fix this is to rewrite `sort' to use wide characters
@@ -300,8 +349,6 @@ static bool debug;
number are present, temp files will be used. */
static unsigned int nmerge = NMERGE_DEFAULT;
-static void sortlines_temp (struct line *, size_t, struct line *);
-
/* Report MESSAGE for FILE, then clean up and exit.
If FILE is null, it represents standard output. */
@@ -398,6 +445,7 @@ Other options:\n\
-t, --field-separator=SEP use SEP instead of non-blank to blank transition\n\
-T, --temporary-directory=DIR use DIR for temporaries, not $TMPDIR or %s;\n\
multiple options specify multiple directories\n\
+ --parallel=N limit the number of sorts run concurrently to N\n\
-u, --unique with -c, check for strict ordering;\n\
without -c, output only the first of an equal run\n\
"), DEFAULT_TMPDIR);
@@ -442,7 +490,8 @@ enum
FILES0_FROM_OPTION,
NMERGE_OPTION,
RANDOM_SOURCE_OPTION,
- SORT_OPTION
+ SORT_OPTION,
+ PARALLEL_OPTION
};
static char const short_options[] = "-bcCdfghik:mMno:rRsS:t:T:uVy:z";
@@ -476,6 +525,7 @@ static struct option const long_options[] =
{"temporary-directory", required_argument, NULL, 'T'},
{"unique", no_argument, NULL, 'u'},
{"zero-terminated", no_argument, NULL, 'z'},
+ {"parallel", required_argument, NULL, PARALLEL_OPTION},
{GETOPT_HELP_OPTION_DECL},
{GETOPT_VERSION_OPTION_DECL},
{NULL, 0, NULL, 0},
@@ -1333,6 +1383,22 @@ specify_sort_size (int oi, char c, char const *s)
xstrtol_fatal (e, oi, c, long_options, s);
}
+/* Specify the number of threads to spawn during internal sort. */
+static unsigned long int
+specify_nthreads (int oi, char c, char const *s)
+{
+ unsigned long int nthreads;
+ enum strtol_error e = xstrtoul (s, NULL, 10, &nthreads, "");
+ if (e == LONGINT_OVERFLOW)
+ return ULONG_MAX;
+ if (e != LONGINT_OK)
+ xstrtol_fatal (e, oi, c, long_options, s);
+ if (nthreads == 0)
+ error (SORT_FAILURE, 0, _("number in parallel must be nonzero"));
+ return nthreads;
+}
+
+
/* Return the default sort size. */
static size_t
default_sort_size (void)
@@ -2951,25 +3017,28 @@ mergefiles (struct sortfile *files, size_t ntemps, size_t nfiles,
return nopened;
}
-/* Merge into T the two sorted arrays of lines LO (with NLO members)
- and HI (with NHI members). T, LO, and HI point just past their
- respective arrays, and the arrays are in reverse order. NLO and
- NHI must be positive, and HI - NHI must equal T - (NLO + NHI). */
+/* Merge into T (of size NLINES) the two sorted arrays of lines
+ LO (with NLINES / 2 members), and
+ T - (NLINES / 2) (with NLINES - NLINES / 2 members).
+ T and LO point just past their respective arrays, and the arrays
+ are in reverse order. NLINES must be at least 2. */
static inline void
-mergelines (struct line *t,
- struct line const *lo, size_t nlo,
- struct line const *hi, size_t nhi)
+mergelines (struct line *restrict t, size_t nlines,
+ struct line const *restrict lo)
{
+ size_t nlo = nlines / 2;
+ size_t nhi = nlines - nlo;
+ struct line *hi = t - nlo;
+
while (true)
if (compare (lo - 1, hi - 1, false) <= 0)
{
*--t = *--lo;
if (! --nlo)
{
- /* HI - NHI equalled T - (NLO + NHI) when this function
- began. Therefore HI must equal T now, and there is no
- need to copy from HI to T. */
+ /* HI must equal T now, and there is no need to copy from
+ HI to T. */
return;
}
}
@@ -3000,15 +3069,25 @@ mergelines (struct line *t,
D. A. Bell, Comp J. 1 (1958), 75. */
static void
-sortlines (struct line *lines, size_t nlines, struct line *temp)
+sequential_sort (struct line *restrict lines, size_t nlines,
+ struct line *restrict temp, bool to_temp)
{
if (nlines == 2)
{
- if (0 < compare (&lines[-1], &lines[-2], false))
+ /* Declare `swap' as int, not bool, to work around a bug
+ <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
+ in the IBM xlc 6.0.0.0 compiler in 64-bit mode. */
+ int swap = (0 < compare (&lines[-1], &lines[-2], false));
+ if (to_temp)
{
- struct line tmp = lines[-1];
+ temp[-1] = lines[-1 - swap];
+ temp[-2] = lines[-2 + swap];
+ }
+ else if (swap)
+ {
+ temp[-1] = lines[-1];
lines[-1] = lines[-2];
- lines[-2] = tmp;
+ lines[-2] = temp[-1];
}
}
else
@@ -3017,46 +3096,386 @@ sortlines (struct line *lines, size_t nlines, struct line *temp)
size_t nhi = nlines - nlo;
struct line *lo = lines;
struct line *hi = lines - nlo;
- struct line *sorted_lo = temp;
- sortlines (hi, nhi, temp);
+ sequential_sort (hi, nhi, temp - (to_temp ? nlo : 0), to_temp);
if (1 < nlo)
- sortlines_temp (lo, nlo, sorted_lo);
+ sequential_sort (lo, nlo, temp, !to_temp);
+ else if (!to_temp)
+ temp[-1] = lo[-1];
+
+ struct line *dest;
+ struct line const *sorted_lo;
+ if (to_temp)
+ {
+ dest = temp;
+ sorted_lo = lines;
+ }
else
- sorted_lo[-1] = lo[-1];
+ {
+ dest = lines;
+ sorted_lo = temp;
+ }
+ mergelines (dest, nlines, sorted_lo);
+ }
+}
+
+/* Compare two NODEs for priority. The NODE with the higher (numerically
+ lower) level has priority. If tie, the NODE with the most remaining
+ lines has priority. */
+
+static int
+compare_nodes (const void *a, const void *b)
+{
+ const struct merge_node *nodea = (const struct merge_node *) a;
+ const struct merge_node *nodeb = (const struct merge_node *) b;
+ if (nodea->level == nodeb->level)
+ return (nodea->nlo + nodea->nhi) < (nodeb->nlo + nodeb->nhi);
+ return nodea->level < nodeb->level;
+}
+
+/* Lock a merge tree NODE.
+ Note spin locks were seen to perform better than mutexes
+ as long as the number of threads is limited to the
+ number of processors. */
- mergelines (lines, sorted_lo, nlo, hi, nhi);
+static inline void
+lock_node (struct merge_node *const restrict node)
+{
+ pthread_spin_lock (node->lock);
+}
+
+/* Unlock a merge tree NODE. */
+
+static inline void
+unlock_node (struct merge_node *const restrict node)
+{
+ pthread_spin_unlock (node->lock);
+}
+
+/* Destroy merge QUEUE. */
+
+static inline void
+queue_destroy (struct merge_node_queue *const restrict queue)
+{
+ heap_free (queue->priority_queue);
+ pthread_cond_destroy (&queue->cond);
+ pthread_mutex_destroy (&queue->mutex);
+}
+
+/* Initialize merge QUEUE, allocating space for a maximum of RESERVE nodes.
+ Though it's highly unlikely all nodes are in the heap at the same time,
+ RESERVE should accommodate all of them. Counting a NULL dummy head for the
+ heap, RESERVE should be 2 * NTHREADS. */
+
+static inline void
+queue_init (struct merge_node_queue *const restrict queue, size_t reserve)
+{
+ queue->priority_queue = (struct heap *) heap_alloc (compare_nodes, reserve);
+ pthread_mutex_init (&queue->mutex, NULL);
+ pthread_cond_init (&queue->cond, NULL);
+}
+
+/* Insert NODE into priority QUEUE. Assume caller either holds lock on NODE
+ or does not need to lock NODE. */
+
+static inline void
+queue_insert (struct merge_node_queue *const restrict queue,
+ struct merge_node *const restrict node)
+{
+ pthread_mutex_lock (&queue->mutex);
+ heap_insert (queue->priority_queue, node);
+ node->queued = true;
+ pthread_mutex_unlock (&queue->mutex);
+ pthread_cond_signal (&queue->cond);
+}
+
+/* Pop NODE off priority QUEUE. Guarantee a non-null, spinlocked NODE. */
+
+static inline struct merge_node *
+queue_pop (struct merge_node_queue *const restrict queue)
+{
+ struct merge_node *node = NULL;
+
+ while (!node)
+ {
+ pthread_mutex_lock (&queue->mutex);
+ if (queue->priority_queue->count)
+ node = (struct merge_node *) heap_remove_top (queue->priority_queue);
+ else
+ {
+ /* Go into conditional wait if no NODE is immediately available. */
+ pthread_cond_wait (&queue->cond, &queue->mutex);
+ }
+ pthread_mutex_unlock (&queue->mutex);
}
+ lock_node (node);
+ node->queued = false;
+ return node;
}
-/* Like sortlines (LINES, NLINES, TEMP), except output into TEMP
- rather than sorting in place. */
+/* If UNIQUE is set, checks to make sure line isn't a duplicate before
+ outputting. If UNIQUE is not set, output the passed in line. Note that
+ this function does not actually save the line, nor any key information,
+ thus is only appropriate for internal sort. */
-static void
-sortlines_temp (struct line *lines, size_t nlines, struct line *temp)
+static inline void
+write_unique (struct line *const restrict line, FILE *tfp,
+ const char *temp_output)
{
- if (nlines == 2)
+ static struct line *saved = NULL;
+
+ if (!unique)
+ write_bytes (line, tfp, temp_output);
+ else if (!saved || compare (line, saved, false))
{
- /* Declare `swap' as int, not bool, to work around a bug
- <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
- in the IBM xlc 6.0.0.0 compiler in 64-bit mode. */
- int swap = (0 < compare (&lines[-1], &lines[-2], false));
- temp[-1] = lines[-1 - swap];
- temp[-2] = lines[-2 + swap];
+ saved = line;
+ write_bytes (line, tfp, temp_output);
+ }
+}
+
+/* Merge the lines currently available to a NODE in the binary
+ merge tree, up to a maximum specified by MAX_MERGE. */
+
+static inline size_t
+mergelines_node (struct merge_node *const restrict node, size_t total_lines,
+ FILE *tfp, const char *temp_output)
+{
+ struct line *lo_orig = node->lo;
+ struct line *hi_orig = node->hi;
+ size_t to_merge = MAX_MERGE (total_lines, node->level);
+ size_t merged_lo;
+ size_t merged_hi;
+
+ if (node->level > MERGE_ROOT)
+ {
+ /* Merge to destination buffer. */
+ struct line *dest = *node->dest;
+ while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
+ if (compare (node->lo - 1, node->hi - 1, false) <= 0)
+ *--dest = *--node->lo;
+ else
+ *--dest = *--node->hi;
+
+ merged_lo = lo_orig - node->lo;
+ merged_hi = hi_orig - node->hi;
+
+ if (node->nhi == merged_hi)
+ while (node->lo != node->end_lo && to_merge--)
+ *--dest = *--node->lo;
+ else if (node->nlo == merged_lo)
+ while (node->hi != node->end_hi && to_merge--)
+ *--dest = *--node->hi;
}
else
{
- size_t nlo = nlines / 2;
- size_t nhi = nlines - nlo;
- struct line *lo = lines;
- struct line *hi = lines - nlo;
- struct line *sorted_hi = temp - nlo;
+ /* Merge directly to output. */
+ while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
+ {
+ if (compare (node->lo - 1, node->hi - 1, false) <= 0)
+ write_unique (--node->lo, tfp, temp_output);
+ else
+ write_unique (--node->hi, tfp, temp_output);
+ }
+
+ merged_lo = lo_orig - node->lo;
+ merged_hi = hi_orig - node->hi;
+
+ if (node->nhi == merged_hi)
+ {
+ while (node->lo != node->end_lo && to_merge--)
+ write_unique (--node->lo, tfp, temp_output);
+ }
+ else if (node->nlo == merged_lo)
+ {
+ while (node->hi != node->end_hi && to_merge--)
+ write_unique (--node->hi, tfp, temp_output);
+ }
+ node->dest -= lo_orig - node->lo + hi_orig - node->hi;
+ }
+
+ /* Update NODE. */
+ merged_lo = lo_orig - node->lo;
+ merged_hi = hi_orig - node->hi;
+ node->nlo -= merged_lo;
+ node->nhi -= merged_hi;
+ return merged_lo + merged_hi;
+}
+
+/* Insert NODE into QUEUE if it passes insertion checks. */
+
+static inline void
+check_insert (struct merge_node *node,
+ struct merge_node_queue *const restrict queue)
+{
+ size_t lo_avail = node->lo - node->end_lo;
+ size_t hi_avail = node->hi - node->end_hi;
+
+ /* Conditions for insertion:
+ 1. NODE is not already in heap.
+ 2. NODE has available lines from both its children, OR one child has
+ available lines, but the other has exhausted all its lines. */
+ if ((!node->queued)
+ && ((lo_avail && (hi_avail || !(node->nhi)))
+ || (hi_avail && !(node->nlo))))
+ {
+ queue_insert (queue, node);
+ }
+}
+
+/* Update parent merge tree NODE. */
+
+static inline void
+update_parent (struct merge_node *const restrict node, size_t merged,
+ struct merge_node_queue *const restrict queue)
+{
+ if (node->level > MERGE_ROOT)
+ {
+ lock_node (node->parent);
+ *node->dest -= merged;
+ check_insert (node->parent, queue);
+ unlock_node (node->parent);
+ }
+ else if (node->nlo + node->nhi == 0)
+ {
+ /* If the MERGE_ROOT NODE has finished merging, insert the
+ MERGE_END node. */
+ queue_insert (queue, node->parent);
+ }
+}
+
+/* Repeatedly pop QUEUE for a NODE with lines to merge, and merge at least
+ some of those lines, until the MERGE_END node is popped. */
+
+static void
+merge_loop (struct merge_node_queue *const restrict queue,
+ const size_t total_lines, FILE *tfp, const char *temp_output)
+{
+ while (1)
+ {
+ struct merge_node *node = queue_pop (queue);
+
+ if (node->level == MERGE_END)
+ {
+ unlock_node (node);
+ /* Reinsert so other threads can pop it. */
+ queue_insert (queue, node);
+ break;
+ }
+ size_t merged_lines = mergelines_node (node, total_lines, tfp,
+ temp_output);
+ check_insert (node, queue);
+ update_parent (node, merged_lines, queue);
+
+ unlock_node (node);
+ }
+}
+
+
+static void sortlines (struct line *restrict, struct line *restrict,
+ unsigned long int, const size_t,
+ struct merge_node *const restrict, bool,
+ struct merge_node_queue *const restrict,
+ FILE *, const char *);
+
+/* Thread arguments for sortlines_thread. */
+
+struct thread_args
+{
+ struct line *lines;
+ struct line *dest;
+ unsigned long int nthreads;
+ const size_t total_lines;
+ struct merge_node *const restrict parent;
+ bool lo_child;
+ struct merge_node_queue *const restrict merge_queue;
+ FILE *tfp;
+ const char *output_temp;
+};
+
+/* Like sortlines, except with a signature acceptable to pthread_create. */
- sortlines_temp (hi, nhi, sorted_hi);
+static void *
+sortlines_thread (void *data)
+{
+ struct thread_args const *args = data;
+ sortlines (args->lines, args->dest, args->nthreads, args->total_lines,
+ args->parent, args->lo_child, args->merge_queue,
+ args->tfp, args->output_temp);
+ return NULL;
+}
+
+/* There are three phases to the algorithm: node creation, sequential sort,
+ and binary merge.
+
+ During node creation, sortlines recursively visits each node in the
+ binary merge tree and creates a NODE structure corresponding to all the
+ future line merging NODE is responsible for. For each call to
+ sortlines, half the available threads are assigned to each recursive
+ call, until a leaf node having only 1 available thread is reached.
+
+ Each leaf node then performs two sequential sorts, one on each half of
+ the lines it is responsible for. It records in its NODE structure that
+ there are two sorted sublists available to merge from, and inserts its
+ NODE into the priority queue.
+
+ The binary merge phase then begins. Each thread drops into a loop
+ where the thread retrieves a NODE from the priority queue, merges lines
+ available to that NODE, and potentially inserts NODE or its parent back
+ into the queue if there are sufficient available lines for them to
+ merge. This continues until all lines at all nodes of the merge tree
+ have been merged. */
+
+static void
+sortlines (struct line *restrict lines, struct line *restrict dest,
+ unsigned long int nthreads, const size_t total_lines,
+ struct merge_node *const restrict parent, bool lo_child,
+ struct merge_node_queue *const restrict merge_queue,
+ FILE *tfp, const char *temp_output)
+{
+ /* Create merge tree NODE. */
+ size_t nlines = (lo_child)? parent->nlo : parent->nhi;
+ size_t nlo = nlines / 2;
+ size_t nhi = nlines - nlo;
+ struct line *lo = dest - total_lines;
+ struct line *hi = lo - nlo;
+ struct line **parent_end = (lo_child)? &parent->end_lo : &parent->end_hi;
+ pthread_spinlock_t lock;
+ pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE);
+ struct merge_node node = {lo, hi, lo, hi, parent_end, nlo, nhi,
+ parent->level + 1, parent, false, &lock};
+
+ /* Calculate thread arguments. */
+ unsigned long int lo_threads = nthreads / 2;
+ unsigned long int hi_threads = nthreads - lo_threads;
+ pthread_t thread;
+ struct thread_args args = {lines, lo, lo_threads, total_lines, &node,
+ true, merge_queue, tfp, temp_output};
+
+ if (nthreads > 1 && SUBTHREAD_LINES_HEURISTIC <= nlines
+ && pthread_create (&thread, NULL, sortlines_thread, &args) == 0)
+ {
+ sortlines (lines - nlo, hi, hi_threads, total_lines, &node, false,
+ merge_queue, tfp, temp_output);
+ pthread_join (thread, NULL);
+ }
+ else
+ {
+ /* Nthreads = 1, this is a leaf NODE, or pthread_create failed.
+ Sort with 1 thread. */
+ struct line *temp = lines - total_lines;
+ if (1 < nhi)
+ sequential_sort (lines - nlo, nhi, temp - nlo / 2, false);
if (1 < nlo)
- sortlines (lo, nlo, temp);
+ sequential_sort (lines, nlo, temp, false);
- mergelines (temp, lo, nlo, sorted_hi, nhi);
+ /* Update merge NODE. No need to lock yet. */
+ node.lo = lines;
+ node.hi = lines - nlo;
+ node.end_lo = lines - nlo;
+ node.end_hi = lines - nlo - nhi;
+
+ queue_insert (merge_queue, &node);
+ merge_loop (merge_queue, total_lines, tfp, temp_output);
}
}
@@ -3262,7 +3681,8 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
/* Sort NFILES FILES onto OUTPUT_FILE. */
static void
-sort (char * const *files, size_t nfiles, char const *output_file)
+sort (char * const *files, size_t nfiles, char const *output_file,
+ unsigned long int nthreads)
{
struct buffer buf;
size_t ntemps = 0;
@@ -3276,8 +3696,22 @@ sort (char * const *files, size_t nfiles, char const *output_file)
char const *file = *files;
FILE *fp = xfopen (file, "r");
FILE *tfp;
- size_t bytes_per_line = (2 * sizeof (struct line)
- - sizeof (struct line) / 2);
+
+ size_t bytes_per_line;
+ if (nthreads > 1)
+ {
+ /* Get log P. */
+ unsigned long int tmp = 1;
+ size_t mult = 1;
+ while (tmp < nthreads)
+ {
+ tmp *= 2;
+ mult++;
+ }
+ bytes_per_line = (mult * sizeof (struct line));
+ }
+ else
+ bytes_per_line = sizeof (struct line) * 3 / 2;
if (! buf.alloc)
initbuf (&buf, bytes_per_line,
@@ -3289,7 +3723,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
while (fillbuf (&buf, fp, file))
{
struct line *line;
- struct line *linebase;
if (buf.eof && nfiles
&& (bytes_per_line + 1
@@ -3303,9 +3736,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
}
line = buffer_linelim (&buf);
- linebase = line - buf.nlines;
- if (1 < buf.nlines)
- sortlines (line, buf.nlines, linebase);
if (buf.eof && !nfiles && !ntemps && !buf.left)
{
xfclose (fp, file);
@@ -3318,16 +3748,23 @@ sort (char * const *files, size_t nfiles, char const *output_file)
++ntemps;
temp_output = create_temp (&tfp, NULL);
}
-
- do
+ if (1 < buf.nlines)
{
- line--;
- write_bytes (line, tfp, temp_output);
- if (unique)
- while (linebase < line && compare (line, line - 1, false) == 0)
- line--;
+ struct merge_node_queue merge_queue;
+ queue_init (&merge_queue, 2 * nthreads);
+
+ pthread_spinlock_t lock;
+ pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE);
+ struct merge_node node =
+ {NULL, NULL, NULL, NULL, NULL, buf.nlines,
+ buf.nlines, MERGE_END, NULL, false, &lock};
+
+ sortlines (line, line, nthreads, buf.nlines, &node, true,
+ &merge_queue, tfp, temp_output);
+ queue_destroy (&merge_queue);
}
- while (linebase < line);
+ else
+ write_unique (line - 1, tfp, temp_output);
xfclose (tfp, temp_output);
@@ -3547,6 +3984,7 @@ main (int argc, char **argv)
bool mergeonly = false;
char *random_source = NULL;
bool need_random = false;
+ unsigned long int nthreads = 0;
size_t nfiles = 0;
bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL);
bool obsolete_usage = (posix2_version () < 200112);
@@ -3882,6 +4320,10 @@ main (int argc, char **argv)
add_temp_dir (optarg);
break;
+ case PARALLEL_OPTION:
+ nthreads = specify_nthreads (oi, c, optarg);
+ break;
+
case 'u':
unique = true;
break;
@@ -4030,6 +4472,9 @@ main (int argc, char **argv)
if (need_random)
{
+ /* Threading does not lock the randread_source structure, so
+ downgrade to one thread to avoid race conditions. */
+ nthreads = 1;
randread_source = randread_new (random_source, MD5_DIGEST_SIZE);
if (! randread_source)
die (_("open failed"), random_source);
@@ -4084,7 +4529,15 @@ main (int argc, char **argv)
IF_LINT (free (sortfiles));
}
else
- sort (files, nfiles, outfile);
+ {
+ /* If NTHREADS > number of cores on the machine, spinlocking
+ could be wasteful. */
+ unsigned long int np2 = num_processors (NPROC_CURRENT_OVERRIDABLE);
+ if (!nthreads || nthreads > np2)
+ nthreads = np2;
+
+ sort (files, nfiles, outfile, nthreads);
+ }
if (have_read_stdin && fclose (stdin) == EOF)
die (_("close failed"), "-");