View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2008-2016, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(thread_pool,
   37          [ thread_pool_create/3,       % +Pool, +Size, +Options
   38            thread_pool_destroy/1,      % +Pool
   39            thread_create_in_pool/4,    % +Pool, :Goal, -Id, +Options
   40
   41            current_thread_pool/1,      % ?Pool
   42            thread_pool_property/2      % ?Pool, ?Property
   43          ]).   44:- use_module(library(error)).   45:- use_module(library(lists)).   46:- use_module(library(option)).   47:- use_module(library(rbtrees)).   48:- use_module(library(debug)).

Resource bounded thread management

The module library(thread_pool) manages threads in pools. A pool defines properties of its member threads and the maximum number of threads that can coexist in the pool. The call thread_create_in_pool/4 allocates a thread in the pool, just like thread_create/3. If the pool is fully allocated it can be asked to wait or raise an error.

The library has been designed to deal with server applications that receive a variety of requests, such as HTTP servers. Simply starting a thread for each request is a bit too simple minded for such servers:

Using this library, one can define a pool for each set of tasks with comparable characteristics and create threads in this pool. Unlike the worker-pool model, threads are not started immediately. Depending on the design, both approaches can be attractive.

The library is implemented by means of a manager thread with the fixed thread id __thread_pool_manager. All state is maintained in this manager thread, which receives and processes requests to create and destroy pools, create threads in a pool and handle messages from terminated threads. Thread pools are not saved in a saved state and must therefore be recreated using the initialization/1 directive or otherwise during startup of the application.

See also
- http_handler/3 and http_spawn/2. */
   85:- meta_predicate
   86    thread_create_in_pool(+, 0, -, :).   87:- predicate_options(thread_create_in_pool/4, 4,
   88                     [ wait(boolean),
   89                       pass_to(system:thread_create/3, 3)
   90                     ]).   91
   92:- multifile
   93    create_pool/1.
 thread_pool_create(+Pool, +Size, +Options) is det
Create a pool of threads. A pool of threads is a declaration for creating threads with shared properties (stack sizes) and a limited number of threads. Threads are created using thread_create_in_pool/4. If all threads in the pool are in use, the behaviour depends on the wait option of thread_create_in_pool/4 and the backlog option described below. Options are passed to thread_create/3, except for
backlog(+MaxBackLog)
Maximum number of requests that can be suspended. Default is infinite. Otherwise it must be a non-negative integer. Using backlog(0) will never delay thread creation for this pool.

The pooling mechanism does not interact with the detached state of a thread. Threads can be created both detached and normal and must be joined using thread_join/2 if they are not detached.

  116thread_pool_create(Name, Size, Options) :-
  117    must_be(list, Options),
  118    pool_manager(Manager),
  119    thread_self(Me),
  120    thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
  121    wait_reply.
 thread_pool_destroy(+Name) is det
Destroy the thread pool named Name.
Errors
- existence_error(thread_pool, Name).
  129thread_pool_destroy(Name) :-
  130    pool_manager(Manager),
  131    thread_self(Me),
  132    thread_send_message(Manager, destroy_pool(Name, Me)),
  133    wait_reply.
 current_thread_pool(?Name) is nondet
True if Name refers to a defined thread pool.
  140current_thread_pool(Name) :-
  141    pool_manager(Manager),
  142    thread_self(Me),
  143    thread_send_message(Manager, current_pools(Me)),
  144    wait_reply(Pools),
  145    (   atom(Name)
  146    ->  memberchk(Name, Pools)
  147    ;   member(Name, Pools)
  148    ).
 thread_pool_property(?Name, ?Property) is nondet
True if Property is a property of thread pool Name. Defined properties are:
options(Options)
Thread creation options for this pool
free(Size)
Number of free slots on this pool
size(Size)
Total number of slots on this pool
members(ListOfIDs)
ListOfIDs is the list or threads running in this pool
running(Running)
Number of running threads in this pool
backlog(Size)
Number of delayed thread creations on this pool
  168thread_pool_property(Name, Property) :-
  169    current_thread_pool(Name),
  170    pool_manager(Manager),
  171    thread_self(Me),
  172    thread_send_message(Manager, pool_properties(Me, Name, Property)),
  173    wait_reply(Props),
  174    (   nonvar(Property)
  175    ->  memberchk(Property, Props)
  176    ;   member(Property, Props)
  177    ).
 thread_create_in_pool(+Pool, :Goal, -Id, +Options) is det
Create a thread in Pool. Options overrule default thread creation options associated to the pool. In addition, the following option is defined:
wait(+Boolean)
If true (default) and the pool is full, wait until a member of the pool completes. If false, throw a resource_error.
Errors
- resource_error(threads_in_pool(Pool)) is raised if wait is false or the backlog limit has been reached.
- existence_error(thread_pool, Pool) if Pool does not exist.
  196thread_create_in_pool(Pool, Goal, Id, QOptions) :-
  197    meta_options(is_meta, QOptions, Options),
  198    catch(thread_create_in_pool_(Pool, Goal, Id, Options),
  199          Error, true),
  200    (   var(Error)
  201    ->  true
  202    ;   Error = error(existence_error(thread_pool, Pool), _),
  203        create_pool_lazily(Pool)
  204    ->  thread_create_in_pool_(Pool, Goal, Id, Options)
  205    ;   throw(Error)
  206    ).
  207
  208thread_create_in_pool_(Pool, Goal, Id, Options) :-
  209    select_option(wait(Wait), Options, ThreadOptions, true),
  210    pool_manager(Manager),
  211    thread_self(Me),
  212    thread_send_message(Manager,
  213                        create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
  214    wait_reply(Id).
  215
  216is_meta(at_exit).
 create_pool_lazily(+Pool) is semidet
Call the hook create_pool/1 to create the pool lazily.
  223create_pool_lazily(Pool) :-
  224    with_mutex(Pool,
  225               ( mutex_destroy(Pool),
  226                 create_pool_sync(Pool)
  227               )).
  228
  229create_pool_sync(Pool) :-
  230    current_thread_pool(Pool),
  231    !.
  232create_pool_sync(Pool) :-
  233    create_pool(Pool).
  234
  235
  236                 /*******************************
  237                 *        START MANAGER         *
  238                 *******************************/
 pool_manager(-ThreadID) is det
ThreadID is the thread (alias) identifier of the manager. Starts the manager if it is not running.
  245pool_manager(TID) :-
  246    TID = '__thread_pool_manager',
  247    (   thread_running(TID)
  248    ->  true
  249    ;   with_mutex('__thread_pool', create_pool_manager(TID))
  250    ).
  251
  252thread_running(Thread) :-
  253    catch(thread_property(Thread, status(Status)),
  254          E, true),
  255    (   var(E)
  256    ->  (   Status == running
  257        ->  true
  258        ;   thread_join(Thread, _),
  259            print_message(warning, thread_pool(manager_died(Status))),
  260            fail
  261        )
  262    ;   E = error(existence_error(thread, Thread), _)
  263    ->  fail
  264    ;   throw(E)
  265    ).
  266
  267create_pool_manager(Thread) :-
  268    thread_running(Thread),
  269    !.
  270create_pool_manager(Thread) :-
  271    thread_create(pool_manager_main, _,
  272                  [ alias(Thread),
  273                    inherit_from(main)
  274                  ]).
  275
  276
  277pool_manager_main :-
  278    rb_new(State0),
  279    manage_thread_pool(State0).
  280
  281
  282                 /*******************************
  283                 *        MANAGER LOGIC         *
  284                 *******************************/
 manage_thread_pool(+State)
  288manage_thread_pool(State0) :-
  289    thread_get_message(Message),
  290    (   update_thread_pool(Message, State0, State)
  291    ->  debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
  292        manage_thread_pool(State)
  293    ;   format(user_error, 'Update failed: ~p~n', [Message])
  294    ).
  295
  296
  297update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
  298    !,
  299    (   rb_insert_new(State0,
  300                      Name, tpool(Options, Size, Size, WP, WP, []),
  301                      State)
  302    ->  thread_send_message(For, thread_pool(true))
  303    ;   reply_error(For, permission_error(create, thread_pool, Name)),
  304        State = State0
  305    ).
  306update_thread_pool(destroy_pool(Name, For), State0, State) :-
  307    !,
  308    (   rb_delete(State0, Name, State)
  309    ->  thread_send_message(For, thread_pool(true))
  310    ;   reply_error(For, existence_error(thread_pool, Name)),
  311        State = State0
  312    ).
  313update_thread_pool(current_pools(For), State, State) :-
  314    !,
  315    rb_keys(State, Keys),
  316    debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
  317    reply(For, Keys).
  318update_thread_pool(pool_properties(For, Name, P), State, State) :-
  319    !,
  320    (   rb_lookup(Name, Pool, State)
  321    ->  findall(P, pool_property(P, Pool), List),
  322        reply(For, List)
  323    ;   reply_error(For, existence_error(thread_pool, Name))
  324    ).
  325update_thread_pool(Message, State0, State) :-
  326    arg(1, Message, Name),
  327    (   rb_lookup(Name, Pool0, State0)
  328    ->  update_pool(Message, Pool0, Pool),
  329        rb_update(State0, Name, Pool, State)
  330    ;   State = State0,
  331        (   Message = create(Name, _, For, _, _, _)
  332        ->  reply_error(For, existence_error(thread_pool, Name))
  333        ;   true
  334        )
  335    ).
  336
  337pool_property(options(Options),
  338              tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
  339pool_property(backlog(Size),
  340              tpool(_, _Free, _Size, WP, WPT, _Members)) :-
  341    diff_list_length(WP, WPT, Size).
  342pool_property(free(Free),
  343              tpool(_, Free, _Size, _, _, _)).
  344pool_property(size(Size),
  345              tpool(_, _Free, Size, _, _, _)).
  346pool_property(running(Count),
  347              tpool(_, Free, Size, _, _, _)) :-
  348    Count is Size - Free.
  349pool_property(members(IDList),
  350              tpool(_, _, _, _, _, IDList)).
  351
  352diff_list_length(List, Tail, Size) :-
  353    '$skip_list'(Length, List, Rest),
  354    (   Rest == Tail
  355    ->  Size = Length
  356    ;   type_error(difference_list, List/Tail)
  357    ).
 update_pool(+Message, +Pool0, -Pool) is det
Deal with create requests and completion messages on a given pool. There are two messages:
create(PoolName, Goal, ForThread, Wait, Id, Options)
Create a new thread on behalf of ForThread. There are two cases:
  • Free slots: create the thread
  • No free slots: error or add to waiting
exitted(PoolName, Thread)
A thread completed. If there is a request waiting, create a new one.
  374update_pool(create(Name, Goal, For, _, Id, MyOptions),
  375            tpool(Options, Free0, Size, WP, WPT, Members0),
  376            tpool(Options, Free, Size, WP, WPT, Members)) :-
  377    succ(Free, Free0),
  378    !,
  379    merge_options(MyOptions, Options, ThreadOptions),
  380    select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
  381    catch(thread_create(Goal, Id,
  382                        [ at_exit(worker_exitted(Name, Id, AtExit))
  383                        | ThreadOptions1
  384                        ]),
  385          E, true),
  386    (   var(E)
  387    ->  Members = [Id|Members0],
  388        reply(For, Id)
  389    ;   reply_error(For, E),
  390        Members = Members0
  391    ).
  392update_pool(Create,
  393            tpool(Options, 0, Size, WP, WPT0, Members),
  394            tpool(Options, 0, Size, WP, WPT, Members)) :-
  395    Create = create(Name, _Goal, For, Wait, _, _Options),
  396    !,
  397    option(backlog(BackLog), Options, infinite),
  398    (   can_delay(Wait, BackLog, WP, WPT0)
  399    ->  WPT0 = [Create|WPT],
  400        debug(thread_pool, 'Delaying ~p', [Create])
  401    ;   WPT = WPT0,
  402        reply_error(For, resource_error(threads_in_pool(Name)))
  403    ).
  404update_pool(exitted(_Name, Id),
  405            tpool(Options, Free0, Size, WP0, WPT, Members0),
  406            Pool) :-
  407    succ(Free0, Free),
  408    delete(Members0, Id, Members1),
  409    Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
  410    (   WP0 == WPT
  411    ->  WP = WP0,
  412        Pool = Pool1
  413    ;   WP0 = [Waiting|WP],
  414        debug(thread_pool, 'Start delayed ~p', [Waiting]),
  415        update_pool(Waiting, Pool1, Pool)
  416    ).
  417
  418
  419can_delay(true, infinite, _, _) :- !.
  420can_delay(true, BackLog, WP, WPT) :-
  421    diff_list_length(WP, WPT, Size),
  422    BackLog > Size.
 worker_exitted(+PoolName, +WorkerId, :AtExit)
It is possible that '__thread_pool_manager' no longer exists while closing down the process because the manager was killed before the worker.
To be done
- Find a way to discover that we are terminating Prolog.
  432worker_exitted(Name, Id, AtExit) :-
  433    catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
  434          _, true),
  435    call(AtExit).
  436
  437
  438                 /*******************************
  439                 *             UTIL             *
  440                 *******************************/
  441
  442reply(To, Term) :-
  443    thread_send_message(To, thread_pool(true(Term))).
  444
  445reply_error(To, Error) :-
  446    thread_send_message(To, thread_pool(error(Error, _))).
  447
  448wait_reply :-
  449    thread_get_message(thread_pool(Result)),
  450    (   Result == true
  451    ->  true
  452    ;   Result == fail
  453    ->  fail
  454    ;   throw(Result)
  455    ).
  456
  457wait_reply(Value) :-
  458    thread_get_message(thread_pool(Reply)),
  459    (   Reply = true(Value0)
  460    ->  Value = Value0
  461    ;   Reply == fail
  462    ->  fail
  463    ;   throw(Reply)
  464    ).
  465
  466
  467                 /*******************************
  468                 *             HOOKS            *
  469                 *******************************/
 create_pool(+PoolName) is semidet
Hook to create a thread pool lazily. The hook is called if thread_create_in_pool/4 discovers that the thread pool does not exist. If the hook succeeds, thread_create_in_pool/4 retries creating the thread. For example, we can use the following declaration to create threads in the pool media, which holds a maximum of 20 threads.
:- multifile thread_pool:create_pool/1.

thread_pool:create_pool(media) :-
    thread_pool_create(media, 20, []).
  487                 /*******************************
  488                 *            MESSAGES          *
  489                 *******************************/
  490:- multifile
  491    prolog:message/3.  492
  493prolog:message(thread_pool(Message)) -->
  494    message(Message).
  495
  496message(manager_died(Status)) -->
  497    [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ]