View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2016, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(thread_httpd,
   37          [ http_current_server/2,      % ?:Goal, ?Port
   38            http_server_property/2,     % ?Port, ?Property
   39            http_server/2,              % :Goal, +Options
   40            http_workers/2,             % +Port, ?WorkerCount
   41            http_add_worker/2,          % +Port, +Options
   42            http_current_worker/2,      % ?Port, ?ThreadID
   43            http_stop_server/2,         % +Port, +Options
   44            http_spawn/2,               % :Goal, +Options
   45
   46            http_requeue/1,             % +Request
   47            http_close_connection/1,    % +Request
   48            http_enough_workers/3       % +Queue, +Why, +Peer
   49          ]).   50:- use_module(library(debug)).   51:- use_module(library(error)).   52:- use_module(library(option)).   53:- use_module(library(socket)).   54:- use_module(library(thread_pool)).   55:- use_module(library(gensym)).   56:- use_module(http_wrapper).   57:- use_module(http_path).   58
   59
   60:- predicate_options(http_server/2, 2,
   61                     [ port(any),
   62                       tcp_socket(any),
   63                       workers(positive_integer),
   64                       timeout(number),
   65                       keep_alive_timeout(number),
   66                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   67                       pass_to(system:thread_create/3, 3)
   68                     ]).   69:- predicate_options(http_spawn/2, 2,
   70                     [ pool(atom),
   71                       pass_to(system:thread_create/3, 3),
   72                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   73                     ]).   74:- predicate_options(http_add_worker/2, 2,
   75                     [ timeout(number),
   76                       keep_alive_timeout(number),
   77                       max_idle_time(number),
   78                       pass_to(system:thread_create/3, 3)
   79                     ]).   80
   81/** <module> Threaded HTTP server
   82
   83This library defines the HTTP server  frontend of choice for SWI-Prolog.
   84It is based on the multi-threading   capabilities of SWI-Prolog and thus
   85exploits multiple cores  to  serve   requests  concurrently.  The server
   86scales well and can cooperate with   library(thread_pool) to control the
   87number of concurrent requests of a given   type.  For example, it can be
   88configured to handle 200 file download requests concurrently, 2 requests
   89that potentially uses a lot of memory and   8 requests that use a lot of
   90CPU resources.
   91
   92On   Unix   systems,    this    library     can    be    combined   with
   93library(http/http_unix_daemon) to realise a proper  Unix service process
   94that creates a web server at  port   80,  runs under a specific account,
   95optionally detaches from the controlling terminal, etc.
   96
   97Combined with library(http/http_ssl_plugin) from the   SSL package, this
   98library   can   be   used   to    create     an    HTTPS   server.   See
   99<plbase>/doc/packages/examples/ssl/https for an example   server using a
  100self-signed SSL certificate.
  101*/
  102
  103:- meta_predicate
  104    http_server(1, :),
  105    http_current_server(1, ?),
  106    http_spawn(0, +).  107
  108:- dynamic
  109    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  110    queue_worker/2,         % Queue, ThreadID
  111    queue_options/2.        % Queue, Options
  112
  113:- multifile
  114    make_socket_hook/3,
  115    accept_hook/2,
  116    close_hook/1,
  117    open_client_hook/6,
  118    http:create_pool/1,
  119    http:schedule_workers/1.  120
  121%!  http_server(:Goal, :Options) is det.
  122%
  123%   Create a server at Port that calls Goal for each parsed request.
  124%   Options provide a list of options. Defined options are
  125%
  126%     * port(?Address)
  127%     Port to bind to.  Address is either a port or a term
  128%     Host:Port. The port may be a variable, causing the system
  129%     to select a free port.  See tcp_bind/2.
  130%
  131%     * tcp_socket(+Socket)
  132%     If provided, use this socket instead of the creating one and
  133%     binding it to an address.  The socket must be bound to an
  134%     address.
  135%
  136%     * workers(+Count)
  137%     Determine the number of worker threads.  Default is 5.  This
  138%     is fine for small scale usage.  Public servers typically need
  139%     a higher number.
  140%
  141%     * timeout(+Seconds)
  142%     Max time of inactivity trying to read the request after a
  143%     connection has been opened.  Default is 60 seconds.  See
  144%     set_stream/1 using the _timeout_ option.
  145%
  146%     * keep_alive_timeout(+Seconds)
  147%     Time to keep `Keep alive' connections alive.  Default is
  148%     2 seconds.
  149%
  150%     * local(+Kbytes)
  151%     * global(+Kbytes)
  152%     * trail(+Kbytes)
  153%     Stack sizes to use for the workers.  The default is inherited
  154%     from the `main` thread. As of version 5.9 stacks are no longer
  155%     _pre-allocated_ and the given sizes only act as a limit.
  156%     If you need to control resource usage look at the `spawn`
  157%     option of http_handler/3 and library(thread_pool).
  158%
  159%   A  typical  initialization  for  an    HTTP   server  that  uses
  160%   http_dispatch/1 to relay requests to predicates is:
  161%
  162%     ==
  163%     :- use_module(library(http/thread_httpd)).
  164%     :- use_module(library(http/http_dispatch)).
  165%
  166%     start_server(Port) :-
  167%         http_server(http_dispatch, [port(Port)]).
  168%     ==
  169%
  170%   Note that multiple servers  can  coexist   in  the  same  Prolog
  171%   process. A notable application of this is   to have both an HTTP
  172%   and HTTPS server, where the HTTP   server redirects to the HTTPS
  173%   server for handling sensitive requests.
  174
  175http_server(Goal, M:Options0) :-
  176    option(port(Port), Options0),
  177    !,
  178    make_socket(Port, M:Options0, Options),
  179    create_workers(Options),
  180    create_server(Goal, Port, Options),
  181    print_message(informational,
  182                  httpd_started_server(Port)).
  183http_server(_Goal, _Options) :-
  184    existence_error(option, port).
  185
  186
  187%!  make_socket(?Port, :OptionsIn, -OptionsOut) is det.
  188%
  189%   Create the HTTP server socket and  worker pool queue. OptionsOut
  190%   is quaranteed to hold the option queue(QueueId).
  191%
  192%   @arg   OptionsIn   is   qualified   to     allow   passing   the
  193%   module-sensitive ssl option argument.
  194
  195make_socket(Port, Options0, Options) :-
  196    make_socket_hook(Port, Options0, Options),
  197    !.
  198make_socket(Port, _:Options0, Options) :-
  199    option(tcp_socket(_), Options0),
  200    !,
  201    make_addr_atom('httpd', Port, Queue),
  202    Options = [ queue(Queue)
  203              | Options0
  204              ].
  205make_socket(Port, _:Options0, Options) :-
  206    tcp_socket(Socket),
  207    tcp_setopt(Socket, reuseaddr),
  208    tcp_bind(Socket, Port),
  209    tcp_listen(Socket, 5),
  210    make_addr_atom('httpd', Port, Queue),
  211    Options = [ queue(Queue),
  212                tcp_socket(Socket)
  213              | Options0
  214              ].
  215
  216%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
  217%
  218%   Create an atom that identifies  the   server's  queue and thread
  219%   resources.
  220
  221make_addr_atom(Scheme, Address, Atom) :-
  222    phrase(address_parts(Address), Parts),
  223    atomic_list_concat([Scheme,@|Parts], Atom).
  224
  225address_parts(Atomic) -->
  226    { atomic(Atomic) },
  227    !,
  228    [Atomic].
  229address_parts(Host:Port) -->
  230    !,
  231    address_parts(Host), [:], address_parts(Port).
  232address_parts(ip(A,B,C,D)) -->
  233    !,
  234    [ A, '.', B, '.', C, '.', D ].
  235
  236%!  create_server(:Goal, +Address, +Options) is det.
  237%
  238%   Create the main server thread that runs accept_server/2 to
  239%   listen to new requests.
  240
  241create_server(Goal, Address, Options) :-
  242    get_time(StartTime),
  243    memberchk(queue(Queue), Options),
  244    scheme(Scheme, Options),
  245    address_port(Address, Port),
  246    make_addr_atom(Scheme, Port, Alias),
  247    thread_create(accept_server(Goal, Options), _,
  248                  [ alias(Alias)
  249                  ]),
  250    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  251
  252scheme(Scheme, Options) :-
  253    option(scheme(Scheme), Options),
  254    !.
  255scheme(Scheme, Options) :-
  256    (   option(ssl(_), Options)
  257    ;   option(ssl_instance(_), Options)
  258    ),
  259    !,
  260    Scheme = https.
  261scheme(http, _).
  262
  263address_port(_Host:Port, Port) :- !.
  264address_port(Port, Port).
  265
  266
  267%!  http_current_server(:Goal, ?Port) is nondet.
  268%
  269%   True if Goal is the goal of a server at Port.
  270%
  271%   @deprecated Use http_server_property(Port, goal(Goal))
  272
  273http_current_server(Goal, Port) :-
  274    current_server(Port, Goal, _, _, _, _).
  275
  276
  277%!  http_server_property(?Port, ?Property) is nondet.
  278%
  279%   True if Property is a property of the HTTP server running at
  280%   Port.  Defined properties are:
  281%
  282%       * goal(:Goal)
  283%       Goal used to start the server. This is often
  284%       http_dispatch/1.
  285%       * scheme(-Scheme)
  286%       Scheme is one of `http` or `https`.
  287%       * start_time(?Time)
  288%       Time-stamp when the server was created.
  289
  290http_server_property(_:Port, Property) :-
  291    integer(Port),
  292    !,
  293    server_property(Property, Port).
  294http_server_property(Port, Property) :-
  295    server_property(Property, Port).
  296
  297server_property(goal(Goal), Port) :-
  298    current_server(Port, Goal, _, _, _, _).
  299server_property(scheme(Scheme), Port) :-
  300    current_server(Port, _, _, _, Scheme, _).
  301server_property(start_time(Time), Port) :-
  302    current_server(Port, _, _, _, _, Time).
  303
  304
  305%!  http_workers(+Port, -Workers) is det.
  306%!  http_workers(+Port, +Workers:int) is det.
  307%
  308%   Query or set the number of workers  for the server at this port.
  309%   The number of workers is dynamically   modified. Setting it to 1
  310%   (one) can be used to profile the worker using tprofile/1.
  311
  312http_workers(Port, Workers) :-
  313    must_be(ground, Port),
  314    current_server(Port, _, _, Queue, _, _),
  315    !,
  316    (   integer(Workers)
  317    ->  resize_pool(Queue, Workers)
  318    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
  319        length(WorkerIDs, Workers)
  320    ).
  321http_workers(Port, _) :-
  322    existence_error(http_server, Port).
  323
  324
  325%!  http_add_worker(+Port, +Options) is det.
  326%
  327%   Add a new worker to  the  HTTP   server  for  port Port. Options
  328%   overrule the default queue  options.   The  following additional
  329%   options are processed:
  330%
  331%     - max_idle_time(+Seconds)
  332%     The created worker will automatically terminate if there is
  333%     no new work within Seconds.
  334
  335http_add_worker(Port, Options) :-
  336    must_be(ground, Port),
  337    current_server(Port, _, _, Queue, _, _),
  338    !,
  339    queue_options(Queue, QueueOptions),
  340    merge_options(Options, QueueOptions, WorkerOptions),
  341    atom_concat(Queue, '_', AliasBase),
  342    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  343http_add_worker(Port, _) :-
  344    existence_error(http_server, Port).
  345
  346
  347%!  http_current_worker(?Port, ?ThreadID) is nondet.
  348%
  349%   True if ThreadID is the identifier   of  a Prolog thread serving
  350%   Port. This predicate is  motivated  to   allow  for  the  use of
  351%   arbitrary interaction with the worker thread for development and
  352%   statistics.
  353
  354http_current_worker(Port, ThreadID) :-
  355    current_server(Port, _, _, Queue, _, _),
  356    queue_worker(Queue, ThreadID).
  357
  358
  359%!  accept_server(:Goal, +Options)
  360%
  361%   The goal of a small server-thread accepting new requests and
  362%   posting them to the queue of workers.
  363
  364accept_server(Goal, Options) :-
  365    catch(accept_server2(Goal, Options), http_stop, true),
  366    thread_self(Thread),
  367    retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
  368    close_server_socket(Options).
  369
  370accept_server2(Goal, Options) :-
  371    repeat,
  372      (   catch(accept_server3(Goal, Options), E, true)
  373      ->  (   var(E)
  374          ->  fail
  375          ;   accept_rethrow_error(E)
  376          ->  throw(E)
  377          ;   print_message(error, E),
  378              fail
  379          )
  380      ;   print_message(error,      % internal error
  381                        goal_failed(accept_server3(Goal, Options))),
  382          fail
  383      ).
  384
  385accept_server3(Goal, Options) :-
  386    accept_hook(Goal, Options),
  387    !.
  388accept_server3(Goal, Options) :-
  389    memberchk(tcp_socket(Socket), Options),
  390    memberchk(queue(Queue), Options),
  391    tcp_accept(Socket, Client, Peer),
  392    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  393    http_enough_workers(Queue, accept, Peer),
  394    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).
  395
  396accept_rethrow_error(http_stop).
  397accept_rethrow_error('$aborted').
  398
  399
  400%!  close_server_socket(+Options)
  401%
  402%   Close the server socket.
  403
  404close_server_socket(Options) :-
  405    close_hook(Options),
  406    !.
  407close_server_socket(Options) :-
  408    memberchk(tcp_socket(Socket), Options),
  409    !,
  410    tcp_close_socket(Socket).
  411
  412
  413%!  http_stop_server(+Port, +Options)
  414%
  415%   Stop the indicated  HTTP  server   gracefully.  First  stops all
  416%   workers, then stops the server.
  417%
  418%   @tbd    Realise non-graceful stop
  419
  420http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
  421    ground(Host),
  422    !,
  423    http_stop_server(Port, Options).
  424http_stop_server(Port, _Options) :-
  425    http_workers(Port, 0),                  % checks Port is ground
  426    current_server(Port, _, Thread, Queue, _Scheme, _Start),
  427    retractall(queue_options(Queue, _)),
  428    thread_signal(Thread, throw(http_stop)),
  429    catch(connect(localhost:Port), _, true),
  430    thread_join(Thread, _),
  431    message_queue_destroy(Queue).
  432
  433connect(Address) :-
  434    setup_call_cleanup(
  435        tcp_socket(Socket),
  436        tcp_connect(Socket, Address),
  437        tcp_close_socket(Socket)).
  438
  439%!  http_enough_workers(+Queue, +Why, +Peer) is det.
  440%
  441%   Check that we have enough workers in our queue. If not, call the
  442%   hook http:schedule_workers/1 to extend  the   worker  pool. This
  443%   predicate can be used by accept_hook/2.
  444
  445http_enough_workers(Queue, Why, Peer) :-
  446    message_queue_property(Queue, size(Size)),
  447    (   enough(Size, Why)
  448    ->  true
  449    ;   current_server(Port, _, _, Queue, _, _),
  450        catch(http:schedule_workers(_{port:Port,
  451                                      reason:Why,
  452                                      peer:Peer,
  453                                      waiting:Size}),
  454              Error,
  455              print_message(error, Error))
  456    ->  true
  457    ;   true
  458    ).
  459
  460enough(0, _).
  461enough(1, keep_alive).                  % I will be ready myself
  462
  463
  464%!  http:schedule_workers(+Data:dict) is semidet.
  465%
  466%   Hook called if a  new  connection   or  a  keep-alive connection
  467%   cannot be scheduled _immediately_ to a worker. Dict contains the
  468%   following keys:
  469%
  470%     - port:Port
  471%     Port number that identifies the server.
  472%     - reason:Reason
  473%     One of =accept= for a new connection or =keep_alive= if a
  474%     worker tries to reschedule itself.
  475%     - peer:Peer
  476%     Identify the other end of the connection
  477%     - waiting:Size
  478%     Number of messages waiting in the queue.
  479%
  480%   Note that, when called with `reason:accept`,   we  are called in
  481%   the time critical main accept loop.   An  implementation of this
  482%   hook shall typically send  the  event   to  thread  dedicated to
  483%   dynamic worker-pool management.
  484%
  485%   @see    http_add_worker/2 may be used to create (temporary) extra
  486%           workers.
  487
  488
  489                 /*******************************
  490                 *    WORKER QUEUE OPERATIONS   *
  491                 *******************************/
  492
  493%!  create_workers(+Options)
  494%
  495%   Create the pool of HTTP worker-threads. Each worker has the
  496%   alias http_worker_N.
  497
  498create_workers(Options) :-
  499    option(workers(N), Options, 5),
  500    option(queue(Queue), Options),
  501    catch(message_queue_create(Queue), _, true),
  502    atom_concat(Queue, '_', AliasBase),
  503    create_workers(1, N, Queue, AliasBase, Options),
  504    assert(queue_options(Queue, Options)).
  505
  506create_workers(I, N, _, _, _) :-
  507    I > N,
  508    !.
  509create_workers(I, N, Queue, AliasBase, Options) :-
  510    gensym(AliasBase, Alias),
  511    thread_create(http_worker(Options), Id,
  512                  [ alias(Alias)
  513                  | Options
  514                  ]),
  515    assertz(queue_worker(Queue, Id)),
  516    I2 is I + 1,
  517    create_workers(I2, N, Queue, AliasBase, Options).
  518
  519
  520%!  resize_pool(+Queue, +Workers) is det.
  521%
  522%   Create or destroy workers. If workers   are  destroyed, the call
  523%   waits until the desired number of waiters is reached.
  524
  525resize_pool(Queue, Size) :-
  526    findall(W, queue_worker(Queue, W), Workers),
  527    length(Workers, Now),
  528    (   Now < Size
  529    ->  queue_options(Queue, Options),
  530        atom_concat(Queue, '_', AliasBase),
  531        I0 is Now+1,
  532        create_workers(I0, Size, Queue, AliasBase, Options)
  533    ;   Now == Size
  534    ->  true
  535    ;   Now > Size
  536    ->  Excess is Now - Size,
  537        thread_self(Me),
  538        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  539        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  540    ).
  541
  542
  543%!  http_worker(+Options)
  544%
  545%   Run HTTP worker main loop. Workers   simply  wait until they are
  546%   passed an accepted socket to process  a client.
  547%
  548%   If the message quit(Sender) is read   from the queue, the worker
  549%   stops.
  550
  551http_worker(Options) :-
  552    thread_at_exit(done_worker),
  553    option(queue(Queue), Options),
  554    option(max_idle_time(MaxIdle), Options, infinite),
  555    repeat,
  556      garbage_collect,
  557      trim_stacks,
  558      debug(http(worker), 'Waiting for a job ...', []),
  559      (   MaxIdle == infinite
  560      ->  thread_get_message(Queue, Message)
  561      ;   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  562      ->  true
  563      ;   Message = quit(idle)
  564      ),
  565      debug(http(worker), 'Got job ~p', [Message]),
  566      (   Message = quit(Sender)
  567      ->  !,
  568          thread_self(Self),
  569          thread_detach(Self),
  570          (   Sender == idle
  571          ->  true
  572          ;   thread_send_message(Sender, quitted(Self))
  573          )
  574      ;   open_client(Message, Queue, Goal, In, Out,
  575                      Options, ClientOptions),
  576          (   catch(http_process(Goal, In, Out, ClientOptions),
  577                    Error, true)
  578          ->  true
  579          ;   Error = goal_failed(http_process/4)
  580          ),
  581          (   var(Error)
  582          ->  fail
  583          ;   current_message_level(Error, Level),
  584              print_message(Level, Error),
  585              memberchk(peer(Peer), ClientOptions),
  586              close_connection(Peer, In, Out),
  587              fail
  588          )
  589      ).
  590
  591
  592%!  open_client(+Message, +Queue, -Goal, -In, -Out,
  593%!              +Options, -ClientOptions) is semidet.
  594%
  595%   Opens the connection to the client in a worker from the message
  596%   sent to the queue by accept_server/2.
  597
  598open_client(requeue(In, Out, Goal, ClOpts),
  599            _, Goal, In, Out, Opts, ClOpts) :-
  600    !,
  601    memberchk(peer(Peer), ClOpts),
  602    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  603    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  604open_client(Message, Queue, Goal, In, Out, Opts,
  605            [ pool(client(Queue, Goal, In, Out)),
  606              timeout(Timeout)
  607            | Options
  608            ]) :-
  609    catch(open_client(Message, Goal, In, Out, Options, Opts),
  610          E, report_error(E)),
  611    option(timeout(Timeout), Opts, 60),
  612    (   debugging(http(connection))
  613    ->  memberchk(peer(Peer), Options),
  614        debug(http(connection), 'Opened connection from ~p', [Peer])
  615    ;   true
  616    ).
  617
  618
  619%!  open_client(+Message, +Goal, -In, -Out,
  620%!              -ClientOptions, +Options) is det.
  621
  622open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  623    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  624    !.
  625open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  626            [ peer(Peer),
  627              protocol(http)
  628            ], _) :-
  629    tcp_open_socket(Socket, In, Out).
  630
  631report_error(E) :-
  632    print_message(error, E),
  633    fail.
  634
  635
  636%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
  637%
  638%   Wait for the client for at most  TimeOut seconds. Succeed if the
  639%   client starts a new request within   this  time. Otherwise close
  640%   the connection and fail.
  641
  642check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  643    stream_property(In, timeout(Old)),
  644    set_stream(In, timeout(TMO)),
  645    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  646    catch(peek_code(In, Code), E, true),
  647    (   var(E),                     % no exception
  648        Code \== -1                 % no end-of-file
  649    ->  set_stream(In, timeout(Old)),
  650        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  651    ;   (   Code == -1
  652        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  653        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  654        ),
  655        close_connection(Peer, In, Out),
  656        fail
  657    ).
  658
  659
  660%!  done_worker
  661%
  662%   Called when worker is terminated  due   to  http_workers/2  or a
  663%   (debugging) exception. In  the   latter  case, recreate_worker/2
  664%   creates a new worker.
  665
  666done_worker :-
  667    thread_self(Self),
  668    thread_property(Self, status(Status)),
  669    retract(queue_worker(Queue, Self)),
  670    (   catch(recreate_worker(Status, Queue), _, fail)
  671    ->  thread_detach(Self),
  672        print_message(informational,
  673                      httpd_restarted_worker(Self))
  674    ;   done_status_message_level(Status, Level),
  675        print_message(Level,
  676                      httpd_stopped_worker(Self, Status))
  677    ).
  678
  679done_status_message_level(true, silent) :- !.
  680done_status_message_level(exception('$aborted'), silent) :- !.
  681done_status_message_level(_, informational).
  682
  683
  684%!  recreate_worker(+Status, +Queue) is semidet.
  685%
  686%   Deal with the possibility  that   threads  are,  during development,
  687%   killed with abort/0. We recreate the worker to avoid that eventually
  688%   we run out of workers. If  we  are   aborted  due  to a halt/0 call,
  689%   thread_create/3 will raise a permission error.
  690%
  691%   The first clause deals with the possibility  that we cannot write to
  692%   `user_error`. This is possible when Prolog   is started as a service
  693%   using some service managers. Would be  nice   if  we  could write an
  694%   error, but where?
  695
  696recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
  697    halt(2).
  698recreate_worker(exception(Error), Queue) :-
  699    recreate_on_error(Error),
  700    queue_options(Queue, Options),
  701    atom_concat(Queue, '_', AliasBase),
  702    create_workers(1, 1, Queue, AliasBase, Options).
  703
  704recreate_on_error('$aborted').
  705recreate_on_error(time_limit_exceeded).
  706
  707%       thread_httpd:message_level(+Exception, -Level)
  708%
  709%       Determine the message stream used for  exceptions that may occur
  710%       during server_loop/5. Being multifile, clauses   can be added by
  711%       the   application   to   refine   error   handling.   See   also
  712%       message_hook/3 for further programming error handling.
  713
  714:- multifile
  715    message_level/2.  716
  717message_level(error(io_error(read, _), _),      silent).
  718message_level(error(timeout_error(read, _), _), informational).
  719message_level(keep_alive_timeout,               silent).
  720
  721current_message_level(Term, Level) :-
  722    (   message_level(Term, Level)
  723    ->  true
  724    ;   Level = error
  725    ).
  726
  727
  728%!  http_requeue(+Header)
  729%
  730%   Re-queue a connection to  the  worker   pool.  This  deals  with
  731%   processing additional requests on keep-alive connections.
  732
  733http_requeue(Header) :-
  734    requeue_header(Header, ClientOptions),
  735    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  736    memberchk(peer(Peer), ClientOptions),
  737    http_enough_workers(Queue, keep_alive, Peer),
  738    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  739    !.
  740http_requeue(Header) :-
  741    debug(http(error), 'Re-queue failed: ~p', [Header]),
  742    fail.
  743
  744requeue_header([], []).
  745requeue_header([H|T0], [H|T]) :-
  746    requeue_keep(H),
  747    !,
  748    requeue_header(T0, T).
  749requeue_header([_|T0], T) :-
  750    requeue_header(T0, T).
  751
  752requeue_keep(pool(_)).
  753requeue_keep(peer(_)).
  754requeue_keep(protocol(_)).
  755
  756
  757%!  http_process(Message, Queue, +Options)
  758%
  759%   Handle a single client message on the given stream.
  760
  761http_process(Goal, In, Out, Options) :-
  762    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  763          [Goal, In, Out]),
  764    option(timeout(TMO), Options, 60),
  765    set_stream(In, timeout(TMO)),
  766    set_stream(Out, timeout(TMO)),
  767    http_wrapper(Goal, In, Out, Connection,
  768                 [ request(Request)
  769                 | Options
  770                 ]),
  771    next(Connection, Request).
  772
  773next(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
  774    !,
  775    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  776    (   catch(call(SwitchGoal, In, Out), E,
  777              (   print_message(error, E),
  778                  fail))
  779    ->  true
  780    ;   http_close_connection(Request)
  781    ).
  782next(spawned(ThreadId), _) :-
  783    !,
  784    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  785next(Connection, Request) :-
  786    downcase_atom(Connection, 'keep-alive'),
  787    http_requeue(Request),
  788    !.
  789next(_, Request) :-
  790    http_close_connection(Request).
  791
  792
  793%!  http_close_connection(+Request)
  794%
  795%   Close connection associated to Request.  See also http_requeue/1.
  796
  797http_close_connection(Request) :-
  798    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  799    memberchk(peer(Peer), Request),
  800    close_connection(Peer, In, Out).
  801
  802%!  close_connection(+Peer, +In, +Out)
  803%
  804%   Closes the connection from the server to the client.  Errors are
  805%   currently silently ignored.
  806
  807close_connection(Peer, In, Out) :-
  808    debug(http(connection), 'Closing connection from ~p', [Peer]),
  809    catch(close(In, [force(true)]), _, true),
  810    catch(close(Out, [force(true)]), _, true).
  811
  812%!  http_spawn(:Goal, +Options) is det.
  813%
  814%   Continue this connection on a  new   thread.  A handler may call
  815%   http_spawn/2 to start a new thread that continues processing the
  816%   current request using Goal. The original   thread returns to the
  817%   worker pool for processing new requests.   Options are passed to
  818%   thread_create/3, except for:
  819%
  820%       * pool(+Pool)
  821%       Interfaces to library(thread_pool), starting the thread
  822%       on the given pool.
  823%
  824%   If a pool does not exist, this predicate calls the multifile
  825%   hook http:create_pool/1 to create it. If this predicate succeeds
  826%   the operation is retried.
  827
  828http_spawn(Goal, Options) :-
  829    select_option(pool(Pool), Options, ThreadOptions),
  830    !,
  831    current_output(CGI),
  832    catch(thread_create_in_pool(Pool,
  833                                wrap_spawned(CGI, Goal), Id,
  834                                [ detached(true)
  835                                | ThreadOptions
  836                                ]),
  837          Error,
  838          true),
  839    (   var(Error)
  840    ->  http_spawned(Id)
  841    ;   Error = error(resource_error(threads_in_pool(_)), _)
  842    ->  throw(http_reply(busy))
  843    ;   Error = error(existence_error(thread_pool, Pool), _),
  844        create_pool(Pool)
  845    ->  http_spawn(Goal, Options)
  846    ;   throw(Error)
  847    ).
  848http_spawn(Goal, Options) :-
  849    current_output(CGI),
  850    thread_create(wrap_spawned(CGI, Goal), Id,
  851                  [ detached(true)
  852                  | Options
  853                  ]),
  854    http_spawned(Id).
  855
  856wrap_spawned(CGI, Goal) :-
  857    set_output(CGI),
  858    http_wrap_spawned(Goal, Request, Connection),
  859    next(Connection, Request).
  860
  861%!  create_pool(+Pool)
  862%
  863%   Lazy  creation  of  worker-pools  for   the  HTTP  server.  This
  864%   predicate calls the hook http:create_pool/1.   If the hook fails
  865%   it creates a default pool of size   10. This should suffice most
  866%   typical usecases. Note that we  get   a  permission error if the
  867%   pool is already created.  We can ignore this.
  868
  869create_pool(Pool) :-
  870    E = error(permission_error(create, thread_pool, Pool), _),
  871    catch(http:create_pool(Pool), E, true).
  872create_pool(Pool) :-
  873    print_message(informational, httpd(created_pool(Pool))),
  874    thread_pool_create(Pool, 10, []).
  875
  876
  877
  878                 /*******************************
  879                 *            MESSAGES          *
  880                 *******************************/
  881
  882:- multifile
  883    prolog:message/3.  884
  885prolog:message(httpd_started_server(Port)) -->
  886    [ 'Started server at '-[] ],
  887    http_root(Port).
  888prolog:message(httpd_stopped_worker(Self, Status)) -->
  889    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
  890prolog:message(httpd_restarted_worker(Self)) -->
  891    [ 'Replaced aborted worker ~p'-[Self] ].
  892prolog:message(httpd(created_pool(Pool))) -->
  893    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
  894      'Create this pool at startup-time or define the hook ', nl,
  895      'http:create_pool/1 to avoid this message and create a ', nl,
  896      'pool that fits the usage-profile.'
  897    ].
  898
  899http_root(Host:Port) -->
  900    !,
  901    http_scheme(Port),
  902    { http_absolute_location(root(.), URI, []) },
  903    [ '~w:~w~w'-[Host, Port, URI] ].
  904http_root(Port) -->
  905    http_scheme(Port),
  906    { http_absolute_location(root(.), URI, []) },
  907    [ 'localhost:~w~w'-[Port, URI] ].
  908
  909http_scheme(Port) -->
  910    { http_server_property(Port, scheme(Scheme)) },
  911    [ '~w://'-[Scheme] ]