1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2007-2017, University of Amsterdam 7 VU University Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(thread, 37 [ concurrent/3, % +Threads, :Goals, +Options 38 concurrent_maplist/2, % :Goal, +List 39 concurrent_maplist/3, % :Goal, ?List1, ?List2 40 concurrent_maplist/4, % :Goal, ?List1, ?List2, ?List3 41 first_solution/3 % -Var, :Goals, +Options 42 ]). 43:- use_module(library(debug)). 44:- use_module(library(error)). 45:- use_module(library(lists)). 46:- use_module(library(apply)). 47:- use_module(library(option)). 48 49%:- debug(concurrent). 50 51:- meta_predicate 52 concurrent( , , ), 53 concurrent_maplist( , ), 54 concurrent_maplist( , , ), 55 concurrent_maplist( , , , ), 56 first_solution( , , ). 57 58:- predicate_options(concurrent/3, 3, 59 [ pass_to(system:thread_create/3, 3) 60 ]). 61:- predicate_options(first_solution/3, 3, 62 [ on_fail(oneof([stop,continue])), 63 on_error(oneof([stop,continue])), 64 pass_to(system:thread_create/3, 3) 65 ]).
Execution succeeds if all goals have succeeded. If one goal fails or throws an exception, other workers are abandoned as soon as possible and the entire computation fails or re-throws the exception. Note that if multiple goals fail or raise an error it is not defined which error or failure is reported.
On successful completion, variable bindings are returned. Note however that threads have independent stacks and therefore the goal is copied to the worker thread and the result is copied back to the caller of concurrent/3.
Choosing the right number of threads is not always obvious. Here are some scenarios:
144concurrent(1, M:List, _) :- 145 !, 146 maplist(M:call, List). 147concurrent(N, M:List, Options) :- 148 must_be(positive_integer, N), 149 must_be(list(callable), List), 150 length(List, JobCount), 151 message_queue_create(Done), 152 message_queue_create(Queue), 153 WorkerCount is min(N, JobCount), 154 create_workers(WorkerCount, Queue, Done, Workers, Options), 155 submit_goals(List, 1, M, Queue, VarList), 156 forall(between(1, WorkerCount, _), 157 thread_send_message(Queue, done)), 158 VT =.. [vars|VarList], 159 concur_wait(JobCount, Done, VT, cleanup(Workers, Queue), 160 Result, [], Exitted), 161 subtract(Workers, Exitted, RemainingWorkers), 162 concur_cleanup(Result, RemainingWorkers, [Queue, Done]), 163 ( Result == true 164 -> true 165 ; Result = false 166 -> fail 167 ; Result = exception(Error) 168 -> throw(Error) 169 ).
goal(Id, Goal, Vars)
. Vars is unified with a list of
lists of free variables appearing in each goal.177submit_goals([], _, _, _, []). 178submit_goals([H|T], I, M, Queue, [Vars|VT]) :- 179 term_variables(H, Vars), 180 thread_send_message(Queue, goal(I, M:H, Vars)), 181 I2 is I + 1, 182 submit_goals(T, I2, M, Queue, VT).
193concur_wait(0, _, _, _, true, Exited, Exited) :- !. 194concur_wait(N, Done, VT, Cleanup, Status, Exitted0, Exitted) :- 195 debug(concurrent, 'Concurrent: waiting for workers ...', []), 196 catch(thread_get_message(Done, Exit), Error, 197 concur_abort(Error, Cleanup, Done, Exitted0)), 198 debug(concurrent, 'Waiting: received ~p', [Exit]), 199 ( Exit = done(Id, Vars) 200 -> debug(concurrent, 'Concurrent: Job ~p completed with ~p', [Id, Vars]), 201 arg(Id, VT, Vars), 202 N2 is N - 1, 203 concur_wait(N2, Done, VT, Cleanup, Status, Exitted0, Exitted) 204 ; Exit = finished(Thread) 205 -> thread_join(Thread, JoinStatus), 206 debug(concurrent, 'Concurrent: waiter ~p joined: ~p', 207 [Thread, JoinStatus]), 208 ( JoinStatus == true 209 -> concur_wait(N, Done, VT, Cleanup, Status, [Thread|Exitted0], Exitted) 210 ; Status = JoinStatus, 211 Exitted = [Thread|Exitted0] 212 ) 213 ). 214 215concur_abort(Error, cleanup(Workers, Queue), Done, Exitted) :- 216 debug(concurrent, 'Concurrent: got ~p', [Error]), 217 subtract(Workers, Exitted, RemainingWorkers), 218 concur_cleanup(Error, RemainingWorkers, [Queue, Done]), 219 throw(Error). 220 221create_workers(N, Queue, Done, [Id|Ids], Options) :- 222 N > 0, 223 !, 224 thread_create(worker(Queue, Done), Id, 225 [ at_exit(thread_send_message(Done, finished(Id))) 226 | Options 227 ]), 228 N2 is N - 1, 229 create_workers(N2, Queue, Done, Ids, Options). 230create_workers(_, _, _, [], _).
237worker(Queue, Done) :-
238 thread_get_message(Queue, Message),
239 debug(concurrent, 'Worker: received ~p', [Message]),
240 ( Message = goal(Id, Goal, Vars)
241 -> (
242 -> thread_send_message(Done, done(Id, Vars)),
243 worker(Queue, Done)
244 )
245 ; true
246 ).
true
, signal all workers to make them stop prematurely. If
result is true we assume all workers have been instructed to
stop or have stopped themselves.256concur_cleanup(Result, Workers, Queues) :- 257 !, 258 ( Result == true 259 -> true 260 ; kill_workers(Workers) 261 ), 262 join_all(Workers), 263 maplist(message_queue_destroy, Queues). 264 265kill_workers([]). 266kill_workers([Id|T]) :- 267 debug(concurrent, 'Signalling ~w', [Id]), 268 catch(thread_signal(Id, abort), _, true), 269 kill_workers(T). 270 271join_all([]). 272join_all([Id|T]) :- 273 thread_join(Id, _), 274 join_all(T). 275 276 277 /******************************* 278 * MAPLIST * 279 *******************************/
cpu_count
. If this flag is absent or 1 or List has
less than two elements, this predicate simply calls the
corresponding maplist/N version.
Note that the the overhead of this predicate is considerable and therefore Goal must be fairly expensive before one reaches a speedup.
297concurrent_maplist(Goal, List) :- 298 workers(List, WorkerCount), 299 !, 300 maplist(ml_goal(Goal), List, Goals), 301 concurrent(WorkerCount, Goals, []). 302concurrent_maplist(Goal, List) :- 303 maplist(Goal, List). 304 305ml_goal(Goal, Elem, call(Goal, Elem)). 306 307concurrent_maplist(Goal, List1, List2) :- 308 same_length(List1, List2), 309 workers(List1, WorkerCount), 310 !, 311 maplist(ml_goal(Goal), List1, List2, Goals), 312 concurrent(WorkerCount, Goals, []). 313concurrent_maplist(Goal, List1, List2) :- 314 maplist(Goal, List1, List2). 315 316ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)). 317 318concurrent_maplist(Goal, List1, List2, List3) :- 319 same_length(List1, List2, List3), 320 workers(List1, WorkerCount), 321 !, 322 maplist(ml_goal(Goal), List1, List2, List3, Goals), 323 concurrent(WorkerCount, Goals, []). 324concurrent_maplist(Goal, List1, List2, List3) :- 325 maplist(Goal, List1, List2, List3). 326 327ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)). 328 329workers(List, Count) :- 330 current_prolog_flag(cpu_count, Cores), 331 Cores > 1, 332 length(List, Len), 333 Count is min(Cores,Len), 334 Count > 1, 335 !. 336 337same_length([], [], []). 338same_length([_|T1], [_|T2], [_|T3]) :- 339 same_length(T1, T2, T3). 340 341 342 /******************************* 343 * FIRST * 344 *******************************/
For example, if it is unclear whether it is better to search a graph breadth-first or depth-first we can use:
search_graph(Grap, Path) :- first_solution(Path, [ breadth_first(Graph, Path), depth_first(Graph, Path) ], []).
Options include thread stack-sizes passed to thread_create, as
well as the options on_fail
and on_error
that specify what
to do if a solver fails or triggers an error. By default
execution of all solvers is terminated and the result is
returned. Sometimes one may wish to continue. One such scenario
is if one of the solvers may run out of resources or one of the
solvers is known to be incomplete.
stop
(default), terminate all threads and stop with
the failure. If continue
, keep waiting.384first_solution(X, M:List, Options) :- 385 message_queue_create(Done), 386 thread_options(Options, ThreadOptions, RestOptions), 387 length(List, JobCount), 388 create_solvers(List, M, X, Done, Solvers, ThreadOptions), 389 wait_for_one(JobCount, Done, Result, RestOptions), 390 concur_cleanup(kill, Solvers, [Done]), 391 ( Result = done(_, Var) 392 -> X = Var 393 ; Result = error(_, Error) 394 -> throw(Error) 395 ). 396 397create_solvers([], _, _, _, [], _). 398create_solvers([H|T], M, X, Done, [Id|IDs], Options) :- 399 thread_create(solve(M:H, X, Done), Id, Options), 400 create_solvers(T, M, X, Done, IDs, Options). 401 402solve(Goal, Var, Queue) :- 403 thread_self(Me), 404 ( catch(, E, true) 405 -> ( var(E) 406 -> thread_send_message(Queue, done(Me, Var)) 407 ; thread_send_message(Queue, error(Me, E)) 408 ) 409 ; thread_send_message(Queue, failed(Me)) 410 ). 411 412wait_for_one(0, _, failed, _) :- !. 413wait_for_one(JobCount, Queue, Result, Options) :- 414 thread_get_message(Queue, Msg), 415 LeftCount is JobCount - 1, 416 ( Msg = done(_, _) 417 -> Result = Msg 418 ; Msg = failed(_) 419 -> ( option(on_fail(stop), Options, stop) 420 -> Result = Msg 421 ; wait_for_one(LeftCount, Queue, Result, Options) 422 ) 423 ; Msg = error(_, _) 424 -> ( option(on_error(stop), Options, stop) 425 -> Result = Msg 426 ; wait_for_one(LeftCount, Queue, Result, Options) 427 ) 428 ).
thread(-size)
options and other
options.436thread_options([], [], []). 437thread_options([H|T], [H|Th], O) :- 438 thread_option(H), 439 !, 440 thread_options(T, Th, O). 441thread_options([H|T], Th, [H|O]) :- 442 thread_options(T, Th, O). 443 444thread_option(local(_)). 445thread_option(global(_)). 446thread_option(trail(_)). 447thread_option(argument(_)). 448thread_option(stack(_))
High level thread primitives
This module defines simple to use predicates for running goals concurrently. Where the core multi-threaded API is targeted at communicating long-living threads, the predicates here are defined to run goals concurrently without having to deal with thread creation and maintenance explicitely.
Note that these predicates run goals concurrently and therefore these goals need to be thread-safe. As the predicates in this module also abort branches of the computation that are no longer needed, predicates that have side-effect must act properly. In a nutshell, this has the following consequences: