View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2008-2016, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(thread_pool,
   37          [ thread_pool_create/3,       % +Pool, +Size, +Options
   38            thread_pool_destroy/1,      % +Pool
   39            thread_create_in_pool/4,    % +Pool, :Goal, -Id, +Options
   40
   41            current_thread_pool/1,      % ?Pool
   42            thread_pool_property/2      % ?Pool, ?Property
   43          ]).   44:- use_module(library(error)).   45:- use_module(library(lists)).   46:- use_module(library(option)).   47:- use_module(library(rbtrees)).   48:- use_module(library(debug)).   49
   50
   51/** <module> Resource bounded thread management
   52
   53The module library(thread_pool) manages threads in pools. A pool defines
   54properties of its member threads and the  maximum number of threads that
   55can coexist in the pool. The   call  thread_create_in_pool/4 allocates a
   56thread in the pool, just like  thread_create/3.   If  the  pool is fully
   57allocated it can be asked to wait or raise an error.
   58
   59The library has been designed  to   deal  with  server applications that
   60receive a variety of requests, such as   HTTP servers. Simply starting a
   61thread for each request is a bit too simple minded for such servers:
   62
   63    * Creating many CPU intensive threads often leads to a slow-down
   64    rather than a speedup.
   65    * Creating many memory intensive threads may exhaust resources
   66    * Tasks that require little CPU and memory but take long waiting
   67    for external resources can run many threads.
   68
   69Using this library, one can define a  pool   for  each set of tasks with
   70comparable characteristics and create threads in   this pool. Unlike the
   71worker-pool model, threads are not started immediately. Depending on the
   72design, both approaches can be attractive.
   73
   74The library is implemented by means of   a manager thread with the fixed
   75thread id =|__thread_pool_manager|=. All  state   is  maintained in this
   76manager thread, which receives and  processes   requests  to  create and
   77destroy pools, create  threads  in  a   pool  and  handle  messages from
   78terminated threads. Thread pools are _not_ saved   in  a saved state and
   79must therefore be recreated  using   the  initialization/1  directive or
   80otherwise during startup of the application.
   81
   82@see http_handler/3 and http_spawn/2.
   83*/
   84
   85:- meta_predicate
   86    thread_create_in_pool(+, 0, -, :).   87:- predicate_options(thread_create_in_pool/4, 4,
   88                     [ wait(boolean),
   89                       pass_to(system:thread_create/3, 3)
   90                     ]).   91
   92:- multifile
   93    create_pool/1.   94
   95%!  thread_pool_create(+Pool, +Size, +Options) is det.
   96%
   97%   Create a pool of threads. A pool of threads is a declaration for
   98%   creating threads with shared  properties   (stack  sizes)  and a
   99%   limited  number  of  threads.   Threads    are   created   using
  100%   thread_create_in_pool/4. If all threads in the  pool are in use,
  101%   the   behaviour   depends   on    the     =wait=    option    of
  102%   thread_create_in_pool/4  and  the  =backlog=   option  described
  103%   below.  Options are passed to thread_create/3, except for
  104%
  105%       * backlog(+MaxBackLog)
  106%       Maximum number of requests that can be suspended.  Default
  107%       is =infinite=.  Otherwise it must be a non-negative integer.
  108%       Using backlog(0) will never delay thread creation for this
  109%       pool.
  110%
  111%   The pooling mechanism does _not_   interact  with the =detached=
  112%   state of a thread. Threads can   be  created both =detached= and
  113%   normal and must be joined using   thread_join/2  if they are not
  114%   detached.
  115
  116thread_pool_create(Name, Size, Options) :-
  117    must_be(list, Options),
  118    pool_manager(Manager),
  119    thread_self(Me),
  120    thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
  121    wait_reply.
  122
  123%!  thread_pool_destroy(+Name) is det.
  124%
  125%   Destroy the thread pool named Name.
  126%
  127%   @error  existence_error(thread_pool, Name).
  128
  129thread_pool_destroy(Name) :-
  130    pool_manager(Manager),
  131    thread_self(Me),
  132    thread_send_message(Manager, destroy_pool(Name, Me)),
  133    wait_reply.
  134
  135
  136%!  current_thread_pool(?Name) is nondet.
  137%
  138%   True if Name refers to a defined thread pool.
  139
  140current_thread_pool(Name) :-
  141    pool_manager(Manager),
  142    thread_self(Me),
  143    thread_send_message(Manager, current_pools(Me)),
  144    wait_reply(Pools),
  145    (   atom(Name)
  146    ->  memberchk(Name, Pools)
  147    ;   member(Name, Pools)
  148    ).
  149
  150%!  thread_pool_property(?Name, ?Property) is nondet.
  151%
  152%   True if Property is a property of thread pool Name. Defined
  153%   properties are:
  154%
  155%       * options(Options)
  156%       Thread creation options for this pool
  157%       * free(Size)
  158%       Number of free slots on this pool
  159%       * size(Size)
  160%       Total number of slots on this pool
  161%       * members(ListOfIDs)
  162%       ListOfIDs is the list or threads running in this pool
  163%       * running(Running)
  164%       Number of running threads in this pool
  165%       * backlog(Size)
  166%       Number of delayed thread creations on this pool
  167
  168thread_pool_property(Name, Property) :-
  169    current_thread_pool(Name),
  170    pool_manager(Manager),
  171    thread_self(Me),
  172    thread_send_message(Manager, pool_properties(Me, Name, Property)),
  173    wait_reply(Props),
  174    (   nonvar(Property)
  175    ->  memberchk(Property, Props)
  176    ;   member(Property, Props)
  177    ).
  178
  179
  180%!  thread_create_in_pool(+Pool, :Goal, -Id, +Options) is det.
  181%
  182%   Create  a  thread  in  Pool.  Options  overrule  default  thread
  183%   creation options associated  to  the   pool.  In  addition,  the
  184%   following option is defined:
  185%
  186%       * wait(+Boolean)
  187%       If =true= (default) and the pool is full, wait until a
  188%       member of the pool completes.  If =false=, throw a
  189%       resource_error.
  190%
  191%   @error  resource_error(threads_in_pool(Pool)) is raised if wait
  192%           is =false= or the backlog limit has been reached.
  193%   @error  existence_error(thread_pool, Pool) if Pool does not
  194%           exist.
  195
  196thread_create_in_pool(Pool, Goal, Id, QOptions) :-
  197    meta_options(is_meta, QOptions, Options),
  198    catch(thread_create_in_pool_(Pool, Goal, Id, Options),
  199          Error, true),
  200    (   var(Error)
  201    ->  true
  202    ;   Error = error(existence_error(thread_pool, Pool), _),
  203        create_pool_lazily(Pool)
  204    ->  thread_create_in_pool_(Pool, Goal, Id, Options)
  205    ;   throw(Error)
  206    ).
  207
  208thread_create_in_pool_(Pool, Goal, Id, Options) :-
  209    select_option(wait(Wait), Options, ThreadOptions, true),
  210    pool_manager(Manager),
  211    thread_self(Me),
  212    thread_send_message(Manager,
  213                        create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
  214    wait_reply(Id).
  215
  216is_meta(at_exit).
  217
  218
  219%!  create_pool_lazily(+Pool) is semidet.
  220%
  221%   Call the hook create_pool/1 to create the pool lazily.
  222
  223create_pool_lazily(Pool) :-
  224    with_mutex(Pool,
  225               ( mutex_destroy(Pool),
  226                 create_pool_sync(Pool)
  227               )).
  228
  229create_pool_sync(Pool) :-
  230    current_thread_pool(Pool),
  231    !.
  232create_pool_sync(Pool) :-
  233    create_pool(Pool).
  234
  235
  236                 /*******************************
  237                 *        START MANAGER         *
  238                 *******************************/
  239
  240%!  pool_manager(-ThreadID) is det.
  241%
  242%   ThreadID is the thread (alias) identifier of the manager. Starts
  243%   the manager if it is not running.
  244
  245pool_manager(TID) :-
  246    TID = '__thread_pool_manager',
  247    (   thread_running(TID)
  248    ->  true
  249    ;   with_mutex('__thread_pool', create_pool_manager(TID))
  250    ).
  251
  252thread_running(Thread) :-
  253    catch(thread_property(Thread, status(Status)),
  254          E, true),
  255    (   var(E)
  256    ->  (   Status == running
  257        ->  true
  258        ;   thread_join(Thread, _),
  259            print_message(warning, thread_pool(manager_died(Status))),
  260            fail
  261        )
  262    ;   E = error(existence_error(thread, Thread), _)
  263    ->  fail
  264    ;   throw(E)
  265    ).
  266
  267create_pool_manager(Thread) :-
  268    thread_running(Thread),
  269    !.
  270create_pool_manager(Thread) :-
  271    thread_create(pool_manager_main, _,
  272                  [ alias(Thread),
  273                    inherit_from(main)
  274                  ]).
  275
  276
  277pool_manager_main :-
  278    rb_new(State0),
  279    manage_thread_pool(State0).
  280
  281
  282                 /*******************************
  283                 *        MANAGER LOGIC         *
  284                 *******************************/
  285
  286%!  manage_thread_pool(+State)
  287
  288manage_thread_pool(State0) :-
  289    thread_get_message(Message),
  290    (   update_thread_pool(Message, State0, State)
  291    ->  debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
  292        manage_thread_pool(State)
  293    ;   format(user_error, 'Update failed: ~p~n', [Message])
  294    ).
  295
  296
  297update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
  298    !,
  299    (   rb_insert_new(State0,
  300                      Name, tpool(Options, Size, Size, WP, WP, []),
  301                      State)
  302    ->  thread_send_message(For, thread_pool(true))
  303    ;   reply_error(For, permission_error(create, thread_pool, Name)),
  304        State = State0
  305    ).
  306update_thread_pool(destroy_pool(Name, For), State0, State) :-
  307    !,
  308    (   rb_delete(State0, Name, State)
  309    ->  thread_send_message(For, thread_pool(true))
  310    ;   reply_error(For, existence_error(thread_pool, Name)),
  311        State = State0
  312    ).
  313update_thread_pool(current_pools(For), State, State) :-
  314    !,
  315    rb_keys(State, Keys),
  316    debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
  317    reply(For, Keys).
  318update_thread_pool(pool_properties(For, Name, P), State, State) :-
  319    !,
  320    (   rb_lookup(Name, Pool, State)
  321    ->  findall(P, pool_property(P, Pool), List),
  322        reply(For, List)
  323    ;   reply_error(For, existence_error(thread_pool, Name))
  324    ).
  325update_thread_pool(Message, State0, State) :-
  326    arg(1, Message, Name),
  327    (   rb_lookup(Name, Pool0, State0)
  328    ->  update_pool(Message, Pool0, Pool),
  329        rb_update(State0, Name, Pool, State)
  330    ;   State = State0,
  331        (   Message = create(Name, _, For, _, _, _)
  332        ->  reply_error(For, existence_error(thread_pool, Name))
  333        ;   true
  334        )
  335    ).
  336
  337pool_property(options(Options),
  338              tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
  339pool_property(backlog(Size),
  340              tpool(_, _Free, _Size, WP, WPT, _Members)) :-
  341    diff_list_length(WP, WPT, Size).
  342pool_property(free(Free),
  343              tpool(_, Free, _Size, _, _, _)).
  344pool_property(size(Size),
  345              tpool(_, _Free, Size, _, _, _)).
  346pool_property(running(Count),
  347              tpool(_, Free, Size, _, _, _)) :-
  348    Count is Size - Free.
  349pool_property(members(IDList),
  350              tpool(_, _, _, _, _, IDList)).
  351
  352diff_list_length(List, Tail, Size) :-
  353    '$skip_list'(Length, List, Rest),
  354    (   Rest == Tail
  355    ->  Size = Length
  356    ;   type_error(difference_list, List/Tail)
  357    ).
  358
  359
  360%!  update_pool(+Message, +Pool0, -Pool) is det.
  361%
  362%   Deal with create requests and  completion   messages  on a given
  363%   pool.  There are two messages:
  364%
  365%       * create(PoolName, Goal, ForThread, Wait, Id, Options)
  366%       Create a new thread on behalf of ForThread.  There are
  367%       two cases:
  368%            * Free slots: create the thread
  369%            * No free slots: error or add to waiting
  370%       * exitted(PoolName, Thread)
  371%       A thread completed.  If there is a request waiting,
  372%       create a new one.
  373
  374update_pool(create(Name, Goal, For, _, Id, MyOptions),
  375            tpool(Options, Free0, Size, WP, WPT, Members0),
  376            tpool(Options, Free, Size, WP, WPT, Members)) :-
  377    succ(Free, Free0),
  378    !,
  379    merge_options(MyOptions, Options, ThreadOptions),
  380    select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
  381    catch(thread_create(Goal, Id,
  382                        [ at_exit(worker_exitted(Name, Id, AtExit))
  383                        | ThreadOptions1
  384                        ]),
  385          E, true),
  386    (   var(E)
  387    ->  Members = [Id|Members0],
  388        reply(For, Id)
  389    ;   reply_error(For, E),
  390        Members = Members0
  391    ).
  392update_pool(Create,
  393            tpool(Options, 0, Size, WP, WPT0, Members),
  394            tpool(Options, 0, Size, WP, WPT, Members)) :-
  395    Create = create(Name, _Goal, For, Wait, _, _Options),
  396    !,
  397    option(backlog(BackLog), Options, infinite),
  398    (   can_delay(Wait, BackLog, WP, WPT0)
  399    ->  WPT0 = [Create|WPT],
  400        debug(thread_pool, 'Delaying ~p', [Create])
  401    ;   WPT = WPT0,
  402        reply_error(For, resource_error(threads_in_pool(Name)))
  403    ).
  404update_pool(exitted(_Name, Id),
  405            tpool(Options, Free0, Size, WP0, WPT, Members0),
  406            Pool) :-
  407    succ(Free0, Free),
  408    delete(Members0, Id, Members1),
  409    Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
  410    (   WP0 == WPT
  411    ->  WP = WP0,
  412        Pool = Pool1
  413    ;   WP0 = [Waiting|WP],
  414        debug(thread_pool, 'Start delayed ~p', [Waiting]),
  415        update_pool(Waiting, Pool1, Pool)
  416    ).
  417
  418
  419can_delay(true, infinite, _, _) :- !.
  420can_delay(true, BackLog, WP, WPT) :-
  421    diff_list_length(WP, WPT, Size),
  422    BackLog > Size.
  423
  424%!  worker_exitted(+PoolName, +WorkerId, :AtExit)
  425%
  426%   It is possible that  '__thread_pool_manager'   no  longer exists
  427%   while closing down the process because   the  manager was killed
  428%   before the worker.
  429%
  430%   @tbd Find a way to discover that we are terminating Prolog.
  431
  432worker_exitted(Name, Id, AtExit) :-
  433    catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
  434          _, true),
  435    call(AtExit).
  436
  437
  438                 /*******************************
  439                 *             UTIL             *
  440                 *******************************/
  441
  442reply(To, Term) :-
  443    thread_send_message(To, thread_pool(true(Term))).
  444
  445reply_error(To, Error) :-
  446    thread_send_message(To, thread_pool(error(Error, _))).
  447
  448wait_reply :-
  449    thread_get_message(thread_pool(Result)),
  450    (   Result == true
  451    ->  true
  452    ;   Result == fail
  453    ->  fail
  454    ;   throw(Result)
  455    ).
  456
  457wait_reply(Value) :-
  458    thread_get_message(thread_pool(Reply)),
  459    (   Reply = true(Value0)
  460    ->  Value = Value0
  461    ;   Reply == fail
  462    ->  fail
  463    ;   throw(Reply)
  464    ).
  465
  466
  467                 /*******************************
  468                 *             HOOKS            *
  469                 *******************************/
  470
  471%!  create_pool(+PoolName) is semidet.
  472%
  473%   Hook to create a thread  pool  lazily.   The  hook  is called if
  474%   thread_create_in_pool/4 discovers that the thread  pool does not
  475%   exist. If the  hook   succeeds,  thread_create_in_pool/4 retries
  476%   creating the thread. For  example,  we   can  use  the following
  477%   declaration to create threads in the pool =media=, which holds a
  478%   maximum of 20 threads.
  479%
  480%     ==
  481%     :- multifile thread_pool:create_pool/1.
  482%
  483%     thread_pool:create_pool(media) :-
  484%         thread_pool_create(media, 20, []).
  485%     ==
  486
  487                 /*******************************
  488                 *            MESSAGES          *
  489                 *******************************/
  490:- multifile
  491    prolog:message/3.  492
  493prolog:message(thread_pool(Message)) -->
  494    message(Message).
  495
  496message(manager_died(Status)) -->
  497    [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ]