30 #include <common/config.h>
31 #include <common/timing.h>
32 #include <common/fxt.h>
33 #include <common/thread.h>
34 #include <common/utils.h>
35 #include <core/jobs.h>
36 #include <core/perfmodel/perfmodel.h>
37 #include <core/sched_policy.h>
38 #include <core/topology.h>
39 #include <core/errorcheck.h>
40 #include <core/sched_ctx.h>
41 #include <core/sched_ctx_list.h>
42 #include <core/simgrid.h>
43 #ifdef STARPU_HAVE_HWLOC
47 #include <core/drivers.h>
48 #include <drivers/cuda/driver_cuda.h>
49 #include <drivers/opencl/driver_opencl.h>
52 #include <drivers/mic/driver_mic_source.h>
55 #ifdef STARPU_USE_MPI_MASTER_SLAVE
56 #include <drivers/mpi/driver_mpi_source.h>
59 #include <drivers/cpu/driver_cpu.h>
61 #include <datawizard/datawizard.h>
63 #include <starpu_parameters.h>
/* Capacity of the current_tasks[] struct member (see the field declarations). */
#define STARPU_MAX_PIPELINE 4

/* Three-valued initialization state. The CHANGING name suggests a transient
 * state between the other two; its users are not visible in this excerpt. */
enum initialization
{
	UNINITIALIZED = 0,
	CHANGING = 1,
	INITIALIZED = 2
};

/* Forward declaration; this is the type of the ctx_change_list struct member. */
struct _starpu_ctx_change_list;
/* NOTE(review): the lines below are member declarations from several struct
 * definitions whose opening and closing lines are not visible in this excerpt;
 * the comments only describe what the rest of this file shows. */
74 starpu_pthread_mutex_t mutex;
75 enum starpu_worker_archtype arch;
77 struct starpu_perfmodel_arch perf_arch;
/* Source locations (file, and in the full source also line) of the last relax
 * on / relax off, recorded only when STARPU_SPINLOCK_CHECK is defined by the
 * enter/leave sched-op and relax on/off helpers further down. */
97 #ifdef STARPU_SPINLOCK_CHECK
98 const char *relax_on_file;
100 const char *relax_on_func;
101 const char *relax_off_file;
103 const char *relax_off_func;
130 struct _starpu_ctx_change_list ctx_change_list;
131 struct starpu_task_list local_tasks;
/* Sized by STARPU_MAX_PIPELINE. */
137 struct starpu_task *current_tasks[STARPU_MAX_PIPELINE];
138 #ifdef STARPU_SIMGRID
139 starpu_pthread_wait_t wait;
142 struct timespec cl_start;
143 struct timespec cl_end;
149 unsigned worker_is_running;
150 unsigned worker_is_initialized;
/* Presumably the field read/written by _starpu_worker_get_status() and
 * _starpu_worker_set_status() on _starpu_config.workers[] entries — confirm
 * that this member belongs to struct _starpu_worker. */
151 enum _starpu_worker_status status;
156 struct _starpu_driver_ops *driver_ops;
158 struct _starpu_sched_ctx_list *sched_ctx_list;
161 struct _starpu_barrier_counter tasks_barrier;
/* Arrays with STARPU_NMAX_SCHED_CTXS+1 slots, presumably one per scheduling
 * context id 0..STARPU_NMAX_SCHED_CTXS — confirm. */
165 unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS+1];
177 unsigned shares_tasks_lists[STARPU_NMAX_SCHED_CTXS+1];
179 unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS+1];
185 unsigned reverse_phase[2];
190 struct _starpu_sched_ctx *stream_ctx;
195 #ifdef STARPU_HAVE_HWLOC
196 hwloc_bitmap_t hwloc_cpu_set;
197 hwloc_obj_t hwloc_obj;
207 int combined_workerid[STARPU_NMAXWORKERS];
210 starpu_pthread_mutex_t count_mutex;
216 #ifdef STARPU_HAVE_HWLOC
217 hwloc_bitmap_t hwloc_cpu_set;
227 starpu_pthread_mutex_t mutex;
234 unsigned set_is_initialized;
237 #ifdef STARPU_USE_MPI_MASTER_SLAVE
249 unsigned nsched_ctxs;
251 #ifdef STARPU_HAVE_HWLOC
288 unsigned nworkerpercuda;
289 int cuda_th_per_stream;
297 unsigned nhwmpidevices;
300 unsigned nmpicores[STARPU_MAXMPIDEVS];
305 unsigned nmicdevices;
308 unsigned nmiccores[STARPU_MAXMICDEVS];
350 #ifdef STARPU_HAVE_HWLOC
357 char currently_bound[STARPU_NMAXWORKERS];
358 char currently_shared[STARPU_NMAXWORKERS];
/* Scheduling-context table, presumably the sched_ctxs member of _starpu_config:
 * _starpu_get_sched_ctx_struct() accepts ids 0..STARPU_NMAX_SCHED_CTXS (hence
 * the +1) and _starpu_get_initial_sched_ctx() returns the
 * STARPU_GLOBAL_SCHED_CTX entry. */
417 struct _starpu_sched_ctx
sched_ctxs[STARPU_NMAX_SCHED_CTXS+1];
424 starpu_pthread_mutex_t submitted_mutex;
427 extern int _starpu_worker_parallel_blocks;
430 extern int _starpu_keys_initialized STARPU_ATTRIBUTE_INTERNAL;
431 extern starpu_pthread_key_t _starpu_worker_key STARPU_ATTRIBUTE_INTERNAL;
432 extern starpu_pthread_key_t _starpu_worker_set_key STARPU_ATTRIBUTE_INTERNAL;
/* Store the argc/argv pointers for later retrieval (defined elsewhere). */
void _starpu_set_argc_argv(int *argc, char ***argv);

/* Getters paired with _starpu_set_argc_argv(); presumably they return the
 * stored pointers. They are declared with an explicit (void) parameter list.
 * Before C23, an empty "()" list declares no prototype, so a call passing
 * arguments would compile silently. */
int *_starpu_get_argc(void);
char ***_starpu_get_argv(void);

/* Defined elsewhere. Going by its name, it checks/adjusts @conf against the
 * environment — confirm against the definition. */
void _starpu_conf_check_environment(struct starpu_conf *conf);

/* Defined elsewhere. Going by its name, it presumably waits while the runtime
 * is paused — confirm. */
void _starpu_may_pause(void);
/* Report whether the runtime is running, as recorded in _starpu_config.running.
 * Visible lines: the field is read into ret between ANNOTATE_HAPPENS_AFTER and
 * ANNOTATE_HAPPENS_BEFORE on &_starpu_config.running (presumably annotations
 * for race-detection tools).
 * NOTE(review): the braces, the declaration of ret and the return statement
 * are not visible in this excerpt. */
446 static inline unsigned _starpu_machine_is_running(
void)
452 ANNOTATE_HAPPENS_AFTER(&_starpu_config.running);
453 ret = _starpu_config.running;
454 ANNOTATE_HAPPENS_BEFORE(&_starpu_config.running);
463 uint32_t _starpu_worker_exists(
struct starpu_task *);
466 uint32_t _starpu_can_submit_cuda_task(
void);
469 uint32_t _starpu_can_submit_cpu_task(
void);
472 uint32_t _starpu_can_submit_opencl_task(
void);
476 unsigned _starpu_worker_can_block(
unsigned memnode,
struct _starpu_worker *worker);
481 void _starpu_block_worker(
int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
/* Prototypes for the routines, defined elsewhere, that start @worker (per their
 * names). The meaning of @fut_key and @sync is fixed by the definitions, which
 * are not visible here. */
void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
488 static inline unsigned _starpu_worker_get_count(
void)
490 return _starpu_config.topology.nworkers;
492 #define starpu_worker_get_count _starpu_worker_get_count
497 static inline void _starpu_set_local_worker_key(
struct _starpu_worker *worker)
499 STARPU_ASSERT(_starpu_keys_initialized);
500 STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, worker);
/* Return the struct _starpu_worker stored under _starpu_worker_key for the
 * calling thread.
 * NOTE(review): the statement taken when _starpu_keys_initialized is zero is
 * not visible in this excerpt (presumably an early "return NULL" — confirm). */
505 static inline struct _starpu_worker *_starpu_get_local_worker_key(
void)
507 if (!_starpu_keys_initialized)
509 return (
struct _starpu_worker *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
515 static inline void _starpu_set_local_worker_set_key(
struct _starpu_worker_set *worker)
517 STARPU_ASSERT(_starpu_keys_initialized);
518 STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, worker);
/* Body fragment of the getter paired with _starpu_set_local_worker_set_key();
 * its signature is not visible in this excerpt. When the keys are initialized,
 * it returns the struct _starpu_worker_set stored under _starpu_worker_set_key
 * for the calling thread.
 * NOTE(review): the statement taken when _starpu_keys_initialized is zero is
 * not visible here. */
525 if (!_starpu_keys_initialized)
527 return (
struct _starpu_worker_set *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
532 static inline struct _starpu_worker *_starpu_get_worker_struct(
unsigned id)
534 STARPU_ASSERT(
id < starpu_worker_get_count());
535 return &_starpu_config.workers[id];
540 static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(
unsigned id)
542 return (
id > STARPU_NMAX_SCHED_CTXS) ? NULL : &_starpu_config.sched_ctxs[id];
/* Body fragment (the enclosing function's signature is not visible in this
 * excerpt): returns the address of the global _starpu_config. */
551 return &_starpu_config;
555 static inline int _starpu_get_disable_kernels(
void)
557 return _starpu_config.disable_kernels;
561 static inline enum _starpu_worker_status _starpu_worker_get_status(
int workerid)
563 return _starpu_config.workers[workerid].status;
568 static inline void _starpu_worker_set_status(
int workerid,
enum _starpu_worker_status status)
570 _starpu_config.workers[workerid].status = status;
574 static inline struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(
void)
576 return &_starpu_config.sched_ctxs[STARPU_GLOBAL_SCHED_CTX];
/* Prototypes for functions defined elsewhere. Going by their names, each
 * writes into @workerids (room for @maxsize entries) the ids of workers whose
 * architecture is @type, and returns an int (presumably the number written).
 * The _ctx_free_ variant presumably keeps only workers not assigned to a
 * scheduling context. Confirm against the definitions. */
int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
/* Only the declarator of this inline helper is visible in this excerpt; its
 * body is not. Going by its name, it presumably reports whether @mutex is the
 * sched_mutex of worker @workerid — confirm against the full definition. */
585 static inline unsigned _starpu_worker_mutex_is_sched_mutex(
int workerid, starpu_pthread_mutex_t *mutex)
591 static inline int _starpu_worker_get_nsched_ctxs(
int workerid)
593 return _starpu_config.workers[
workerid].nsched_ctxs;
/* Return _starpu_config.topology.nsched_ctxs.
 * NOTE(review): source lines between the start of the body and the return
 * are not visible in this excerpt. Check them before assuming this is a
 * plain unsynchronized read. */
597 static inline unsigned _starpu_get_nsched_ctxs(
void)
601 return _starpu_config.topology.nsched_ctxs;
/* Return the id of the worker bound to the calling thread, found through
 * _starpu_get_local_worker_key().
 * NOTE(review): most of the body is not visible in this excerpt. For threads
 * that are not workers the result is presumably negative (-1), since
 * __starpu_worker_get_id_check() asserts id>=0 with "Cannot be called from
 * outside a worker" — confirm. */
605 static inline int _starpu_worker_get_id(
void)
609 worker = _starpu_get_local_worker_key();
/* Code including this header that writes starpu_worker_get_id() uses the
 * inline version. */
621 #define starpu_worker_get_id _starpu_worker_get_id
/* Checked variant of starpu_worker_get_id(). It asserts that the caller runs
 * on a worker (id>=0), and the failure message includes the caller-supplied
 * file @f and line @l. The _starpu_worker_get_id_check(f,l) macro forwards
 * to it.
 * NOTE(review): the return statement is not visible in this excerpt. */
625 static inline unsigned __starpu_worker_get_id_check(
const char *f,
int l)
629 int id = starpu_worker_get_id();
630 STARPU_ASSERT_MSG(
id>=0,
"%s:%d Cannot be called from outside a worker\n", f, l);
633 #define _starpu_worker_get_id_check(f,l) __starpu_worker_get_id_check(f,l)
/* Prototypes for functions defined elsewhere. The descriptions follow the
 * names; confirm against the definitions. */

/* Memory-node kind corresponding to worker architecture @type. */
enum starpu_node_kind _starpu_worker_get_node_kind(enum starpu_worker_archtype type);

/* Store scheduling context @sched_ctx for worker @workerid, and fetch the
 * context of @stream_workerid back. Presumably backed by the worker's
 * stream_ctx member. */
void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *sched_ctx);
struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);
/* Request (per the name) that @worker block in parallel mode.
 * Visible lines: it sets the global _starpu_worker_parallel_blocks to 1 and
 * broadcasts worker->sched_cond. Under STARPU_SIMGRID it also broadcasts
 * _starpu_simgrid_task_queue[worker->workerid]. A second broadcast of
 * worker->sched_cond follows later.
 * NOTE(review): most of this definition is not visible in this excerpt, so the
 * worker state it updates and any waiting it does cannot be described here. */
646 static inline void _starpu_worker_request_blocking_in_parallel(
struct _starpu_worker *
const worker)
648 _starpu_worker_parallel_blocks = 1;
669 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
670 #ifdef STARPU_SIMGRID
671 starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->
workerid]);
687 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
/* Counterpart (by name) of _starpu_worker_request_blocking_in_parallel().
 * Visible lines: worker->sched_cond is broadcast at two points.
 * NOTE(review): the rest of the logic, including any update of
 * _starpu_worker_parallel_blocks or of @worker's state, is not visible in
 * this excerpt. */
695 static inline void _starpu_worker_request_unblocking_in_parallel(
struct _starpu_worker *
const worker)
716 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
730 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
/* Handle pending block-in-parallel requests for @worker (by name). It is
 * called from _starpu_worker_enter_sched_op(). Visible lines: worker->sched_cond
 * is broadcast at two points.
 * NOTE(review): the conditions and state changes around those broadcasts are
 * not visible in this excerpt. */
744 static inline void _starpu_worker_process_block_in_parallel_requests(
struct _starpu_worker *
const worker)
759 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
776 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
/* Enter a scheduling operation on @worker (by name).
 * With STARPU_SPINLOCK_CHECK the function is __starpu_worker_enter_sched_op().
 * It also receives the caller's file, line and function, which it stores in
 * worker->relax_on_file, relax_on_line and relax_on_func. In that case the
 * _starpu_worker_enter_sched_op(worker) macro supplies __FILE__, __LINE__ and
 * __starpu_func__.
 * Visible lines also call _starpu_worker_process_block_in_parallel_requests()
 * at two points.
 * NOTE(review): the #else/#endif lines and most of the body are not visible in
 * this excerpt. */
796 #ifdef STARPU_SPINLOCK_CHECK
797 static inline void __starpu_worker_enter_sched_op(
struct _starpu_worker *
const worker,
const char*file,
int line,
const char* func)
799 static inline void _starpu_worker_enter_sched_op(
struct _starpu_worker *
const worker)
806 _starpu_worker_process_block_in_parallel_requests(worker);
813 _starpu_worker_process_block_in_parallel_requests(worker);
835 #ifdef STARPU_SPINLOCK_CHECK
836 worker->relax_on_file = file;
837 worker->relax_on_line = line;
838 worker->relax_on_func = func;
841 #ifdef STARPU_SPINLOCK_CHECK
842 #define _starpu_worker_enter_sched_op(worker) __starpu_worker_enter_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
/* Defined elsewhere. It is called with no arguments at the end of the
 * leave-sched-op helper that follows. */
849 void _starpu_worker_apply_deferred_ctx_changes(
void);
/* Leave a scheduling operation on @worker (by name).
 * With STARPU_SPINLOCK_CHECK the function is __starpu_worker_leave_sched_op().
 * It stores the caller's file, line and function in worker->relax_off_file,
 * relax_off_line and relax_off_func. In that case the
 * _starpu_worker_leave_sched_op(worker) macro supplies __FILE__, __LINE__ and
 * __starpu_func__.
 * Visible lines then broadcast worker->sched_cond and call
 * _starpu_worker_apply_deferred_ctx_changes().
 * NOTE(review): the #else/#endif lines and the rest of the body are not
 * visible in this excerpt. */
850 #ifdef STARPU_SPINLOCK_CHECK
851 static inline void __starpu_worker_leave_sched_op(
struct _starpu_worker *
const worker,
const char*file,
int line,
const char* func)
853 static inline void _starpu_worker_leave_sched_op(
struct _starpu_worker *
const worker)
858 #ifdef STARPU_SPINLOCK_CHECK
859 worker->relax_off_file = file;
860 worker->relax_off_line = line;
861 worker->relax_off_func = func;
864 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
865 _starpu_worker_apply_deferred_ctx_changes();
867 #ifdef STARPU_SPINLOCK_CHECK
868 #define _starpu_worker_leave_sched_op(worker) __starpu_worker_leave_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
/* Query (by name) whether a scheduling operation is pending for the calling
 * worker. Visible lines: it takes the calling worker's id from
 * starpu_worker_get_id() and asserts that the looked-up worker is non-NULL.
 * NOTE(review): the lookup itself, the handling of a negative id and the
 * returned expression are not visible in this excerpt. */
871 static inline int _starpu_worker_sched_op_pending(
void)
873 int workerid = starpu_worker_get_id();
877 STARPU_ASSERT(worker != NULL);
/* Enter a context-changing operation on @worker (by name). Visible lines:
 * worker->sched_cond is broadcast. Under STARPU_SIMGRID,
 * _starpu_simgrid_task_queue[worker->workerid] is broadcast as well.
 * NOTE(review): the state changes and waits around these wake-ups are not
 * visible in this excerpt. */
890 static inline void _starpu_worker_enter_changing_ctx_op(
struct _starpu_worker *
const worker)
913 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
914 #ifdef STARPU_SIMGRID
915 starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->
workerid]);
/* Leave a context-changing operation on @worker (by name). Visible lines:
 * worker->sched_cond is broadcast.
 * NOTE(review): the rest of the body is not visible in this excerpt. */
932 static inline void _starpu_worker_leave_changing_ctx_op(
struct _starpu_worker *
const worker)
936 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
/* Turn the relax state on for the calling thread's worker (by name).
 * Visible lines, in order, all with worker->sched_mutex held:
 *  - in checking builds, assert that worker->state_relax_refcnt < UINT_MAX;
 *  - in checking builds, record the caller location in relax_on_file,
 *    relax_on_line and relax_on_func;
 *  - broadcast worker->sched_cond;
 *  - unlock sched_mutex.
 * In checking builds, the _starpu_worker_relax_on() macro supplies __FILE__,
 * __LINE__ and __starpu_func__. starpu_worker_relax_on is an alias.
 * NOTE(review): how worker is obtained and the refcount update itself
 * (presumably an increment, given the UINT_MAX check) are not visible in this
 * excerpt. */
941 #ifdef STARPU_SPINLOCK_CHECK
942 static inline void __starpu_worker_relax_on(
const char*file,
int line,
const char* func)
944 static inline void _starpu_worker_relax_on(
void)
952 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
953 #ifdef STARPU_SPINLOCK_CHECK
954 STARPU_ASSERT_MSG(worker->
state_relax_refcnt<UINT_MAX,
"relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
959 #ifdef STARPU_SPINLOCK_CHECK
960 worker->relax_on_file = file;
961 worker->relax_on_line = line;
962 worker->relax_on_func = func;
964 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
965 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
967 #ifdef STARPU_SPINLOCK_CHECK
968 #define _starpu_worker_relax_on() __starpu_worker_relax_on(__FILE__, __LINE__, __starpu_func__)
970 #define starpu_worker_relax_on _starpu_worker_relax_on
/* Relax-on for an explicit @worker. No lock is taken in the lines shown;
 * presumably the caller already holds worker->sched_mutex, as
 * _starpu_worker_trylock() appears to around its call.
 * Visible lines:
 *  - in checking builds, the state_relax_refcnt < UINT_MAX assertion and the
 *    recording of relax_on_file, relax_on_line and relax_on_func;
 *  - a broadcast of worker->sched_cond.
 * In checking builds, the _starpu_worker_relax_on_locked(worker) macro
 * supplies the caller location.
 * NOTE(review): the refcount update itself is not visible in this excerpt. */
973 #ifdef STARPU_SPINLOCK_CHECK
974 static inline void __starpu_worker_relax_on_locked(
struct _starpu_worker *worker,
const char*file,
int line,
const char* func)
976 static inline void _starpu_worker_relax_on_locked(
struct _starpu_worker *worker)
981 #ifdef STARPU_SPINLOCK_CHECK
982 STARPU_ASSERT_MSG(worker->
state_relax_refcnt<UINT_MAX,
"relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
987 #ifdef STARPU_SPINLOCK_CHECK
988 worker->relax_on_file = file;
989 worker->relax_on_line = line;
990 worker->relax_on_func = func;
992 STARPU_PTHREAD_COND_BROADCAST(&worker->
sched_cond);
994 #ifdef STARPU_SPINLOCK_CHECK
995 #define _starpu_worker_relax_on_locked(worker) __starpu_worker_relax_on_locked(worker,__FILE__, __LINE__, __starpu_func__)
/* Turn the relax state off for the calling thread's worker (by name).
 * Visible lines:
 *  - get the worker id from starpu_worker_get_id() and assert that the worker
 *    is non-NULL;
 *  - with worker->sched_mutex held, in checking builds, assert that
 *    state_relax_refcnt > 0 and record the caller location in relax_off_file,
 *    relax_off_line and relax_off_func;
 *  - unlock sched_mutex.
 * In checking builds, _starpu_worker_relax_off() supplies __FILE__, __LINE__
 * and __starpu_func__. starpu_worker_relax_off is an alias.
 * NOTE(review): the failure message says "relax last turn off" but prints
 * relax_on_func, relax_on_file and relax_on_line. The relax_off fields are
 * presumably what was intended — confirm and fix the argument list.
 * NOTE(review): the refcount update and the handling of a negative workerid
 * are not visible in this excerpt. */
998 #ifdef STARPU_SPINLOCK_CHECK
999 static inline void __starpu_worker_relax_off(
const char*file,
int line,
const char* func)
1001 static inline void _starpu_worker_relax_off(
void)
1004 int workerid = starpu_worker_get_id();
1008 STARPU_ASSERT(worker != NULL);
1011 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
1012 #ifdef STARPU_SPINLOCK_CHECK
1013 STARPU_ASSERT_MSG(worker->
state_relax_refcnt>0,
"relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1018 #ifdef STARPU_SPINLOCK_CHECK
1019 worker->relax_off_file = file;
1020 worker->relax_off_line = line;
1021 worker->relax_off_func = func;
1023 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
1025 #ifdef STARPU_SPINLOCK_CHECK
1026 #define _starpu_worker_relax_off() __starpu_worker_relax_off(__FILE__, __LINE__, __starpu_func__)
1028 #define starpu_worker_relax_off _starpu_worker_relax_off
/* Relax-off for the calling thread's worker, without taking sched_mutex in the
 * lines shown (presumably the caller holds it).
 * Visible lines:
 *  - get the worker id from starpu_worker_get_id() and assert that the worker
 *    is non-NULL;
 *  - in checking builds, assert that state_relax_refcnt > 0 and record the
 *    caller location in relax_off_file, relax_off_line and relax_off_func.
 * Unlike the relax-on _locked macro, _starpu_worker_relax_off_locked() takes
 * no worker argument.
 * NOTE(review): as in the unlocked variant, the "relax last turn off" message
 * prints the relax_on fields. Presumably the relax_off fields were intended —
 * confirm.
 * NOTE(review): the refcount update is not visible in this excerpt. */
1030 #ifdef STARPU_SPINLOCK_CHECK
1031 static inline void __starpu_worker_relax_off_locked(
const char*file,
int line,
const char* func)
1033 static inline void _starpu_worker_relax_off_locked(
void)
1036 int workerid = starpu_worker_get_id();
1040 STARPU_ASSERT(worker != NULL);
1043 #ifdef STARPU_SPINLOCK_CHECK
1044 STARPU_ASSERT_MSG(worker->
state_relax_refcnt>0,
"relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1049 #ifdef STARPU_SPINLOCK_CHECK
1050 worker->relax_off_file = file;
1051 worker->relax_off_line = line;
1052 worker->relax_off_func = func;
1055 #ifdef STARPU_SPINLOCK_CHECK
1056 #define _starpu_worker_relax_off_locked() __starpu_worker_relax_off_locked(__FILE__, __LINE__, __starpu_func__)
/* Report (by name) the calling worker's relax state. Visible lines: it gets
 * the id from starpu_worker_get_id() and asserts the worker is non-NULL.
 * starpu_worker_get_relax_state is an alias.
 * NOTE(review): the returned expression and the handling of a negative id are
 * not visible in this excerpt. */
1059 static inline int _starpu_worker_get_relax_state(
void)
1061 int workerid = starpu_worker_get_id();
1065 STARPU_ASSERT(worker != NULL);
1068 #define starpu_worker_get_relax_state _starpu_worker_get_relax_state
/* Lock the sched_mutex of worker @workerid.
 * Visible lines: it asserts that the target worker is non-NULL and reads the
 * caller's own id. One path calls starpu_worker_relax_on() before locking
 * worker->sched_mutex; another path locks it directly.
 * NOTE(review): the conditions selecting these paths (presumably whether
 * @workerid differs from the caller's id) are not visible in this excerpt. */
1074 static inline void _starpu_worker_lock(
int workerid)
1077 STARPU_ASSERT(worker != NULL);
1078 int cur_workerid = starpu_worker_get_id();
1081 starpu_worker_relax_on();
1083 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
1091 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
/* Try (by name) to lock the sched_mutex of worker @workerid without blocking.
 * Visible lines:
 *  - look up the calling worker with _starpu_get_local_worker_key();
 *  - try-lock the caller's own sched_mutex, then try-lock the target's;
 *  - on some path, unlock the target's mutex again;
 *  - turn the caller's relax state on with
 *    _starpu_worker_relax_on_locked(cur_worker), then release the caller's
 *    sched_mutex.
 * NOTE(review): cur_worker is dereferenced (cur_worker->workerid) without a
 * NULL check. _starpu_get_local_worker_key() has a separate path when the keys
 * are not initialized (presumably returning NULL), so this must only be called
 * from a worker thread — confirm, or add a guard.
 * NOTE(review): the branch conditions and the return value are not visible in
 * this excerpt. */
1095 static inline int _starpu_worker_trylock(
int workerid)
1097 struct _starpu_worker *cur_worker = _starpu_get_local_worker_key();
1098 int cur_workerid = cur_worker->
workerid;
1100 STARPU_ASSERT(worker != NULL);
1103 int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&cur_worker->
sched_mutex);
1111 ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->
sched_mutex);
1117 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
1120 _starpu_worker_relax_on_locked(cur_worker);
1121 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->
sched_mutex);
/* Counterpart of _starpu_worker_lock(). It unlocks the sched_mutex of worker
 * @workerid, then reads the caller's id and calls starpu_worker_relax_off().
 * NOTE(review): the condition guarding the relax_off call and the lookup of
 * worker are not visible in this excerpt. */
1125 static inline void _starpu_worker_unlock(
int workerid)
1128 STARPU_ASSERT(worker != NULL);
1129 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
1130 int cur_workerid = starpu_worker_get_id();
1133 starpu_worker_relax_off();
/* Lock the calling worker's own sched_mutex. The id comes from
 * starpu_worker_get_id_check(), which is not defined in this excerpt
 * (presumably it asserts that the caller is a worker).
 * NOTE(review): the lookup of worker from workerid is not visible here. */
1137 static inline void _starpu_worker_lock_self(
void)
1139 int workerid = starpu_worker_get_id_check();
1141 STARPU_ASSERT(worker != NULL);
1142 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->
sched_mutex);
/* Unlock the calling worker's own sched_mutex; this is the counterpart of
 * _starpu_worker_lock_self(). The id comes from starpu_worker_get_id_check().
 * NOTE(review): the lookup of worker from workerid is not visible here. */
1145 static inline void _starpu_worker_unlock_self(
void)
1147 int workerid = starpu_worker_get_id_check();
1149 STARPU_ASSERT(worker != NULL);
1150 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->
sched_mutex);
/* Wake worker @workerid through starpu_wake_worker_locked() (defined
 * elsewhere) and keep its result in ret.
 * NOTE(review): the surrounding lock/relax handling and the return statement
 * are not visible in this excerpt. */
1153 static inline int _starpu_wake_worker_relax(
int workerid)
1156 int ret = starpu_wake_worker_locked(
workerid);
/* Prototypes for functions defined elsewhere (the descriptions follow the
 * names; confirm against the definitions).
 * starpu_wake_worker_relax_light(): wake worker @workerid and return an int
 * status.
 * _starpu_worker_refuse_task(): @worker declines @task. */
int starpu_wake_worker_relax_light(int workerid);
void _starpu_worker_refuse_task(struct _starpu_worker *worker, struct starpu_task *task);
1171 #endif // __WORKERS_H__