/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2008-2016, University of Amsterdam
                              VU University Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(thread_pool,
          [ thread_pool_create/3,       % +Pool, +Size, +Options
            thread_pool_destroy/1,      % +Pool
            thread_create_in_pool/4,    % +Pool, :Goal, -Id, +Options

            current_thread_pool/1,      % ?Pool
            thread_pool_property/2      % ?Pool, ?Property
          ]).
:- use_module(library(error)).
:- use_module(library(lists)).
:- use_module(library(option)).
:- use_module(library(rbtrees)).
:- use_module(library(debug)).
:- meta_predicate
    thread_create_in_pool(+, 0, -, :).
:- predicate_options(thread_create_in_pool/4, 4,
                     [ wait(boolean),
                       pass_to(system:thread_create/3, 3)
                     ]).

:- multifile
    create_pool/1.
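thread_pool_create(+Pool, +Size, +Options): create a pool that holds at most Size threads. If all threads in the pool are in use, the behaviour depends on the wait option of thread_create_in_pool/4 and the backlog option described below. Options are passed to thread_create/3, except for

    backlog(+MaxBackLog)
    Maximum number of requests that can be suspended. The default is infinite. Otherwise it must be a non-negative integer. Using backlog(0) will never delay thread creation for this pool.

The pooling mechanism does not interact with the detached state of a thread. Threads can be created both detached and normal, and must be joined using thread_join/2 if they are not detached.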
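As a minimal sketch of the options described above, the following creates a bounded pool and inspects it. The pool name `compute`, the limits and the predicate `demo_create_pool/0` are illustrative assumptions, not part of the library.

```prolog
:- use_module(library(thread_pool)).

% Hypothetical pool `compute`: at most 4 concurrent threads and at most
% 10 suspended creation requests once the pool is full.
demo_create_pool :-
    thread_pool_create(compute, 4, [backlog(10)]),
    thread_pool_property(compute, size(Size)),   % configured maximum
    thread_pool_property(compute, free(Free)),   % slots not in use
    format("size=~w free=~w~n", [Size, Free]),
    thread_pool_destroy(compute).
```

No thread has been started at this point, so the number of free slots is expected to equal the size of the pool.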
thread_pool_create(Name, Size, Options) :-
    must_be(list, Options),
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
    wait_reply.

thread_pool_destroy(Name) :-
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager, destroy_pool(Name, Me)),
    wait_reply.

current_thread_pool(Name) :-
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager, current_pools(Me)),
    wait_reply(Pools),
    (   atom(Name)
    ->  memberchk(Name, Pools)
    ;   member(Name, Pools)
    ).

thread_pool_property(Name, Property) :-
    current_thread_pool(Name),
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager, pool_properties(Me, Name, Property)),
    wait_reply(Props),
    (   nonvar(Property)
    ->  memberchk(Property, Props)
    ;   member(Property, Props)
    ).

thread_create_in_pool/4 passes its options to thread_create/3, except for

    wait(+Boolean)
    If true (default) and the pool is full, wait until a member of the pool completes. If false, throw a resource_error.

thread_create_in_pool(Pool, Goal, Id, QOptions) :-
    meta_options(is_meta, QOptions, Options),
    catch(thread_create_in_pool_(Pool, Goal, Id, Options),
          Error, true),
    (   var(Error)
    ->  true
    ;   Error = error(existence_error(thread_pool, Pool), _),
        create_pool_lazily(Pool)
    ->  thread_create_in_pool_(Pool, Goal, Id, Options)
    ;   throw(Error)
    ).

thread_create_in_pool_(Pool, Goal, Id, Options) :-
    select_option(wait(Wait), Options, ThreadOptions, true),
    pool_manager(Manager),
    thread_self(Me),
    thread_send_message(Manager,
                        create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
    wait_reply(Id).

is_meta(at_exit).

create_pool_lazily(Pool) :-
    with_mutex(Pool,
               ( mutex_destroy(Pool),
                 create_pool_sync(Pool)
               )).

create_pool_sync(Pool) :-
    current_thread_pool(Pool),
    !.
create_pool_sync(Pool) :-
    create_pool(Pool).


                 /*******************************
                 *        START MANAGER         *
                 *******************************/

pool_manager(TID) :-
    TID = '__thread_pool_manager',
    (   thread_running(TID)
    ->  true
    ;   with_mutex('__thread_pool', create_pool_manager(TID))
    ).

thread_running(Thread) :-
    catch(thread_property(Thread, status(Status)),
          E, true),
    (   var(E)
    ->  (   Status == running
        ->  true
        ;   thread_join(Thread, _),
            print_message(warning, thread_pool(manager_died(Status))),
            fail
        )
    ;   E = error(existence_error(thread, Thread), _)
    ->  fail
    ;   throw(E)
    ).

create_pool_manager(Thread) :-
    thread_running(Thread),
    !.
create_pool_manager(Thread) :-
    thread_create(pool_manager_main, _,
                  [ alias(Thread),
                    inherit_from(main)
                  ]).


pool_manager_main :-
    rb_new(State0),
    manage_thread_pool(State0).


                 /*******************************
                 *        MANAGER LOGIC         *
                 *******************************/

manage_thread_pool(State0) :-
    thread_get_message(Message),
    (   update_thread_pool(Message, State0, State)
    ->  debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
        manage_thread_pool(State)
    ;   format(user_error, 'Update failed: ~p~n', [Message])
    ).


update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
    !,
    (   rb_insert_new(State0,
                      Name, tpool(Options, Size, Size, WP, WP, []),
                      State)
    ->  thread_send_message(For, thread_pool(true))
    ;   reply_error(For, permission_error(create, thread_pool, Name)),
        State = State0
    ).
update_thread_pool(destroy_pool(Name, For), State0, State) :-
    !,
    (   rb_delete(State0, Name, State)
    ->  thread_send_message(For, thread_pool(true))
    ;   reply_error(For, existence_error(thread_pool, Name)),
        State = State0
    ).
update_thread_pool(current_pools(For), State, State) :-
    !,
    rb_keys(State, Keys),
    debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
    reply(For, Keys).
update_thread_pool(pool_properties(For, Name, P), State, State) :-
    !,
    (   rb_lookup(Name, Pool, State)
    ->  findall(P, pool_property(P, Pool), List),
        reply(For, List)
    ;   reply_error(For, existence_error(thread_pool, Name))
    ).
update_thread_pool(Message, State0, State) :-
    arg(1, Message, Name),
    (   rb_lookup(Name, Pool0, State0)
    ->  update_pool(Message, Pool0, Pool),
        rb_update(State0, Name, Pool, State)
    ;   State = State0,
        (   Message = create(Name, _, For, _, _, _)
        ->  reply_error(For, existence_error(thread_pool, Name))
        ;   true
        )
    ).

pool_property(options(Options),
              tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
pool_property(backlog(Size),
              tpool(_, _Free, _Size, WP, WPT, _Members)) :-
    diff_list_length(WP, WPT, Size).
pool_property(free(Free),
              tpool(_, Free, _Size, _, _, _)).
pool_property(size(Size),
              tpool(_, _Free, Size, _, _, _)).
pool_property(running(Count),
              tpool(_, Free, Size, _, _, _)) :-
    Count is Size - Free.
pool_property(members(IDList),
              tpool(_, _, _, _, _, IDList)).

diff_list_length(List, Tail, Size) :-
    '$skip_list'(Length, List, Rest),
    (   Rest == Tail
    ->  Size = Length
    ;   type_error(difference_list, List/Tail)
    ).

update_pool(create(Name, Goal, For, _, Id, MyOptions),
            tpool(Options, Free0, Size, WP, WPT, Members0),
            tpool(Options, Free, Size, WP, WPT, Members)) :-
    succ(Free, Free0),
    !,
    merge_options(MyOptions, Options, ThreadOptions),
    select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
    catch(thread_create(Goal, Id,
                        [ at_exit(worker_exitted(Name, Id, AtExit))
                        | ThreadOptions1
                        ]),
          E, true),
    (   var(E)
    ->  Members = [Id|Members0],
        reply(For, Id)
    ;   reply_error(For, E),
        Members = Members0
    ).
update_pool(Create,
            tpool(Options, 0, Size, WP, WPT0, Members),
            tpool(Options, 0, Size, WP, WPT, Members)) :-
    Create = create(Name, _Goal, For, Wait, _, _Options),
    !,
    option(backlog(BackLog), Options, infinite),
    (   can_delay(Wait, BackLog, WP, WPT0)
    ->  WPT0 = [Create|WPT],
        debug(thread_pool, 'Delaying ~p', [Create])
    ;   WPT = WPT0,
        reply_error(For, resource_error(threads_in_pool(Name)))
    ).
update_pool(exitted(_Name, Id),
            tpool(Options, Free0, Size, WP0, WPT, Members0),
            Pool) :-
    succ(Free0, Free),
    delete(Members0, Id, Members1),
    Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
    (   WP0 == WPT
    ->  WP = WP0,
        Pool = Pool1
    ;   WP0 = [Waiting|WP],
        debug(thread_pool, 'Start delayed ~p', [Waiting]),
        update_pool(Waiting, Pool1, Pool)
    ).


can_delay(true, infinite, _, _) :- !.
can_delay(true, BackLog, WP, WPT) :-
    diff_list_length(WP, WPT, Size),
    BackLog > Size.

worker_exitted(Name, Id, AtExit) :-
    catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
          _, true),
    call(AtExit).


                 /*******************************
                 *             UTIL             *
                 *******************************/

reply(To, Term) :-
    thread_send_message(To, thread_pool(true(Term))).

reply_error(To, Error) :-
    thread_send_message(To, thread_pool(error(Error, _))).

wait_reply :-
    thread_get_message(thread_pool(Result)),
    (   Result == true
    ->  true
    ;   Result == fail
    ->  fail
    ;   throw(Result)
    ).

wait_reply(Value) :-
    thread_get_message(thread_pool(Reply)),
    (   Reply = true(Value0)
    ->  Value = Value0
    ;   Reply == fail
    ->  fail
    ;   throw(Reply)
    ).


                 /*******************************
                 *             HOOKS            *
                 *******************************/

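create_pool(+PoolName): hook to create a thread pool lazily. The hook is called if thread_create_in_pool/4 discovers that the thread pool does not exist. If the hook succeeds, thread_create_in_pool/4 retries creating the thread. For example, the following declaration creates the pool media, which holds a maximum of 20 threads.

    :- multifile thread_pool:create_pool/1.

    thread_pool:create_pool(media) :-
        thread_pool_create(media, 20, []).
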
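To illustrate how the hook is triggered, here is a hedged sketch in which the first thread_create_in_pool/4 call on a pool that does not exist yet causes the pool to be created. The pool name `lazy_media` and the predicate `demo_lazy/1` are hypothetical names chosen for this example.

```prolog
:- use_module(library(thread_pool)).
:- multifile thread_pool:create_pool/1.

% Hook clause for the hypothetical pool `lazy_media`.
thread_pool:create_pool(lazy_media) :-
    thread_pool_create(lazy_media, 20, []).

% The pool is not created explicitly. The first request fails inside the
% library with an existence error, the hook above creates the pool, and
% the request is retried.
demo_lazy(Status) :-
    thread_create_in_pool(lazy_media, true, Id, []),
    thread_join(Id, Status),
    current_thread_pool(lazy_media),
    thread_pool_destroy(lazy_media).
```

The worker runs the goal `true`, so thread_join/2 is expected to report the status `true`.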
                 /*******************************
                 *            MESSAGES          *
                 *******************************/
:- multifile
    prolog:message/3.

prolog:message(thread_pool(Message)) -->
    message(Message).

message(manager_died(Status)) -->
    [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ].

Resource bounded thread management
The module library(thread_pool) manages threads in pools. A pool defines properties of its member threads and the maximum number of threads that can coexist in the pool. The call thread_create_in_pool/4 allocates a thread in the pool, just like thread_create/3. If the pool is fully allocated, it can be asked to wait or raise an error.

The library has been designed to deal with server applications that receive a variety of requests, such as HTTP servers. Simply starting a thread for each request is too simple-minded for such servers.
Using this library, one can define a pool for each set of tasks with comparable characteristics and create threads in this pool. Unlike the worker-pool model, threads are not started immediately. Depending on the design, both approaches can be attractive.
The library is implemented by means of a manager thread with the fixed thread id __thread_pool_manager. All state is maintained in this manager thread, which receives and processes requests to create and destroy pools, create threads in a pool and handle messages from terminated threads. Thread pools are not saved in a saved state and must therefore be recreated using the initialization/1 directive or otherwise during startup of the application.
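The choice between waiting and raising an error on a fully allocated pool can be sketched as follows. The pool name `demo`, the message `done` and the predicate `demo_full_pool/1` are assumptions made for this example. The two workers block on their message queue so that the pool is reliably full when the third request arrives.

```prolog
:- use_module(library(thread_pool)).

demo_full_pool(Outcome) :-
    thread_pool_create(demo, 2, []),
    % Occupy both slots with workers that wait for a `done` message.
    thread_create_in_pool(demo, thread_get_message(done), T1, []),
    thread_create_in_pool(demo, thread_get_message(done), T2, []),
    % A third request with wait(false) should raise a resource error
    % instead of being queued.
    catch(( thread_create_in_pool(demo, true, T3, [wait(false)]),
            thread_join(T3, _),
            Outcome = created
          ),
          error(resource_error(threads_in_pool(demo)), _),
          Outcome = pool_full),
    % Release and join the workers, then remove the pool.
    forall(member(T, [T1, T2]),
           ( thread_send_message(T, done),
             thread_join(T, _)
           )),
    thread_pool_destroy(demo).
```

With the default wait(true), the third request would instead be suspended in the manager until one of the workers exits.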