Diffstat (limited to 'libgomp/stream.c')
-rw-r--r--  libgomp/stream.c | 460
1 file changed, 383 insertions(+), 77 deletions(-)
diff --git a/libgomp/stream.c b/libgomp/stream.c
index 50b684dcade..5d7836517a7 100644
--- a/libgomp/stream.c
+++ b/libgomp/stream.c
@@ -36,6 +36,30 @@
#include "mutex.h"
#include "libgomp.h"
+#define AGGREGATION_FACTOR 32
+
+//#define debug_log_init(S, V1, V2) printf (S, V1, V2); fflush (stdout)
+#define debug_log_init(S, V1, V2)
+
+//#define debug_log_init3(S, V1, V2, V3) printf (S, V1, V2, V3); fflush (stdout)
+#define debug_log_init3(S, V1, V2, V3)
+
+//#define debug_log(S, V1, V2) printf (S, V1, V2); fflush (stdout)
+#define debug_log(S, V1, V2)
+
+gomp_barrier_t gomp_stream_tasks_wait_until_connected_barrier;
+gomp_barrier_t gomp_stream_tasks_exit_barrier;
+unsigned gomp_stream_tasks_count;
+
+/* This structure is used to communicate across pthread_create. */
+
+struct gomp_stream_thread_start_data
+{
+ void (*fn) (void *);
+ void *fn_data;
+
+ int id;
+};
/* Data structures creation and pipeline initialization. */
@@ -46,39 +70,32 @@
BUFFER for the stream. */
void *
-GOMP_stream_create_stream (size_t element_size,
- unsigned long long stream_buffer_size,
- char *buffer)
+GOMP_stream_create_stream (size_t element_size, size_t buffer_size)
{
gomp_stream_p stream = (gomp_stream_p) gomp_malloc (sizeof (gomp_stream_t));
+ debug_log_init ("GOMP_stream_create_stream %zu %zu\n", element_size, buffer_size);
+
+ if (buffer_size < 2)
+ gomp_fatal ("GOMP_stream: insufficient stream buffer size.");
/* Initialize and allocate the data buffer. We force the
buffer_size to be a power of 2 for efficient modulo computation
of the indices in the circular buffer. */
- stream->element_size = element_size;
+
+ /* To avoid excessive multiplication operations, we convert the
+ accounting from elements to bytes. */
+ buffer_size *= element_size;
stream->buffer_size = 1;
- while(stream->buffer_size < stream_buffer_size)
+ while(stream->buffer_size < buffer_size)
stream->buffer_size <<= 1;
stream->buffer_mask = stream->buffer_size - 1;
-
- /* In case the user provided a pre-allocated buffer, we need to
- ensure it is properly sized. */
- if (buffer != NULL)
- {
- if (stream->buffer_size != stream_buffer_size)
- gomp_fatal ("GOMP_stream: provided buffer size is not power of 2.");
-
- stream->buffer = buffer;
- }
- else
- {
- stream->buffer =
- (void *) gomp_malloc (stream->element_size * stream->buffer_size);
- }
+ stream->buffer =
+ (void *) gomp_malloc (stream->buffer_size * 2);
stream->expected_ready_p = false;
stream->connected_p = false;
stream->eos_p = false;
+ stream->pre_shift = 0;
/* Initialize the view_handles. */
stream->read_views.current_min = stream->buffer_size;
@@ -89,7 +106,7 @@ GOMP_stream_create_stream (size_t element_size,
stream->read_views.nr_expected_views = 0;
stream->read_views.nr_registered_views = 0;
stream->read_views.nr_unregistered_views = 0;
- gomp_mutex_init (&stream->read_views.connect_view_mutex);
+ gomp_mutex_init (&stream->read_views.view_list.connect_view_mutex);
stream->write_views.current_min = 0;
stream->write_views.current_max = stream->buffer_size;
@@ -99,7 +116,7 @@ GOMP_stream_create_stream (size_t element_size,
stream->write_views.nr_expected_views = 0;
stream->write_views.nr_registered_views = 0;
stream->write_views.nr_unregistered_views = 0;
- gomp_mutex_init (&stream->write_views.connect_view_mutex);
+ gomp_mutex_init (&stream->write_views.view_list.connect_view_mutex);
#ifndef HAVE_SYNC_BUILTINS
gomp_mutex_init (&stream->stream_mutex);
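
The rounding loop above forces the circular buffer size up to the next power of two so that index wrap-around reduces to a bitwise AND with buffer_mask instead of a modulo. A minimal standalone sketch of that arithmetic follows; the concrete sizes and the names outside the runtime are made up for illustration.

#include <stdio.h>
#include <stddef.h>

int
main (void)
{
  size_t element_size = 8, requested = 100;
  size_t buffer_size = 1, buffer_mask;

  /* Same rounding as GOMP_stream_create_stream: account in bytes,
     then round up to a power of two.  */
  size_t bytes = requested * element_size;     /* 800 bytes requested.  */
  while (buffer_size < bytes)
    buffer_size <<= 1;                         /* -> 1024 bytes.  */
  buffer_mask = buffer_size - 1;               /* -> 0x3ff.  */

  /* Any monotonically increasing byte index wraps with a single AND.  */
  unsigned long long index = 2500;
  printf ("size=%zu mask=%#zx index %llu maps to offset %zu\n",
          buffer_size, buffer_mask, index, (size_t) (index & buffer_mask));
  return 0;
}
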
@@ -111,10 +128,11 @@ GOMP_stream_create_stream (size_t element_size,
/* Allocate and initialize a generic GOMP_STREAM_VIEW that can be
connected to any stream to give either read or write access
depending on its TYPE. Returns a pointer to the newly allocated
- view. */
+ view. This view accesses VIEW_SIZE bytes in the stream and
+ commits/releases BURST_SIZE bytes per activation. */
static inline void *
-gomp_stream_create_view (int type)
+gomp_stream_create_view (int type, size_t view_size, size_t burst_size)
{
gomp_stream_view_p view =
(gomp_stream_view_p) gomp_malloc (sizeof(gomp_stream_view_t));
@@ -125,45 +143,86 @@ gomp_stream_create_view (int type)
view->end_p = false;
view->type = type;
view->local_min_value = 0;
+ view->view_size = view_size;
+ view->burst_size = burst_size;
+ view->pxxk_size = view_size - burst_size;
return view;
}
-/* Wrapper for creating a READ view. */
/* Wrapper for creating a READ view. */
void *
-GOMP_stream_create_read_view (void)
+GOMP_stream_create_read_view (size_t view_size, size_t burst_size)
{
- return gomp_stream_create_view (READ_VIEW);
+ debug_log_init ("GOMP_stream_create_read_view %zu %zu\n", view_size, burst_size);
+ return gomp_stream_create_view (READ_VIEW, view_size, burst_size);
}
/* Wrapper for creating a WRITE view. */
void *
-GOMP_stream_create_write_view (void)
+GOMP_stream_create_write_view (size_t view_size, size_t burst_size)
{
- return gomp_stream_create_view (WRITE_VIEW);
+ debug_log_init ("GOMP_stream_create_write_view %zu %zu\n", view_size, burst_size);
+ return gomp_stream_create_view (WRITE_VIEW, view_size, burst_size);
}
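
A view now carries both the window it may access per activation (view_size) and the amount it actually advances (burst_size); their difference, stored as pxxk_size, is the overlap that lets a consumer peek into data released only by a later activation. The standalone sketch below mirrors the low_idx/up_idx computation used by GOMP_stream_update further down; the concrete numbers are assumptions, not values from the patch.

#include <stdio.h>

int
main (void)
{
  /* A read view that sees 6 bytes per activation but only consumes 4,
     i.e. it "peeks" 2 bytes into the next activation's data.  */
  unsigned long long view_size = 6, burst_size = 4;
  unsigned long long overlap = view_size - burst_size;   /* pxxk_size.  */

  for (unsigned long long act = 0; act < 3; ++act)
    {
      /* Same formulas as GOMP_stream_update for a single activation
         (act_start == act, act_end == act + 1).  */
      unsigned long long low_idx = act * burst_size;
      unsigned long long up_idx = (act + 1) * burst_size + overlap - 1;
      printf ("activation %llu accesses bytes [%llu, %llu]\n",
              act, low_idx, up_idx);
    }
  return 0;
}
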
/* Allocate and initialize a GOMP_STREAM_TASK data structure. */
void *
-GOMP_stream_create_task (void)
+GOMP_stream_create_task ()
{
- gomp_stream_task_p task =
+ gomp_stream_task_p task =
(gomp_stream_task_p) gomp_malloc (sizeof(gomp_stream_task_t));
+ debug_log_init3 ("GOMP_stream_create_task %d %d \t %15zu\n", 0, 0, (size_t) task);
task->read_view_list.views = NULL;
task->read_view_list.nr_views = 0;
task->read_view_list.size = 0;
+ gomp_mutex_init (&task->read_view_list.connect_view_mutex);
task->write_view_list.views = NULL;
task->write_view_list.nr_views = 0;
task->write_view_list.size = 0;
+ gomp_mutex_init (&task->write_view_list.connect_view_mutex);
+
+ task->activation_counter = 0;
+ task->termination_flag = false;
+
+ task->first_unassigned_activation_counter = 0;
+ task->num_instances = 0;
+
+ __sync_synchronize ();
return task;
}
+volatile void *
+GOMP_stream_get_task_activation_counter (void *t)
+{
+ gomp_stream_task_p task = (gomp_stream_task_p) t;
+
+ return &(task->activation_counter);
+}
+
+void
+GOMP_stream_set_task_termination_flag (void *t)
+{
+ gomp_stream_task_p task = (gomp_stream_task_p) t;
+
+ task->termination_flag = true;
+}
+
+void
+GOMP_stream_task_add_instance (void *t)
+{
+ gomp_stream_task_p task = (gomp_stream_task_p) t;
+
+ __sync_fetch_and_add (&task->num_instances, 1);
+ __sync_synchronize ();
+}
+
/* Declare additional READ_VIEWS and WRITE_VIEWS expected views on
stream S. When possible, the thread that creates the streaming
tasks should declare, for each stream, the number of read/write
@@ -177,6 +236,7 @@ GOMP_stream_add_expected_views (void *s, int read_views, int write_views,
int final)
{
gomp_stream_p stream = (gomp_stream_p) s;
+ debug_log_init ("GOMP_stream_add_expected_views %d %d\n", read_views, write_views);
if (stream->expected_ready_p)
gomp_fatal
@@ -249,14 +309,16 @@ GOMP_stream_connect_view (void *t, void *s, void *v)
/* Register the view with the TASK to which it belongs. This
operation is local to the task, so there is no need to
synchronize. */
+ gomp_mutex_lock (&task_list->connect_view_mutex);
gomp_stream_add_view_to_list (view, task_list);
+ gomp_mutex_unlock (&task_list->connect_view_mutex);
/* Connect the view to the stream. This must be done atomically as
this data structure is shared with the other producer/consumer
tasks. */
- gomp_mutex_lock (&vh->connect_view_mutex);
+ gomp_mutex_lock (&vh->view_list.connect_view_mutex);
gomp_stream_add_view_to_list (view, stream_list);
- gomp_mutex_unlock (&vh->connect_view_mutex);
+ gomp_mutex_unlock (&vh->view_list.connect_view_mutex);
__sync_fetch_and_add (&vh->nr_registered_views, 1);
}
@@ -290,24 +352,27 @@ void
GOMP_stream_wait_until_connected (void *t)
{
gomp_stream_task_p task = (gomp_stream_task_p) t;
- int num_read_views = task->read_view_list.nr_views;
- int num_write_views = task->write_view_list.nr_views;
- int i;
bool done;
+ int i;
do
{
done = true;
- for (i = 0; i < num_read_views; ++i)
+ gomp_mutex_lock (&task->read_view_list.connect_view_mutex);
+ for (i = 0; i < task->read_view_list.nr_views; ++i)
if (!gomp_stream_check_connected (task->read_view_list.views[i]->stream))
done = false;
+ gomp_mutex_unlock (&task->read_view_list.connect_view_mutex);
- for (i = 0; i < num_write_views; ++i)
+ gomp_mutex_lock (&task->write_view_list.connect_view_mutex);
+ for (i = 0; i < task->write_view_list.nr_views; ++i)
if (!gomp_stream_check_connected (task->write_view_list.views[i]->stream))
done = false;
+ gomp_mutex_unlock (&task->write_view_list.connect_view_mutex);
}
while (!done);
+ debug_log_init ("GOMP_stream_wait_until_connected %zu %zu\n", (size_t) task, (size_t) task);
}
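
For orientation, here is a rough sketch of how a control thread could wire one producer and one consumer to a single stream using the entry points in this file. The element size, view sizes and the setup_pipeline helper are made up, a couple of return types (the connect and expected-views helpers, whose declarations are not visible in this hunk) are assumed to be void, and in real use these calls are emitted by the compiler rather than written by hand.

#include <stddef.h>

/* Prototypes as used in this file; the return types of the connect and
   expected-views helpers are assumed to be void.  */
extern void *GOMP_stream_create_stream (size_t, size_t);
extern void *GOMP_stream_create_read_view (size_t, size_t);
extern void *GOMP_stream_create_write_view (size_t, size_t);
extern void *GOMP_stream_create_task (void);
extern void GOMP_stream_add_expected_views (void *, int, int, int);
extern void GOMP_stream_connect_view (void *, void *, void *);
extern void GOMP_stream_wait_until_connected (void *);

void
setup_pipeline (void)
{
  /* One stream of 4-byte elements with room for 1024 of them.  */
  void *stream = GOMP_stream_create_stream (4, 1024);

  /* One writer and one reader are expected; the last argument
     presumably marks the declaration as final.  */
  GOMP_stream_add_expected_views (stream, 1, 1, 1);

  void *producer = GOMP_stream_create_task ();
  void *consumer = GOMP_stream_create_task ();

  /* Views that see and advance by exactly one element per activation
     (sizes given in bytes here; that unit is an assumption).  */
  void *wview = GOMP_stream_create_write_view (4, 4);
  void *rview = GOMP_stream_create_read_view (4, 4);

  GOMP_stream_connect_view (producer, stream, wview);
  GOMP_stream_connect_view (consumer, stream, rview);

  /* Each task normally blocks here, in its own thread, until all of
     its streams are fully connected.  */
  GOMP_stream_wait_until_connected (producer);
  GOMP_stream_wait_until_connected (consumer);
}
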
/* Stream communication/synchronization. */
@@ -409,76 +474,151 @@ gomp_stream_wait_release (gomp_stream_view_p view,
}
}
-
/* Request read access for the view V to the stream up to INDEX. In
case the producers have finished and there is not enough data, the
returned value is the highest index to which the view is allowed to
access the stream. */
-unsigned long long
-GOMP_stream_update (void *v, const unsigned long long index)
+void *
+GOMP_stream_update (void *v, const unsigned long long act_start,
+ const unsigned long long act_end)
{
+ unsigned long long low_idx, up_idx;
+ size_t low_idx_loc, up_idx_loc;
gomp_stream_view_p view = (gomp_stream_view_p) v;
- view->upper_index = index;
+ gomp_stream_p stream = view->stream;
+ void *buffer_pointer;
+
+ debug_log ("GOMP_stream_update [in] %llu %llu\n", act_start, act_end);
+
+ /* This update requests access to the buffer in [low_idx,up_idx[.
+ We will release up to low_idx-1 and acquire up to up_idx-1. */
+ low_idx = act_start * view->burst_size;
+ up_idx = act_end * view->burst_size + view->pxxk_size - 1;
+
+ if (up_idx - low_idx > stream->buffer_size)
+ gomp_fatal ("GOMP_stream: update requested access to more than buffer_size data.");
+
+
+ view->lower_index = low_idx + view->stream->buffer_size;
+ view->upper_index = up_idx;
/* In case another consumer has received permission to read up to a
yet higher index, then there is no need to check for this one. */
- if (index > view->stream->read_views.current_max)
+ if (up_idx > view->stream->read_views.current_max)
{
- gomp_stream_wait_release (view, &view->stream->write_views, index);
+ gomp_stream_wait_release (view, &view->stream->write_views, up_idx);
+ view->stream->read_views.current_max = up_idx;
+ }
- /* If the producers have finished producing for this stream, we
- need to ensure we do not give read permission to the view
- past the highest fully committed index (committed by all
- producers). */
- if (view->stream->eos_p)
- {
- view->stream->write_views.current_min =
- gomp_stream_compute_lower_min (&view->stream->write_views.view_list);
+ low_idx_loc = low_idx & stream->buffer_mask;
+ up_idx_loc = up_idx & stream->buffer_mask;
- if (index > view->stream->write_views.current_min)
- {
- return view->stream->write_views.current_min;
- }
- }
- view->stream->read_views.current_max = index;
+ /* Once we know enough data is available for reading, we need to
+     check whether the data between the lower and upper bounds is
+ contiguous or if the buffer wrap-around occurs in the middle. */
+ if (low_idx_loc > up_idx_loc)
+ {
+ /* FIXME: does this require synchronization or is concurrent
+ overwriting acceptable as long as enough data has been copied
+ at the end? */
+ memcpy (stream->buffer + stream->buffer_size, stream->buffer,
+ up_idx_loc + 1);
+ //printf ("Update copy: (%llu,%llu) %llu - %llu | %zu - %zu (size: %zu)\n ", act_start, act_end, low_idx, up_idx, low_idx_loc, up_idx_loc, up_idx_loc + 1);
}
- return index;
+
+ /* We return a pointer to a contiguous array where this view is
+ guaranteed access to all the requested data. */
+ buffer_pointer = stream->buffer + low_idx_loc;
+
+ debug_log ("GOMP_stream_update [out] %llu %llu\n", act_start, act_end);
+
+ return buffer_pointer;
}
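
The buffer is allocated with twice the rounded size precisely so that a window which wraps past the end of the circular buffer can still be handed out as one contiguous pointer: the wrapped prefix is mirrored into the shadow area just past the nominal end. A standalone illustration of that copy, with made-up sizes:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

int
main (void)
{
  size_t buffer_size = 8, buffer_mask = buffer_size - 1;
  /* Twice the nominal size, as in GOMP_stream_create_stream.  */
  char *buffer = malloc (buffer_size * 2);
  for (size_t i = 0; i < buffer_size; ++i)
    buffer[i] = 'a' + i;                     /* "abcdefgh" (circular).  */

  /* A window covering byte indices [6, 10] wraps: locally that is
     offsets 6..7 followed by 0..2.  */
  unsigned long long low_idx = 6, up_idx = 10;
  size_t low_loc = low_idx & buffer_mask;    /* 6 */
  size_t up_loc = up_idx & buffer_mask;      /* 2 */

  if (low_loc > up_loc)
    /* Mirror the wrapped prefix into the shadow half so the caller
       sees the whole window contiguously at buffer + low_loc.  */
    memcpy (buffer + buffer_size, buffer, up_loc + 1);

  fwrite (buffer + low_loc, 1, (size_t) (up_idx - low_idx + 1), stdout);
  putchar ('\n');                            /* prints "ghabc" */
  free (buffer);
  return 0;
}
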
/* Request write access for the view V to the stream up to INDEX. */
-void
-GOMP_stream_stall (void *v, const unsigned long long index)
+void *
+GOMP_stream_stall (void *v, const unsigned long long act_start,
+ const unsigned long long act_end)
{
+ unsigned long long low_idx, up_idx;
gomp_stream_view_p view = (gomp_stream_view_p) v;
- view->upper_index = index;
+ gomp_stream_p stream = view->stream;
+ void *buffer_pointer;
+
+ debug_log ("GOMP_stream_stall [in] %llu %llu\n", act_start, act_end);
+
+ /* This update requests access to the buffer in [low_idx,up_idx[.
+ We will release up to low_idx-1 and acquire up to up_idx-1. */
+ low_idx = act_start * view->burst_size + stream->pre_shift;
+ up_idx = act_end * view->burst_size + view->pxxk_size + stream->pre_shift - 1;
- if (index > view->stream->write_views.current_max)
+ if (up_idx - low_idx > stream->buffer_size)
{
- gomp_stream_wait_release (view, &view->stream->read_views, index);
- view->stream->write_views.current_max = index;
+ fprintf (stderr, "Requesting data from low: %llu to up: %llu act [%llu,%llu] for burst:%zu size: %zu\n", low_idx, up_idx, act_start, act_end, view->burst_size, view->view_size);
+ gomp_fatal ("GOMP_stream: stall requested access to more than buffer_size data.");
}
+  /* We do not need to worry about wrap-around copying here, as this
+     "commit" only means that we no longer need to write to the
+     indices below low_idx. */
+ view->lower_index = low_idx;
+ view->upper_index = up_idx;
+
+ if (up_idx > stream->write_views.current_max)
+ {
+ gomp_stream_wait_release (view, &stream->read_views, up_idx);
+ stream->write_views.current_max = up_idx;
+ }
+
+ buffer_pointer = stream->buffer + (low_idx & stream->buffer_mask);
+
+ debug_log ("GOMP_stream_stall [out] %llu %llu\n", act_start, act_end);
+
+ return buffer_pointer;
}
/* Relinquish read access for the view V to the stream up to
INDEX. */
void
-GOMP_stream_release (void *v, const unsigned long long index)
+GOMP_stream_release (void *v, const unsigned long long act_idx)
{
gomp_stream_view_p view = (gomp_stream_view_p) v;
- view->lower_index = index + view->stream->buffer_size;
+ view->lower_index = act_idx * view->burst_size + view->stream->buffer_size - 1;
+ debug_log ("GOMP_stream_release %llu %llu\n", act_idx, act_idx);
}
/* Relinquish write access for the view V to the stream up to
INDEX. */
void
-GOMP_stream_commit (void *v, const unsigned long long index)
+GOMP_stream_commit (void *v, const unsigned long long act_idx)
{
gomp_stream_view_p view = (gomp_stream_view_p) v;
- view->lower_index = index;
+ gomp_stream_p stream = view->stream;
+ unsigned long long up_idx = act_idx * view->burst_size + stream->pre_shift - 1;
+ size_t low_idx_loc, up_idx_loc;
+
+ low_idx_loc = view->lower_index & stream->buffer_mask;
+ up_idx_loc = up_idx & stream->buffer_mask;
+
+  /* Once the data has been written, we need to check whether the
+     committed range between the lower and upper bounds is contiguous
+     or if the buffer wrap-around occurs in the middle; if it wrapped,
+     copy the overflow area back to the start of the buffer. */
+ if (low_idx_loc > up_idx_loc)
+ {
+ /* FIXME: does this require synchronization or is concurrent
+ overwriting acceptable as long as enough data has been copied
+ at the end? */
+ memcpy (stream->buffer, stream->buffer + stream->buffer_size,
+ up_idx_loc + 1);
+
+ //printf ("Commit copy: (%llu) %llu - %llu | %zu - %zu (size: %zu)\n ", act_idx, view->lower_index, up_idx, low_idx_loc, up_idx_loc, up_idx_loc + 1);
+ }
+
+ view->lower_index = up_idx;
+ debug_log ("GOMP_stream_commit %llu %llu\n", act_idx, act_idx);
}
/* Finalization and destruction of the streaming data structures. */
@@ -543,20 +683,186 @@ GOMP_stream_task_exit (void *t)
gomp_stream_task_p task = (gomp_stream_task_p) t;
int num_read_views = task->read_view_list.nr_views;
int num_write_views = task->write_view_list.nr_views;
- int i;
+ int i, res;
+ debug_log_init ("GOMP_stream_task_exit %zu %zu\n", (size_t) t, (size_t) t);
+
+ res = __sync_sub_and_fetch (&task->num_instances, 1);
+
+ if (res == 0)
+ {
+ for (i = 0; i < num_read_views; ++i)
+ gomp_stream_unregister_view (task->read_view_list.views[i]);
+
+ for (i = 0; i < num_write_views; ++i)
+ gomp_stream_unregister_view (task->write_view_list.views[i]);
+
+ free (task->read_view_list.views);
+ free (task->write_view_list.views);
+ free (task);
+ }
+}
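+
GOMP_stream_task_exit now reference-counts the replicated instances of a task: every instance decrements num_instances, and only the one that brings it to zero unregisters the views and frees the task. A minimal standalone sketch of that pattern using the same __sync builtin; the shared structure and thread count are invented for the example.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct shared { int refcount; int payload; };

static void *
instance (void *p)
{
  struct shared *s = p;
  /* ... per-instance work would happen here ...  */
  if (__sync_sub_and_fetch (&s->refcount, 1) == 0)
    {
      /* The last instance out cleans up the shared state.  */
      printf ("freeing shared payload %d\n", s->payload);
      free (s);
    }
  return NULL;
}

int
main (void)
{
  enum { N = 4 };
  pthread_t t[N];
  struct shared *s = malloc (sizeof *s);
  s->refcount = N;
  s->payload = 42;
  for (int i = 0; i < N; ++i)
    pthread_create (&t[i], NULL, instance, s);
  for (int i = 0; i < N; ++i)
    pthread_join (t[i], NULL);
  return 0;
}
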
+
+/* Get the amount of work available for task T, reserving up to
+   AGGREGATION_FACTOR activations at a time and storing the index of
+   the first reserved activation in *START_IDX.  The function blocks
+   until either a full chunk of activations is available or the
+   termination flag has been set, and only returns 0 on termination. */
+unsigned long long
+GOMP_stream_get_available_work (void *t, unsigned long long *start_idx)
+{
+ gomp_stream_task_p task = (gomp_stream_task_p) t;
+ long result = 0;
+ unsigned long long start;
+
+ /* Atomically acquire a range, then wait until the range is either
+ fully available or termination occurs. */
+ start = __sync_fetch_and_add (&task->first_unassigned_activation_counter,
+ AGGREGATION_FACTOR);
+ result = task->activation_counter - start;
+ debug_log_init3 ("GOMP_stream_get_available_work [entry] %llu %llu \t %15zu\n", start, task->activation_counter, (size_t) task);
+
+ if (result >= AGGREGATION_FACTOR)
+ {
+ *start_idx = start;
+ return AGGREGATION_FACTOR;
+ }
+
+ while (result < AGGREGATION_FACTOR)
+ {
+ if (task->termination_flag)
+ {
+ __sync_synchronize ();
+ result = task->activation_counter - start;
+ debug_log ("GOMP_stream_get_available_work [final] %llu %ld\n", start, result);
+ *start_idx = start;
+ if (result > AGGREGATION_FACTOR)
+ return AGGREGATION_FACTOR;
+ return (result > 0) ? result : 0;
+ }
+ result = task->activation_counter - start;
+ }
+ debug_log ("GOMP_stream_get_available_work %llu %ld\n", start, result);
+
+ *start_idx = start;
+ return AGGREGATION_FACTOR;
+}
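
GOMP_stream_get_available_work lets several instances of the same task carve the activation stream into chunks of AGGREGATION_FACTOR activations each. A hedged sketch of a worker loop driving it is below; the prototypes are copied from this file (the return type of GOMP_stream_task_exit is assumed void) and do_one_activation is a hypothetical per-activation body.

extern unsigned long long GOMP_stream_get_available_work (void *,
                                                          unsigned long long *);
extern void GOMP_stream_task_exit (void *);               /* return type assumed */

extern void do_one_activation (unsigned long long idx);   /* hypothetical */

void
worker (void *task)
{
  unsigned long long start, count;

  /* Keep reserving chunks of activations until the runtime reports
     termination with a zero return.  */
  while ((count = GOMP_stream_get_available_work (task, &start)) != 0)
    for (unsigned long long i = start; i < start + count; ++i)
      do_one_activation (i);

  GOMP_stream_task_exit (task);
}
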
- for (i = 0; i < num_read_views; ++i)
- gomp_stream_unregister_view (task->read_view_list.views[i]);
- for (i = 0; i < num_write_views; ++i)
- gomp_stream_unregister_view (task->write_view_list.views[i]);
+/* Initialize streaming in this region. */
- free (task->read_view_list.views);
- free (task->write_view_list.views);
- free (task);
+void
+GOMP_stream_init ()
+{
+  /* Count this thread as well, so that at least one participant in
+     the exit barrier waits for the streaming tasks. */
+ gomp_stream_tasks_count = 1;
+ gomp_barrier_init (&gomp_stream_tasks_exit_barrier, gomp_stream_tasks_count);
+}
+
+/* Wait until all streaming threads complete. */
+
+void
+GOMP_stream_exit ()
+{
+ gomp_barrier_wait (&gomp_stream_tasks_exit_barrier);
+}
+
+/* Request SIZE bytes for a PRE operator on stream S. Return a
+ pointer where data should be stored. */
+void *
+GOMP_stream_pre (void *s, const unsigned long long size)
+{
+ gomp_stream_p stream = (gomp_stream_p) s;
+
+ debug_log_init ("GOMP_stream_pre %zu \t %llu\n", (size_t) s, size);
+
+ stream->pre_shift = size;
+ stream->write_views.current_min = size;
+
+ return stream->buffer;
+}
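+
GOMP_stream_pre appears to reserve the first SIZE bytes of the stream buffer for data produced before the pipeline starts, shifting all subsequent write indices past that region via pre_shift. A small hedged usage sketch, with a hypothetical seed_stream helper and made-up contents:

#include <string.h>

extern void *GOMP_stream_pre (void *, unsigned long long);

/* Seed a stream with 16 bytes of history before any producer
   activation runs; later stall/commit indices are shifted past this
   region by the stream's pre_shift.  */
void
seed_stream (void *stream, const char initial[16])
{
  void *dst = GOMP_stream_pre (stream, 16);
  memcpy (dst, initial, 16);
}
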
+
+
+/* This function is a pthread_create entry point for streaming
+ tasks. */
+
+static void *
+gomp_stream_thread_start (void *xdata)
+{
+ struct gomp_stream_thread_start_data *data = xdata;
+ void (*local_fn) (void *);
+ void *local_data;
+
+ local_fn = data->fn;
+ local_data = data->fn_data;
+
+ local_fn (local_data);
+
+ gomp_barrier_wait_last (&gomp_stream_tasks_exit_barrier);
+ debug_log_init ("** exiting task: %d (%u)\n", data->id, gomp_stream_tasks_count);
+
+ return NULL;
}
+/* Called for starting a streaming task. These tasks do not partake
+ in existing thread teams and are not subject to scheduling
+ points. */
+void
+GOMP_stream_task (void (*fn) (void *), void *data,
+ void (*cpyfn) (void *, void *),
+ long arg_size, long arg_align,
+ long num_instances, bool auto_replicable)
+{
+ pthread_attr_t thread_attr, *attr;
+ pthread_t pt;
+ int err, i, base_id;
+ char *arg, *buf;
+ base_id = __sync_fetch_and_add (&gomp_stream_tasks_count, num_instances);
+ gomp_barrier_reinit (&gomp_stream_tasks_exit_barrier,
+ gomp_stream_tasks_count);
+ debug_log_init ("** adding tasks: %ld (%u)\n", num_instances, gomp_stream_tasks_count);
+ attr = &gomp_thread_attr;
+ if (__builtin_expect (gomp_cpu_affinity != NULL, 0))
+ {
+ size_t stacksize;
+ pthread_attr_init (&thread_attr);
+ pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
+ if (! pthread_attr_getstacksize (&gomp_thread_attr, &stacksize))
+ pthread_attr_setstacksize (&thread_attr, stacksize);
+ attr = &thread_attr;
+#if 0 /* This should be handled separately ... we will have to build a
+ stream mapping and prevent other OMP threads from touching
+ the cores running streaming tasks. */
+ gomp_init_thread_affinity (attr);
+#endif
+ }
+
+ for (i = 0; i < num_instances; ++i)
+ {
+ struct gomp_stream_thread_start_data *start_data;
+
+ buf = (char *) gomp_malloc (arg_size + arg_align - 1);
+ arg = (char *) (((uintptr_t) (buf) + arg_align - 1)
+ & ~(uintptr_t) (arg_align - 1));
+ if (cpyfn)
+ cpyfn (arg, data);
+ else
+ memcpy (arg, data, arg_size);
+
+ start_data = gomp_malloc (sizeof (struct gomp_stream_thread_start_data));
+ start_data->fn = fn;
+ start_data->fn_data = arg;
+ start_data->id = base_id + i;
+
+ err = pthread_create (&pt, attr, gomp_stream_thread_start, start_data);
+ if (err != 0)
+ gomp_fatal ("Thread creation failed: %s", strerror (err));
+ }
+
+ if (__builtin_expect (gomp_cpu_affinity != NULL, 0))
+ pthread_attr_destroy (&thread_attr);
+}
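
The argument block passed to each instance is over-allocated by arg_align - 1 bytes and then rounded up with the usual (p + align - 1) & ~(align - 1) trick. A standalone check of that rounding, with arbitrary values:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

int
main (void)
{
  long arg_size = 24, arg_align = 16;

  char *buf = malloc (arg_size + arg_align - 1);
  char *arg = (char *) (((uintptr_t) buf + arg_align - 1)
                        & ~(uintptr_t) (arg_align - 1));

  /* arg is the first address >= buf that is a multiple of arg_align,
     and arg + arg_size still fits inside the allocation.  */
  printf ("buf=%p arg=%p aligned=%d\n", (void *) buf, (void *) arg,
          (uintptr_t) arg % (uintptr_t) arg_align == 0);
  free (buf);
  return 0;
}
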