View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2016, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(thread_httpd,
   37          [ http_current_server/2,      % ?:Goal, ?Port
   38            http_server_property/2,     % ?Port, ?Property
   39            http_server/2,              % :Goal, +Options
   40            http_workers/2,             % +Port, ?WorkerCount
   41            http_add_worker/2,          % +Port, +Options
   42            http_current_worker/2,      % ?Port, ?ThreadID
   43            http_stop_server/2,         % +Port, +Options
   44            http_spawn/2,               % :Goal, +Options
   45
   46            http_requeue/1,             % +Request
   47            http_close_connection/1,    % +Request
   48            http_enough_workers/3       % +Queue, +Why, +Peer
   49          ]).   50:- use_module(library(debug)).   51:- use_module(library(error)).   52:- use_module(library(option)).   53:- use_module(library(socket)).   54:- use_module(library(thread_pool)).   55:- use_module(library(gensym)).   56:- use_module(http_wrapper).   57:- use_module(http_path).   58
   59
   60:- predicate_options(http_server/2, 2,
   61                     [ port(any),
   62                       tcp_socket(any),
   63                       workers(positive_integer),
   64                       timeout(number),
   65                       keep_alive_timeout(number),
   66                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   67                       pass_to(system:thread_create/3, 3)
   68                     ]).   69:- predicate_options(http_spawn/2, 2,
   70                     [ pool(atom),
   71                       pass_to(system:thread_create/3, 3),
   72                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   73                     ]).   74:- predicate_options(http_add_worker/2, 2,
   75                     [ timeout(number),
   76                       keep_alive_timeout(number),
   77                       max_idle_time(number),
   78                       pass_to(system:thread_create/3, 3)
   79                     ]).

Threaded HTTP server

This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially use a lot of memory and 8 requests that use a lot of CPU resources.

On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.

Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */

  103:- meta_predicate
  104    http_server(1, :),
  105    http_current_server(1, ?),
  106    http_spawn(0, +).  107
  108:- dynamic
  109    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  110    queue_worker/2,         % Queue, ThreadID
  111    queue_options/2.        % Queue, Options
  112
  113:- multifile
  114    make_socket_hook/3,
  115    accept_hook/2,
  116    close_hook/1,
  117    open_client_hook/6,
  118    http:create_pool/1,
  119    http:schedule_workers/1.
 http_server(:Goal, :Options) is det
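Create a server that calls Goal for each parsed request. The address to listen on is taken from the mandatory port(Address) option; an existence error is raised if it is missing. Options is a list of options. Defined options are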
port(?Address)
Port to bind to. Address is either a port or a term Host:Port. The port may be a variable, causing the system to select a free port. See tcp_bind/2.
tcp_socket(+Socket)
If provided, use this socket instead of creating one and binding it to an address. The socket must already be bound to an address.
workers(+Count)
Determine the number of worker threads. Default is 5. This is fine for small scale usage. Public servers typically need a higher number.
timeout(+Seconds)
Max time of inactivity trying to read the request after a connection has been opened. Default is 60 seconds. See set_stream/2 using the timeout option.
keep_alive_timeout(+Seconds)
Time to keep `Keep alive' connections alive. Default is 2 seconds.
local(+Kbytes)
global(+Kbytes)
trail(+Kbytes)
Stack sizes to use for the workers. The default is inherited from the main thread. As of version 5.9 stacks are no longer pre-allocated and the given sizes only act as a limit. If you need to control resource usage look at the spawn option of http_handler/3 and library(thread_pool).

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:

:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).

start_server(Port) :-
    http_server(http_dispatch, [port(Port)]).

Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.

  %!  http_server(:Goal, :Options) is det.
  %
  %   Start an HTTP server that runs Goal on every parsed request.  The
  %   port(Address) option is mandatory.  It is handed to make_socket/3
  %   to obtain the listening socket and queue name, after which the
  %   worker pool and the accept thread are created and an
  %   informational message is printed.
  %
  %   @error existence_error(option, port) if Options has no port(_).

  http_server(Goal, Module:UserOptions) :-
      option(port(Address), UserOptions),
      !,
      make_socket(Address, Module:UserOptions, ServerOptions),
      create_workers(ServerOptions),
      create_server(Goal, Address, ServerOptions),
      print_message(informational, httpd_started_server(Address)).
  http_server(_, _) :-
      existence_error(option, port).
 make_socket(?Port, :OptionsIn, -OptionsOut) is det
Create the HTTP server socket and worker pool queue. OptionsOut is guaranteed to hold the option queue(QueueId).
Arguments:
OptionsIn- is qualified to allow passing the module-sensitive ssl option argument.
  %!  make_socket(?Port, :OptionsIn, -OptionsOut) is det.
  %
  %   Create the HTTP server socket and name the worker queue.
  %   OptionsOut is the plain option list of OptionsIn extended with
  %   queue(QueueId) and, when this predicate created the socket itself,
  %   tcp_socket(Socket).  The clauses are tried in this order:
  %
  %     1. The multifile hook make_socket_hook/3 may take over entirely.
  %     2. A caller-supplied tcp_socket(_) option is used as is.
  %     3. Otherwise a fresh socket is created, bound to Port and put
  %        in listen mode.
  %
  %   If configuring the fresh socket raises an exception (for example
  %   because the port is already in use), the socket is closed before
  %   the exception is re-thrown so that a failed start does not leak
  %   the socket.

  make_socket(Port, Options0, Options) :-
      make_socket_hook(Port, Options0, Options),
      !.
  make_socket(Port, _:Options0, Options) :-
      option(tcp_socket(_), Options0),
      !,
      make_addr_atom('httpd', Port, Queue),
      Options = [ queue(Queue)
                | Options0
                ].
  make_socket(Port, _:Options0, Options) :-
      tcp_socket(Socket),
      catch(( tcp_setopt(Socket, reuseaddr),
              tcp_bind(Socket, Port),
              tcp_listen(Socket, 5)
            ),
            Error,
            ( tcp_close_socket(Socket),     % do not leak the socket
              throw(Error)
            )),
      make_addr_atom('httpd', Port, Queue),
      Options = [ queue(Queue),
                  tcp_socket(Socket)
                | Options0
                ].
 make_addr_atom(+Scheme, +Address, -Atom) is det
Create an atom that identifies the server's queue and thread resources.
  %!  make_addr_atom(+Scheme, +Address, -Atom) is det.
  %
  %   Atom is Scheme, an `@` and a textual rendering of Address glued
  %   together, e.g. 'httpd@localhost:8080'.  Address is an atomic
  %   value, a Host:Port pair or an ip(A,B,C,D) term.
  %
  %   @error instantiation_error if Address (or a Host or Port inside
  %          it) is unbound.  Without this guard an unbound address
  %          unifies with Host:Port and the grammar recurses forever.

  make_addr_atom(Scheme, Address, Atom) :-
      phrase(address_parts(Address), Parts),
      atomic_list_concat([Scheme,'@'|Parts], Atom).

  %   address_parts(+Address)// produces the list of atomic pieces
  %   that make up the printed form of Address.

  address_parts(Unbound) -->
      { var(Unbound) },
      !,
      { instantiation_error(Unbound) }.
  address_parts(Atomic) -->
      { atomic(Atomic) },
      !,
      [Atomic].
  address_parts(Host:Port) -->
      !,
      address_parts(Host), [':'], address_parts(Port).
  address_parts(ip(A,B,C,D)) -->
      !,
      [ A, '.', B, '.', C, '.', D ].
 create_server(:Goal, +Address, +Options) is det
Create the main server thread that runs accept_server/2 to listen to new requests.
  %!  create_server(:Goal, +Address, +Options) is det.
  %
  %   Start the thread that runs accept_server/2 and register the server
  %   in current_server/6.  The thread alias is built from the scheme
  %   and the port part of Address; Options must contain queue(Queue).

  create_server(Goal, Address, Options) :-
      get_time(Started),
      address_port(Address, Port),
      scheme(Scheme, Options),
      memberchk(queue(Queue), Options),
      make_addr_atom(Scheme, Port, ThreadAlias),
      thread_create(accept_server(Goal, Options), _,
                    [ alias(ThreadAlias) ]),
      assert(current_server(Port, Goal, ThreadAlias, Queue, Scheme, Started)).
  251
  %!  scheme(-Scheme, +Options) is det.
  %
  %   Scheme is the value of an explicit scheme(Scheme) option if there
  %   is one, `https` if Options holds ssl(_) or ssl_instance(_), and
  %   `http` otherwise.

  scheme(Scheme, Options) :-
      option(scheme(Scheme), Options),
      !.
  scheme(Scheme, Options) :-
      (   option(ssl(_), Options)
      ->  true
      ;   option(ssl_instance(_), Options)
      ),
      !,
      Scheme = https.
  scheme(http, _).
  262
  %!  address_port(+Address, -Port) is det.
  %
  %   Port is the port part of a Host:Port address, or Address itself
  %   when it is not such a pair.

  address_port(Address, Port) :-
      (   Address = _Host:Port
      ->  true
      ;   Port = Address
      ).
 http_current_server(:Goal, ?Port) is nondet
True if Goal is the goal of a server at Port.
deprecated
- Use http_server_property(Port, goal(Goal))
  %!  http_current_server(:Goal, ?Port) is nondet.
  %
  %   True if a server registered in current_server/6 listens on Port
  %   and was started with Goal.

  http_current_server(Goal, Port) :-
      current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
 http_server_property(?Port, ?Property) is nondet
True if Property is a property of the HTTP server running at Port. Defined properties are:
goal(:Goal)
Goal used to start the server. This is often http_dispatch/1.
scheme(-Scheme)
Scheme is one of http or https.
start_time(?Time)
Time-stamp when the server was created.
  %!  http_server_property(?Port, ?Property) is nondet.
  %
  %   True if Property holds for the server at Port.  A qualified
  %   Module:Port with an integer port is accepted and reduced to the
  %   bare port before the lookup.  Property is one of goal(Goal),
  %   scheme(Scheme) or start_time(Time).

  http_server_property(Address, Property) :-
      (   Address = _:Number,
          integer(Number)
      ->  Port = Number
      ;   Port = Address
      ),
      server_property(Property, Port).

  %   server_property(?Property, ?Port): one clause per property, each
  %   reading the matching column of current_server/6.

  server_property(goal(Goal), Port) :-
      current_server(Port, Goal, _Thread, _Queue, _Scheme, _Time).
  server_property(scheme(Scheme), Port) :-
      current_server(Port, _Goal, _Thread, _Queue, Scheme, _Time).
  server_property(start_time(Time), Port) :-
      current_server(Port, _Goal, _Thread, _Queue, _Scheme, Time).
 http_workers(+Port, -Workers) is det
http_workers(+Port, +Workers:int) is det
Query or set the number of workers for the server at this port. The number of workers is dynamically modified. Setting it to 1 (one) can be used to profile the worker using tprofile/1.
  %!  http_workers(+Port, ?Workers) is det.
  %
  %   With an integer Workers, resize the worker pool of the server at
  %   Port to that size.  Otherwise unify Workers with the number of
  %   queue_worker/2 entries for the server's queue.
  %
  %   @error existence_error(http_server, Port) if no server is
  %          registered for Port.

  http_workers(Port, Workers) :-
      must_be(ground, Port),
      (   current_server(Port, _, _, Queue, _, _)
      ->  (   integer(Workers)
          ->  resize_pool(Queue, Workers)
          ;   findall(Id, queue_worker(Queue, Id), Ids),
              length(Ids, Workers)
          )
      ;   existence_error(http_server, Port)
      ).
 http_add_worker(+Port, +Options) is det
Add a new worker to the HTTP server for port Port. Options overrule the default queue options. The following additional options are processed:
max_idle_time(+Seconds)
The created worker will automatically terminate if there is no new work within Seconds.
  %!  http_add_worker(+Port, +Options) is det.
  %
  %   Start one extra worker for the server at Port.  Options are merged
  %   over the options stored for the queue, so they take precedence.
  %
  %   @error existence_error(http_server, Port) if no server is
  %          registered for Port.

  http_add_worker(Port, Options) :-
      must_be(ground, Port),
      (   current_server(Port, _, _, Queue, _, _)
      ->  queue_options(Queue, Defaults),
          merge_options(Options, Defaults, WorkerOptions),
          atom_concat(Queue, '_', AliasBase),
          create_workers(1, 1, Queue, AliasBase, WorkerOptions)
      ;   existence_error(http_server, Port)
      ).
 http_current_worker(?Port, ?ThreadID) is nondet
True if ThreadID is the identifier of a Prolog thread serving Port. This predicate exists to allow arbitrary interaction with the worker threads, for example for development and for collecting statistics.
  %!  http_current_worker(?Port, ?ThreadID) is nondet.
  %
  %   True if ThreadID is recorded in queue_worker/2 for the queue of
  %   the server at Port.

  http_current_worker(Port, ThreadID) :-
      current_server(Port, _Goal, _Thread, Queue, _Scheme, _StartTime),
      queue_worker(Queue, ThreadID).
 accept_server(:Goal, +Options)
The goal of a small server-thread accepting new requests and posting them to the queue of workers.
  %!  accept_server(:Goal, +Options)
  %
  %   Body of the accept thread.  Runs the accept loop until it is left
  %   by the `http_stop` exception, then removes this thread's entry
  %   from current_server/6 and closes the server socket.

  accept_server(Goal, Options) :-
      catch(accept_server2(Goal, Options), http_stop, true),
      thread_self(Me),
      retract(current_server(_, _, Me, _, _, _)),
      close_server_socket(Options).
  369
  %!  accept_server2(:Goal, +Options)
  %
  %   Failure-driven accept loop; it never succeeds.  Each iteration
  %   runs accept_server3/2 once.  An exception accepted by
  %   accept_rethrow_error/1 is re-thrown and thereby ends the loop.
  %   Any other exception, or plain failure of accept_server3/2, is
  %   printed as an error and the loop continues.

  accept_server2(Goal, Options) :-
      repeat,
        (   catch(accept_server3(Goal, Options), Ball, true)
        ->  nonvar(Ball),                 % no exception: next iteration
            (   accept_rethrow_error(Ball)
            ->  throw(Ball)
            ;   print_message(error, Ball)
            )
        ;   print_message(error,          % internal error
                          goal_failed(accept_server3(Goal, Options)))
        ),
        fail.
  384
  %!  accept_server3(:Goal, +Options)
  %
  %   Accept one connection.  The multifile hook accept_hook/2 gets the
  %   first chance.  Otherwise a client is accepted on the tcp_socket
  %   from Options, http_enough_workers/3 is consulted and a
  %   tcp_client/3 message is posted to the worker queue.

  accept_server3(Goal, Options) :-
      accept_hook(Goal, Options),
      !.
  accept_server3(Goal, Options) :-
      memberchk(queue(Queue), Options),
      memberchk(tcp_socket(ServerSocket), Options),
      tcp_accept(ServerSocket, ClientSocket, Peer),
      debug(http(connection), 'New HTTP connection from ~p', [Peer]),
      http_enough_workers(Queue, accept, Peer),
      thread_send_message(Queue, tcp_client(ClientSocket, Goal, Peer)).

  %   accept_rethrow_error(+Ball): exceptions that must leave the
  %   accept loop instead of being printed.

  accept_rethrow_error(http_stop).
  accept_rethrow_error('$aborted').
 close_server_socket(+Options)
Close the server socket.
  %!  close_server_socket(+Options)
  %
  %   Close the server socket.  The multifile hook close_hook/1 is
  %   tried first; otherwise the tcp_socket(Socket) from Options is
  %   closed.  Fails if neither applies.

  close_server_socket(Options) :-
      (   close_hook(Options)
      ->  true
      ;   memberchk(tcp_socket(Socket), Options)
      ->  tcp_close_socket(Socket)
      ).
 http_stop_server(+Port, +Options)
Stop the indicated HTTP server gracefully. First stops all workers, then stops the server.
To be done
- Realise non-graceful stop
  %!  http_stop_server(+Port, +Options)
  %
  %   Stop the server at Port.  A Host:Port address with a ground Host
  %   is reduced to its port.  The worker pool is first resized to
  %   zero, the stored queue options are removed, and `http_stop` is
  %   signalled to the accept thread.  A connection attempt to
  %   localhost:Port follows (errors ignored), presumably to wake the
  %   thread out of its accept call.  Finally the thread is joined and
  %   the queue destroyed.  Options is currently ignored.

  http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
      ground(Host),
      !,
      http_stop_server(Port, Options).
  http_stop_server(Port, _) :-
      http_workers(Port, 0),                  % checks Port is ground
      current_server(Port, _, AcceptThread, Queue, _, _),
      retractall(queue_options(Queue, _)),
      thread_signal(AcceptThread, throw(http_stop)),
      catch(connect(localhost:Port), _, true),
      thread_join(AcceptThread, _),
      message_queue_destroy(Queue).

  %   connect(+Address): open a TCP connection to Address and close the
  %   socket again, also when connecting fails.

  connect(Address) :-
      setup_call_cleanup(
          tcp_socket(Sock),
          tcp_connect(Sock, Address),
          tcp_close_socket(Sock)).
 http_enough_workers(+Queue, +Why, +Peer) is det
Check that we have enough workers in our queue. If not, call the hook schedule_workers/1 to extend the worker pool. This predicate can be used by accept_hook/2.
  %!  http_enough_workers(+Queue, +Why, +Peer) is det.
  %
  %   Look at the number of messages waiting in Queue.  If that number
  %   is not acceptable according to enough/2, call the hook
  %   http:schedule_workers/1 with a dict describing the situation.
  %   Exceptions from the hook are printed; hook failure and an unknown
  %   Queue are ignored.

  http_enough_workers(Queue, Why, Peer) :-
      message_queue_property(Queue, size(Waiting)),
      (   enough(Waiting, Why)
      ->  true
      ;   ignore(( current_server(Port, _, _, Queue, _, _),
                   catch(http:schedule_workers(_{port:Port,
                                                 reason:Why,
                                                 peer:Peer,
                                                 waiting:Waiting}),
                         Error,
                         print_message(error, Error))
                 ))
      ).

  %   enough(+Waiting, +Why): an empty queue is always fine; one waiting
  %   message is fine for a keep-alive requeue.

  enough(0, _).
  enough(1, keep_alive).                  % I will be ready myself
 http:schedule_workers(+Data:dict) is semidet
Hook called if a new connection or a keep-alive connection cannot be scheduled immediately to a worker. Dict contains the following keys:
port:Port
Port number that identifies the server.
reason:Reason
One of accept for a new connection or keep_alive if a worker tries to reschedule itself.
peer:Peer
Identify the other end of the connection
waiting:Size
Number of messages waiting in the queue.

Note that, when called with reason:accept, we are called in the time-critical main accept loop. An implementation of this hook shall typically send the event to a thread dedicated to dynamic worker-pool management.

See also
- http_add_worker/2 may be used to create (temporary) extra workers.
  489                 /*******************************
  490                 *    WORKER QUEUE OPERATIONS   *
  491                 *******************************/
 create_workers(+Options)
Create the pool of HTTP worker-threads. Each worker gets an alias generated from the queue name followed by an underscore and a sequence number (Queue_N).
  %!  create_workers(+Options)
  %
  %   Create the message queue named by queue(Queue) (errors from the
  %   creation are ignored), start workers(N) worker threads (default
  %   5) and remember Options as the queue's options.

  create_workers(Options) :-
      option(workers(Count), Options, 5),
      option(queue(Queue), Options),
      catch(message_queue_create(Queue), _, true),
      atom_concat(Queue, '_', AliasBase),
      create_workers(1, Count, Queue, AliasBase, Options),
      assert(queue_options(Queue, Options)).

  %!  create_workers(+I, +N, +Queue, +AliasBase, +Options)
  %
  %   Start one worker thread for each of I..N.  Every thread runs
  %   http_worker/1, gets a gensym/2 alias based on AliasBase and is
  %   recorded in queue_worker/2.

  create_workers(I, N, Queue, AliasBase, Options) :-
      (   I > N
      ->  true
      ;   gensym(AliasBase, Alias),
          thread_create(http_worker(Options), Id,
                        [ alias(Alias)
                        | Options
                        ]),
          assertz(queue_worker(Queue, Id)),
          Next is I + 1,
          create_workers(Next, N, Queue, AliasBase, Options)
      ).
 resize_pool(+Queue, +Workers) is det
Create or destroy workers. If workers are destroyed, the call waits until the desired number of workers is reached.
  %!  resize_pool(+Queue, +Size) is det.
  %
  %   Bring the number of workers on Queue to Size.  Missing workers
  %   are created with the stored queue options.  For surplus workers
  %   one quit(Me) message each is posted, and the caller then waits
  %   for the same number of quitted(_) replies.

  resize_pool(Queue, Size) :-
      findall(Id, queue_worker(Queue, Id), Ids),
      length(Ids, Current),
      (   Current < Size
      ->  queue_options(Queue, Options),
          atom_concat(Queue, '_', AliasBase),
          First is Current+1,
          create_workers(First, Size, Queue, AliasBase, Options)
      ;   Current == Size
      ->  true
      ;   Current > Size
      ->  Surplus is Current - Size,
          thread_self(Me),
          forall(between(1, Surplus, _),
                 thread_send_message(Queue, quit(Me))),
          forall(between(1, Surplus, _),
                 thread_get_message(quitted(_)))
      ).
 http_worker(+Options)
Run HTTP worker main loop. Workers simply wait until they are passed an accepted socket to process a client.

If the message quit(Sender) is read from the queue, the worker stops.

  %!  http_worker(+Options)
  %
  %   Main loop of a worker thread.  Each iteration collects garbage,
  %   trims the stacks and waits for a message on the queue.  With a
  %   max_idle_time(Seconds) option the wait is bounded and a timeout
  %   is treated as quit(idle).
  %
  %   A quit(Sender) message ends the loop: the thread detaches itself
  %   and, unless Sender is `idle`, replies quitted(Self) to Sender.
  %   Any other message is opened as a client connection and processed.
  %   If processing raises an exception or fails, this is reported and
  %   the connection is closed.  In all non-quit cases the loop
  %   continues by failing back into repeat/0.

  http_worker(Options) :-
      thread_at_exit(done_worker),
      option(queue(Queue), Options),
      option(max_idle_time(IdleLimit), Options, infinite),
      repeat,
        garbage_collect,
        trim_stacks,
        debug(http(worker), 'Waiting for a job ...', []),
        (   IdleLimit == infinite
        ->  thread_get_message(Queue, Job)
        ;   thread_get_message(Queue, Job, [timeout(IdleLimit)])
        ->  true
        ;   Job = quit(idle)
        ),
        debug(http(worker), 'Got job ~p', [Job]),
        (   Job = quit(Requester)
        ->  !,                              % leave the repeat loop
            thread_self(Me),
            thread_detach(Me),
            (   Requester == idle
            ->  true
            ;   thread_send_message(Requester, quitted(Me))
            )
        ;   open_client(Job, Queue, Goal, In, Out,
                        Options, ClientOptions),
            (   catch(http_process(Goal, In, Out, ClientOptions),
                      Error, true)
            ->  true
            ;   Error = goal_failed(http_process/4)
            ),
            nonvar(Error),                  % clean run: just loop
            current_message_level(Error, Level),
            print_message(Level, Error),
            memberchk(peer(Peer), ClientOptions),
            close_connection(Peer, In, Out),
            fail
        ).
 open_client(+Message, +Queue, -Goal, -In, -Out, +Options, -ClientOptions) is semidet
Opens the connection to the client in a worker from the message sent to the queue by accept_server/2.
  %!  open_client(+Message, +Queue, -Goal, -In, -Out,
  %!              +Options, -ClientOptions) is semidet.
  %
  %   Turn a queue message into the streams and options for one client.
  %   A requeue/4 message carries an existing keep-alive connection:
  %   succeed only if the client sends more data within the
  %   keep_alive_timeout (default 2).  Any other message is opened with
  %   open_client/6; exceptions there are printed and cause failure.
  %   ClientOptions then starts with pool(client(...)) and the
  %   timeout (default 60).

  open_client(requeue(In, Out, Goal, ClientOptions),
              _, Goal, In, Out, Options, ClientOptions) :-
      !,
      option(keep_alive_timeout(Seconds), Options, 2),
      memberchk(peer(Peer), ClientOptions),
      check_keep_alive_connection(In, Seconds, Peer, In, Out).
  open_client(Message, Queue, Goal, In, Out, Options,
              [ pool(client(Queue, Goal, In, Out)),
                timeout(Timeout)
              | Rest
              ]) :-
      catch(open_client(Message, Goal, In, Out, Rest, Options),
            Error, report_error(Error)),
      option(timeout(Timeout), Options, 60),
      (   debugging(http(connection))
      ->  memberchk(peer(Peer), Rest),
          debug(http(connection), 'Opened connection from ~p', [Peer])
      ;   true
      ).
 open_client(+Message, +Goal, -In, -Out, -ClientOptions, +Options) is det
  %!  open_client(+Message, +Goal, -In, -Out, -ClientOptions, +Options)
  %
  %   Open the streams for a freshly accepted client.  The multifile
  %   hook open_client_hook/6 is tried first.  The default handles a
  %   tcp_client(Socket, Goal, Peer) message by opening the socket and
  %   returning the options peer(Peer) and protocol(http).

  open_client(Message, Goal, In, Out, ClientOptions, Options) :-
      open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
      !.
  open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
              [ peer(Peer), protocol(http) ],
              _Options) :-
      tcp_open_socket(Socket, In, Out).

  %   report_error(+Error): print Error as an error message and fail.

  report_error(Error) :-
      print_message(error, Error),
      fail.
 check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet
Wait for the client for at most TimeOut seconds. Succeed if the client starts a new request within this time. Otherwise close the connection and fail.
  %!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
  %
  %   Temporarily set the timeout of In to TimeOut and peek at the next
  %   character.  If that works and is not end-of-file, restore the old
  %   timeout and succeed.  On end-of-file or on any exception from the
  %   peek, close the connection and fail.

  check_keep_alive_connection(In, Seconds, Peer, In, Out) :-
      stream_property(In, timeout(Saved)),
      set_stream(In, timeout(Seconds)),
      debug(http(keep_alive), 'Waiting for keep-alive ...', []),
      catch(peek_code(In, Next), Error, true),
      (   var(Error),                     % no exception
          Next \== -1                     % no end-of-file
      ->  set_stream(In, timeout(Saved)),
          debug(http(keep_alive), '\tre-using keep-alive connection', [])
      ;   (   Next == -1
          ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
          ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
          ),
          close_connection(Peer, In, Out),
          fail
      ).
 done_worker
Called when worker is terminated due to http_workers/2 or a (debugging) exception. In the latter case, recreate_worker/2 creates a new worker.
  %!  done_worker
  %
  %   Exit hook of a worker thread (installed by thread_at_exit/1).
  %   Removes the thread from queue_worker/2.  If recreate_worker/2
  %   starts a replacement, the thread detaches itself and reports the
  %   restart; otherwise the stop is reported at the level given by
  %   done_status_message_level/2.

  done_worker :-
      thread_self(Me),
      thread_property(Me, status(ExitStatus)),
      retract(queue_worker(Queue, Me)),
      (   catch(recreate_worker(ExitStatus, Queue), _, fail)
      ->  thread_detach(Me),
          print_message(informational, httpd_restarted_worker(Me))
      ;   done_status_message_level(ExitStatus, Level),
          print_message(Level, httpd_stopped_worker(Me, ExitStatus))
      ).

  %   done_status_message_level(+Status, -Level): normal exit and abort
  %   are silent, everything else is informational.

  done_status_message_level(true, silent) :-
      !.
  done_status_message_level(exception('$aborted'), silent) :-
      !.
  done_status_message_level(_Other, informational).
 recreate_worker(+Status, +Queue) is semidet
Deal with the possibility that threads are, during development, killed with abort/0. We recreate the worker to avoid that eventually we run out of workers. If we are aborted due to a halt/0 call, thread_create/3 will raise a permission error.

The first clause deals with the possibility that we cannot write to user_error. This is possible when Prolog is started as a service using some service managers. Would be nice if we could write an error, but where?

  %!  recreate_worker(+Status, +Queue) is semidet.
  %
  %   Start a replacement worker on Queue if the exit Status is an
  %   exception listed in recreate_on_error/1.  If the worker died from
  %   an I/O error while writing to user_error, the process is halted
  %   with exit code 2 instead.

  recreate_worker(exception(error(io_error(write,user_error),_)), _) :-
      halt(2).
  recreate_worker(exception(Ball), Queue) :-
      recreate_on_error(Ball),
      queue_options(Queue, QueueOptions),
      atom_concat(Queue, '_', AliasBase),
      create_workers(1, 1, Queue, AliasBase, QueueOptions).

  %   recreate_on_error(+Ball): exceptions after which a worker is
  %   replaced.

  recreate_on_error('$aborted').
  recreate_on_error(time_limit_exceeded).
  706
  707%       thread_httpd:message_level(+Exception, -Level)
  708%
  709%       Determine the message stream used for  exceptions that may occur
  710%       during server_loop/5. Being multifile, clauses   can be added by
  711%       the   application   to   refine   error   handling.   See   also
  712%       message_hook/3 for further programming error handling.
  713
  714:- multifile
  715    message_level/2.  716
  %   Default message levels: read I/O errors and keep-alive timeouts
  %   are silent, read timeouts are informational.

  message_level(error(io_error(read, _), _),      silent).
  message_level(error(timeout_error(read, _), _), informational).
  message_level(keep_alive_timeout,               silent).

  %   current_message_level(+Term, -Level): Level is the level that
  %   message_level/2 gives for Term, or `error` if it gives none.

  current_message_level(Term, Level) :-
      message_level(Term, Level),
      !.
  current_message_level(_, error).
 http_requeue(+Header)
Re-queue a connection to the worker pool. This deals with processing additional requests on keep-alive connections.
  %!  http_requeue(+Header)
  %
  %   Put a keep-alive connection back on the worker queue.  Only the
  %   pool, peer and protocol options of Header are carried over.  If
  %   any step fails, a debug message on topic http(error) is emitted
  %   and the predicate fails.

  http_requeue(Header) :-
      (   requeue_header(Header, ClientOptions),
          memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
          memberchk(peer(Peer), ClientOptions),
          http_enough_workers(Queue, keep_alive, Peer),
          thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions))
      ->  true
      ;   debug(http(error), 'Re-queue failed: ~p', [Header]),
          fail
      ).
  743
  %   requeue_header(+Header, -Kept): Kept holds, in order, the
  %   elements of Header accepted by requeue_keep/1.

  requeue_header([], []).
  requeue_header([Field|Fields], Kept) :-
      (   Kept = [Field|KeptRest],
          requeue_keep(Field)
      ->  true
      ;   Kept = KeptRest
      ),
      requeue_header(Fields, KeptRest).

  %   requeue_keep(?Field): request fields that survive a requeue.

  requeue_keep(pool(_)).
  requeue_keep(peer(_)).
  requeue_keep(protocol(_)).
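 http_process(:Goal, +In, +Out, +Options)
Handle a single client request: run Goal through http_wrapper/5 on the streams In and Out and then decide, based on the resulting connection status, whether to re-queue, hand over or close the connection.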
  %!  http_process(:Goal, +In, +Out, +Options)
  %
  %   Serve one request.  Both streams get the timeout from Options
  %   (default 60), Goal is run through http_wrapper/5 and next/2 then
  %   acts on the connection status it returns.

  http_process(Goal, In, Out, Options) :-
      debug(http(server), 'Running server goal ~p on ~p -> ~p',
            [Goal, In, Out]),
      option(timeout(Seconds), Options, 60),
      set_stream(In, timeout(Seconds)),
      set_stream(Out, timeout(Seconds)),
      WrapperOptions = [ request(Request) | Options ],
      http_wrapper(Goal, In, Out, Connection, WrapperOptions),
      next(Connection, Request).
  772
  %   next(+Connection, +Request): act on the connection status after
  %   a request was handled.
  %
  %     - switch_protocol(Goal, _): call Goal on the connection's
  %       streams; if it fails or raises (the error is printed), close
  %       the connection.
  %     - spawned(ThreadId): the handler continues on another thread;
  %       only a debug message is emitted.
  %     - a status that down-cases to 'keep-alive': re-queue the
  %       connection.
  %     - anything else, or a failed re-queue: close the connection.

  next(switch_protocol(SwitchGoal, _), Request) :-
      !,
      memberchk(pool(client(_, _, In, Out)), Request),
      (   catch(call(SwitchGoal, In, Out), Error,
                (   print_message(error, Error),
                    fail))
      ->  true
      ;   http_close_connection(Request)
      ).
  next(spawned(Thread), _) :-
      !,
      debug(http(spawn), 'Handler spawned to thread ~w', [Thread]).
  next(Status, Request) :-
      downcase_atom(Status, 'keep-alive'),
      http_requeue(Request),
      !.
  next(_, Request) :-
      http_close_connection(Request).
 http_close_connection(+Request)
Close connection associated to Request. See also http_requeue/1.
  %!  http_close_connection(+Request)
  %
  %   Close the streams found in the pool(client(...)) field of Request.
  %   Request must also contain peer(Peer).

  http_close_connection(Request) :-
      memberchk(peer(Peer), Request),
      memberchk(pool(client(_, _, In, Out)), Request),
      close_connection(Peer, In, Out).
 close_connection(+Peer, +In, +Out)
Closes the connection from the server to the client. Errors are currently silently ignored.
  %!  close_connection(+Peer, +In, +Out)
  %
  %   Force-close both streams of a client connection.  Exceptions from
  %   either close are ignored.  Peer is only used in the debug message.

  close_connection(Peer, In, Out) :-
      debug(http(connection), 'Closing connection from ~p', [Peer]),
      Forced = [force(true)],
      catch(close(In, Forced), _, true),
      catch(close(Out, Forced), _, true).
 http_spawn(:Goal, +Options) is det
Continue this connection on a new thread. A handler may call http_spawn/2 to start a new thread that continues processing the current request using Goal. The original thread returns to the worker pool for processing new requests. Options are passed to thread_create/3, except for:
pool(+Pool)
Interfaces to library(thread_pool), starting the thread on the given pool.

If a pool does not exist, this predicate calls the multifile hook http:create_pool/1 to create it. If this predicate succeeds the operation is retried.

  %!  http_spawn(:Goal, +Options) is det.
  %
  %   Continue the current request in a new detached thread running
  %   Goal.  With a pool(Pool) option the thread is created in that
  %   thread pool:
  %
  %     - if the pool is full, http_reply(busy) is thrown;
  %     - if the pool does not exist, create_pool/1 is called and the
  %       spawn is retried;
  %     - any other error is re-thrown.
  %
  %   Without pool(Pool) a plain thread is created and Options are
  %   passed to thread_create/3.

  http_spawn(Goal, Options) :-
      select_option(pool(Pool), Options, ThreadOptions),
      !,
      current_output(CGI),
      catch(thread_create_in_pool(Pool,
                                  wrap_spawned(CGI, Goal), Thread,
                                  [ detached(true)
                                  | ThreadOptions
                                  ]),
            Error,
            true),
      (   var(Error)
      ->  http_spawned(Thread)
      ;   Error = error(resource_error(threads_in_pool(_)), _)
      ->  throw(http_reply(busy))
      ;   Error = error(existence_error(thread_pool, Pool), _),
          create_pool(Pool)
      ->  http_spawn(Goal, Options)
      ;   throw(Error)
      ).
  http_spawn(Goal, Options) :-
      current_output(CGI),
      SpawnOptions = [ detached(true) | Options ],
      thread_create(wrap_spawned(CGI, Goal), Thread, SpawnOptions),
      http_spawned(Thread).
  855
  %   wrap_spawned(+CGI, :Goal): body of a spawned handler thread.
  %   Makes CGI the current output, runs Goal through
  %   http_wrap_spawned/3 and then acts on the connection status it
  %   returns.

  wrap_spawned(CGI, Goal) :-
      set_output(CGI),
      http_wrap_spawned(Goal, Request, Status),
      next(Status, Request).
 create_pool(+Pool)
Lazy creation of worker-pools for the HTTP server. This predicate calls the hook http:create_pool/1. If the hook fails it creates a default pool of size 10. This should suffice for most typical use cases. Note that we get a permission error if the pool is already created. We can ignore this.
  %!  create_pool(+Pool) is det.
  %
  %   Lazily create the thread pool Pool.  The hook http:create_pool/1
  %   is called first.  If the hook fails, a message is printed and a
  %   default pool of size 10 is created.
  %
  %   A permission error saying the pool cannot be created means that
  %   it already exists, typically because another thread created it
  %   in the meantime.  That error is ignored for the hook and also for
  %   the default creation, so that two requests racing to create the
  %   same default pool both proceed.

  create_pool(Pool) :-
      HookExists = error(permission_error(create, thread_pool, Pool), _),
      (   catch(http:create_pool(Pool), HookExists, true)
      ->  true
      ;   print_message(informational, httpd(created_pool(Pool))),
          DefaultExists = error(permission_error(create, thread_pool, Pool), _),
          catch(thread_pool_create(Pool, 10, []), DefaultExists, true)
      ).
  875
  876
  877
  878                 /*******************************
  879                 *            MESSAGES          *
  880                 *******************************/
  881
  882:- multifile
  883    prolog:message/3.  884
  %   Message texts for the terms this library passes to
  %   print_message/2.

  prolog:message(httpd_started_server(Address)) -->
      [ 'Started server at '-[] ],
      http_root(Address).
  prolog:message(httpd_stopped_worker(Thread, ExitStatus)) -->
      [ 'Stopped worker ~p: ~p'-[Thread, ExitStatus] ].
  prolog:message(httpd_restarted_worker(Thread)) -->
      [ 'Replaced aborted worker ~p'-[Thread] ].
  prolog:message(httpd(created_pool(PoolName))) -->
      [ 'Created thread-pool ~p of size 10'-[PoolName], nl,
        'Create this pool at startup-time or define the hook ', nl,
        'http:create_pool/1 to avoid this message and create a ', nl,
        'pool that fits the usage-profile.'
      ].
  898
  %   http_root(+Address)//: emit the root URL of the server at
  %   Address: the scheme prefix, then host (or `localhost` for a bare
  %   port), port and the absolute location of root(.).

  http_root(Host:Port) -->
      !,
      http_scheme(Port),
      { http_absolute_location(root(.), RootPath, []) },
      [ '~w:~w~w'-[Host, Port, RootPath] ].
  http_root(Port) -->
      http_scheme(Port),
      { http_absolute_location(root(.), RootPath, []) },
      [ 'localhost:~w~w'-[Port, RootPath] ].
  908
  909http_scheme(Port) -->
  910    { http_server_property(Port, scheme(Scheme)) },
  911    [ '~w://'-[Scheme] ]