View source with formatted comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2016, Torbjörn Lager,
    8                              VU University Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(pengines,
   38          [ pengine_create/1,                   % +Options
   39            pengine_ask/3,                      % +Pengine, :Query, +Options
   40            pengine_next/2,                     % +Pengine. +Options
   41            pengine_stop/2,                     % +Pengine. +Options
   42            pengine_event/2,                    % -Event, +Options
   43            pengine_input/2,                    % +Prompt, -Term
   44            pengine_output/1,                   % +Term
   45            pengine_respond/3,                  % +Pengine, +Input, +Options
   46            pengine_debug/2,                    % +Format, +Args
   47            pengine_self/1,                     % -Pengine
   48            pengine_pull_response/2,            % +Pengine, +Options
   49            pengine_destroy/1,                  % +Pengine
   50            pengine_destroy/2,                  % +Pengine, +Options
   51            pengine_abort/1,                    % +Pengine
   52            pengine_application/1,              % +Application
   53            current_pengine_application/1,      % ?Application
   54            pengine_property/2,                 % ?Pengine, ?Property
   55            pengine_user/1,                     % -User
   56            pengine_event_loop/2,               % :Closure, +Options
   57            pengine_rpc/2,                      % +Server, :Goal
   58            pengine_rpc/3                       % +Server, :Goal, +Options
   59          ]).   60
   61/** <module> Pengines: Web Logic Programming Made Easy
   62
   63The library(pengines) provides an  infrastructure   for  creating Prolog
   64engines in a (remote) pengine server  and accessing these engines either
   65from Prolog or JavaScript.
   66
   67@author Torbjörn Lager and Jan Wielemaker
   68*/
   69
   70:- use_module(library(http/http_dispatch)).   71:- use_module(library(http/http_parameters)).   72:- use_module(library(http/http_client)).   73:- use_module(library(http/http_json)).   74:- use_module(library(http/http_open)).   75:- use_module(library(http/http_stream)).   76:- use_module(library(http/http_wrapper)).   77:- use_module(library(http/http_cors)).   78:- use_module(library(thread_pool)).   79:- use_module(library(broadcast)).   80:- use_module(library(uri)).   81:- use_module(library(filesex)).   82:- use_module(library(time)).   83:- use_module(library(lists)).   84:- use_module(library(charsio)).   85:- use_module(library(apply)).   86:- use_module(library(aggregate)).   87:- use_module(library(option)).   88:- use_module(library(settings)).   89:- use_module(library(debug)).   90:- use_module(library(error)).   91:- use_module(library(sandbox)).   92:- use_module(library(modules)).   93:- use_module(library(term_to_json)).   94:- if(exists_source(library(uuid))).   95:- use_module(library(uuid)).   96:- endif.   97
   98
   99:- meta_predicate
  100    pengine_create(:),
  101    pengine_rpc(+, +, :),
  102    pengine_event_loop(1, +).  103
  104:- multifile
  105    write_result/3,                 % +Format, +Event, +Dict
  106    event_to_json/3,                % +Event, -JSON, +Format
  107    prepare_module/3,               % +Module, +Application, +Options
  108    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  109    authentication_hook/3,          % +Request, +Application, -User
  110    not_sandboxed/2.                % +User, +App
  111
  112:- predicate_options(pengine_create/1, 1,
  113                     [ id(-atom),
  114                       alias(atom),
  115                       application(atom),
  116                       destroy(boolean),
  117                       server(atom),
  118                       ask(compound),
  119                       template(compound),
  120                       chunk(integer),
  121                       bindings(list),
  122                       src_list(list),
  123                       src_text(any),           % text
  124                       src_url(atom),
  125                       src_predicates(list)
  126                     ]).  127:- predicate_options(pengine_ask/3, 3,
  128                     [ template(any),
  129                       chunk(integer),
  130                       bindings(list)
  131                     ]).  132:- predicate_options(pengine_next/2, 2,
  133                     [ chunk(integer),
  134                       pass_to(pengine_send/3, 3)
  135                     ]).  136:- predicate_options(pengine_stop/2, 2,
  137                     [ pass_to(pengine_send/3, 3)
  138                     ]).  139:- predicate_options(pengine_respond/3, 2,
  140                     [ pass_to(pengine_send/3, 3)
  141                     ]).  142:- predicate_options(pengine_rpc/3, 3,
  143                     [ chunk(integer),
  144                       pass_to(pengine_create/1, 1)
  145                     ]).  146:- predicate_options(pengine_send/3, 3,
  147                     [ delay(number)
  148                     ]).  149:- predicate_options(pengine_event/2, 2,
  150                     [ pass_to(thread_get_message/3, 3)
  151                     ]).  152:- predicate_options(pengine_pull_response/2, 2,
  153                     [ pass_to(http_open/3, 3)
  154                     ]).  155:- predicate_options(pengine_event_loop/2, 2,
  156                     []).                       % not yet implemented
  157
  158% :- debug(pengine(transition)).
  159:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  160
  161goal_expansion(random_delay, Expanded) :-
  162    (   debugging(pengine(delay))
  163    ->  Expanded = do_random_delay
  164    ;   Expanded = true
  165    ).
  166
  167do_random_delay :-
  168    Delay is random(20)/1000,
  169    sleep(Delay).
  170
  171:- meta_predicate                       % internal meta predicates
  172    solve(+, ?, 0, +),
  173    findnsols_no_empty(+, ?, 0, -),
  174    pengine_event_loop(+, 1, +).  175
  176/**  pengine_create(:Options) is det.
  177
  178    Creates a new pengine. Valid options are:
  179
  180    * id(-ID)
  181      ID gets instantiated to the id of the created pengine.  ID is
  182      atomic.
  183
  184    * alias(+Name)
  185      The pengine is named Name (an atom). A slave pengine (child) can
  186      subsequently be referred to by this name.
  187
  188    * application(+Application)
  189      Application in which the pengine runs.  See pengine_application/1.
  190
  191    * server(+URL)
  192      The pengine will run in (and in the Prolog context of) the pengine
  193      server located at URL.
  194
  195    * src_list(+List_of_clauses)
  196      Inject a list of Prolog clauses into the pengine.
  197
  198    * src_text(+Atom_or_string)
  199      Inject the clauses specified by a source text into the pengine.
  200
  201    * src_url(+URL)
  202      Inject the clauses specified in the file located at URL into the
  203      pengine.
  204
  205    * src_predicates(+List)
  206      Send the local predicates denoted by List to the remote pengine.
  207      List is a list of predicate indicators.
  208
  209Remaining  options  are  passed  to  http_open/3  (meaningful  only  for
  210non-local pengines) and thread_create/3. Note   that for thread_create/3
  211only options changing the stack-sizes can be used. In particular, do not
  212pass the detached or alias options..
  213
  214Successful creation of a pengine will return an _event term_ of the
  215following form:
  216
  217    * create(ID, Term)
  218      ID is the id of the pengine that was created.
  219      Term is not used at the moment.
  220
  221An error will be returned if the pengine could not be created:
  222
  223    * error(ID, Term)
  224      ID is invalid, since no pengine was created.
  225      Term is the exception's error term.
  226*/
  227
  228
  229pengine_create(M:Options0) :-
  230    translate_local_sources(Options0, Options, M),
  231    (   select_option(server(BaseURL), Options, RestOptions)
  232    ->  remote_pengine_create(BaseURL, RestOptions)
  233    ;   local_pengine_create(Options)
  234    ).
  235
  236%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
  237%
  238%   Translate  the  `src_predicates`  and  `src_list`  options  into
  239%   `src_text`. We need to do that   anyway for remote pengines. For
  240%   local pengines, we could avoid  this   step,  but  there is very
  241%   little point in transferring source to a local pengine anyway as
  242%   local pengines can access any  Prolog   predicate  that you make
  243%   visible to the application.
  244%
  245%   Multiple sources are concatenated  to  end   up  with  a  single
  246%   src_text option.
  247
  248translate_local_sources(OptionsIn, Options, Module) :-
  249    translate_local_sources(OptionsIn, Sources, Options2, Module),
  250    (   Sources == []
  251    ->  Options = Options2
  252    ;   Sources = [Source]
  253    ->  Options = [src_text(Source)|Options2]
  254    ;   atomics_to_string(Sources, Source)
  255    ->  Options = [src_text(Source)|Options2]
  256    ).
  257
  258translate_local_sources([], [], [], _).
  259translate_local_sources([H0|T], [S0|S], Options, M) :-
  260    nonvar(H0),
  261    translate_local_source(H0, S0, M),
  262    !,
  263    translate_local_sources(T, S, Options, M).
  264translate_local_sources([H|T0], S, [H|T], M) :-
  265    translate_local_sources(T0, S, T, M).
  266
  267translate_local_source(src_predicates(PIs), Source, M) :-
  268    must_be(list, PIs),
  269    with_output_to(string(Source),
  270                   maplist(listing(M), PIs)).
  271translate_local_source(src_list(Terms), Source, _) :-
  272    must_be(list, Terms),
  273    with_output_to(string(Source),
  274                   forall(member(Term, Terms),
  275                          format('~k .~n', [Term]))).
  276translate_local_source(src_text(Source), Source, _).
  277
  278listing(M, PI) :-
  279    listing(M:PI).
  280
  281/**  pengine_send(+NameOrID, +Term) is det
  282
  283Same as pengine_send(NameOrID, Term, []).
  284*/
  285
  286pengine_send(Target, Event) :-
  287    pengine_send(Target, Event, []).
  288
  289
  290/**  pengine_send(+NameOrID, +Term, +Options) is det
  291
  292Succeeds immediately and  places  Term  in   the  queue  of  the pengine
  293NameOrID. Options is a list of options:
  294
  295   * delay(+Time)
  296     The actual sending is delayed by Time seconds. Time is an integer
  297     or a float.
  298
  299Any remaining options are passed to http_open/3.
  300*/
  301
  302pengine_send(Target, Event, Options) :-
  303    must_be(atom, Target),
  304    pengine_send2(Target, Event, Options).
  305
  306pengine_send2(self, Event, Options) :-
  307    !,
  308    thread_self(Queue),
  309    delay_message(queue(Queue), Event, Options).
  310pengine_send2(Name, Event, Options) :-
  311    child(Name, Target),
  312    !,
  313    delay_message(pengine(Target), Event, Options).
  314pengine_send2(Target, Event, Options) :-
  315    delay_message(pengine(Target), Event, Options).
  316
  317delay_message(Target, Event, Options) :-
  318    option(delay(Delay), Options),
  319    !,
  320    alarm(Delay,
  321          send_message(Target, Event, Options),
  322          _AlarmID,
  323          [remove(true)]).
  324delay_message(Target, Event, Options) :-
  325    random_delay,
  326    send_message(Target, Event, Options).
  327
  328send_message(queue(Queue), Event, _) :-
  329    thread_send_message(Queue, pengine_request(Event)).
  330send_message(pengine(Pengine), Event, Options) :-
  331    (   pengine_remote(Pengine, Server)
  332    ->  remote_pengine_send(Server, Pengine, Event, Options)
  333    ;   pengine_thread(Pengine, Thread)
  334    ->  thread_send_message(Thread, pengine_request(Event))
  335    ;   existence_error(pengine, Pengine)
  336    ).
  337
  338%!  pengine_request(-Request) is det.
  339%
  340%   To be used by a  pengine  to   wait  for  the next request. Such
  341%   messages are placed in the queue by pengine_send/2.
  342
  343pengine_request(Request) :-
  344    pengine_self(Self),
  345    get_pengine_application(Self, Application),
  346    setting(Application:idle_limit, IdleLimit),
  347    thread_self(Me),
  348    (   thread_get_message(Me, pengine_request(Request), [timeout(IdleLimit)])
  349    ->  true
  350    ;   Request = destroy
  351    ).
  352
  353
  354%!  pengine_reply(+Event) is det.
  355%!  pengine_reply(+Queue, +Event) is det.
  356%
  357%   Reply Event to the parent of the   current  Pengine or the given
  358%   Queue.  Such  events  are  read   by    the   other   side  with
  359%   pengine_event/1.
  360%
  361%   If the message cannot be sent within the `idle_limit` setting of
  362%   the pengine, abort the pengine.
  363
  364pengine_reply(Event) :-
  365    pengine_parent(Queue),
  366    pengine_reply(Queue, Event).
  367
  368pengine_reply(_Queue, _Event0) :-
  369    nb_current(pengine_idle_limit_exceeded, true),
  370    !.
  371pengine_reply(Queue, Event0) :-
  372    arg(1, Event0, ID),
  373    wrap_first_answer(ID, Event0, Event),
  374    random_delay,
  375    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  376    (   pengine_self(ID)
  377    ->  get_pengine_application(ID, Application),
  378        setting(Application:idle_limit, IdleLimit),
  379        debug(pengine(reply), 'Sending ~p, timout: ~q', [Event, IdleLimit]),
  380        (   thread_send_message(Queue, pengine_event(ID, Event),
  381                                [ timeout(IdleLimit)
  382                                ])
  383        ->  true
  384        ;   thread_self(Me),
  385            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  386                  [ID, Me]),
  387            nb_setval(pengine_idle_limit_exceeded, true),
  388            thread_detach(Me),
  389            abort
  390        )
  391    ;   thread_send_message(Queue, pengine_event(ID, Event))
  392    ).
  393
  394wrap_first_answer(ID, Event0, CreateEvent) :-
  395    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  396    arg(1, CreateEvent, ID),
  397    !,
  398    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  399wrap_first_answer(_ID, Event, Event).
  400
  401
  402empty_queue :-
  403    pengine_parent(Queue),
  404    empty_queue(Queue, 0, Discarded),
  405    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  406
  407empty_queue(Queue, C0, C) :-
  408    thread_get_message(Queue, _Term, [timeout(0)]),
  409    !,
  410    C1 is C0+1,
  411    empty_queue(Queue, C1, C).
  412empty_queue(_, C, C).
  413
  414
  415/** pengine_ask(+NameOrID, @Query, +Options) is det
  416
  417Asks pengine NameOrID a query Query.
  418
  419Options is a list of options:
  420
  421    * template(+Template)
  422      Template is a variable (or a term containing variables) shared
  423      with the query. By default, the template is identical to the
  424      query.
  425
  426    * chunk(+Integer)
  427      Retrieve solutions in chunks of Integer rather than one by one. 1
  428      means no chunking (default). Other integers indicate the maximum
  429      number of solutions to retrieve in one chunk.
  430
  431    * bindings(+Bindings)
  432      Sets the global variable '$variable_names' to a list of
  433      `Name = Var` terms, providing access to the actual variable
  434      names.
  435
  436Any remaining options are passed to pengine_send/3.
  437
  438Note that the predicate pengine_ask/3 is deterministic, even for queries
  439that have more than one solution. Also,  the variables in Query will not
  440be bound. Instead, results will  be  returned   in  the  form  of _event
  441terms_.
  442
  443    * success(ID, Terms, Projection, Time, More)
  444      ID is the id of the pengine that succeeded in solving the query.
  445      Terms is a list holding instantiations of `Template`.  Projection
  446      is a list of variable names that should be displayed. Time is
  447      the CPU time used to produce the results and finally, More
  448      is either `true` or `false`, indicating whether we can expect the
  449      pengine to be able to return more solutions or not, would we call
  450      pengine_next/2.
  451
  452    * failure(ID)
  453      ID is the id of the pengine that failed for lack of a solutions.
  454
  455    * error(ID, Term)
  456      ID is the id of the pengine throwing the exception.
  457      Term is the exception's error term.
  458
  459    * output(ID, Term)
  460      ID is the id of a pengine running the query that called
  461      pengine_output/1. Term is the term that was passed in the first
  462      argument of pengine_output/1 when it was called.
  463
  464    * prompt(ID, Term)
  465      ID is the id of the pengine that called pengine_input/2 and Term is
  466      the prompt.
  467
  468Defined in terms of pengine_send/3, like so:
  469
  470==
  471pengine_ask(ID, Query, Options) :-
  472    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  473    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  474==
  475*/
  476
  477pengine_ask(ID, Query, Options) :-
  478    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  479    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  480
  481
  482pengine_ask_option(template(_)).
  483pengine_ask_option(chunk(_)).
  484pengine_ask_option(bindings(_)).
  485pengine_ask_option(breakpoints(_)).
  486
  487
  488/** pengine_next(+NameOrID, +Options) is det
  489
  490Asks pengine NameOrID for the  next  solution   to  a  query  started by
  491pengine_ask/3. Defined options are:
  492
  493    * chunk(+Count)
  494    Modify the chunk-size to Count before asking the next set of
  495    solutions.
  496
  497Remaining  options  are  passed  to    pengine_send/3.   The  result  of
  498re-executing the current goal is returned  to the caller's message queue
  499in the form of _event terms_.
  500
  501    * success(ID, Terms, Projection, Time, More)
  502      See pengine_ask/3.
  503
  504    * failure(ID)
  505      ID is the id of the pengine that failed for lack of more solutions.
  506
  507    * error(ID, Term)
  508      ID is the id of the pengine throwing the exception.
  509      Term is the exception's error term.
  510
  511    * output(ID, Term)
  512      ID is the id of a pengine running the query that called
  513      pengine_output/1. Term is the term that was passed in the first
  514      argument of pengine_output/1 when it was called.
  515
  516    * prompt(ID, Term)
  517      ID is the id of the pengine that called pengine_input/2 and Term
  518      is the prompt.
  519
  520Defined in terms of pengine_send/3, as follows:
  521
  522==
  523pengine_next(ID, Options) :-
  524    pengine_send(ID, next, Options).
  525==
  526
  527*/
  528
  529pengine_next(ID, Options) :-
  530    select_option(chunk(Count), Options, Options1),
  531    !,
  532    pengine_send(ID, next(Count), Options1).
  533pengine_next(ID, Options) :-
  534    pengine_send(ID, next, Options).
  535
  536
  537/** pengine_stop(+NameOrID, +Options) is det
  538
  539Tells pengine NameOrID to stop looking  for   more  solutions to a query
  540started by pengine_ask/3. Options are passed to pengine_send/3.
  541
  542Defined in terms of pengine_send/3, like so:
  543
  544==
  545pengine_stop(ID, Options) :-
  546    pengine_send(ID, stop, Options).
  547==
  548*/
  549
  550pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
  551
  552
  553/** pengine_abort(+NameOrID) is det
  554
  555Aborts the running query. The pengine goes   back  to state `2', waiting
  556for new queries.
  557
  558@see pengine_destroy/1.
  559*/
  560
  561pengine_abort(Name) :-
  562    (   child(Name, Pengine)
  563    ->  true
  564    ;   Pengine = Name
  565    ),
  566    (   pengine_remote(Pengine, Server)
  567    ->  remote_pengine_abort(Server, Pengine, [])
  568    ;   pengine_thread(Pengine, Thread),
  569        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  570        catch(thread_signal(Thread, throw(abort_query)), _, true)
  571    ).
  572
  573
  574/** pengine_destroy(+NameOrID) is det.
  575    pengine_destroy(+NameOrID, +Options) is det.
  576
  577Destroys the pengine NameOrID.  With the option force(true), the pengine
  578is killed using abort/0 and pengine_destroy/2 succeeds.
  579*/
  580
  581pengine_destroy(ID) :-
  582    pengine_destroy(ID, []).
  583
  584pengine_destroy(Name, Options) :-
  585    (   child(Name, ID)
  586    ->  true
  587    ;   ID = Name
  588    ),
  589    option(force(true), Options),
  590    !,
  591    (   pengine_thread(ID, Thread)
  592    ->  catch(thread_signal(Thread, abort),
  593              error(existence_error(thread, _), _), true)
  594    ;   true
  595    ).
  596pengine_destroy(ID, _) :-
  597    catch(pengine_send(ID, destroy),
  598          error(existence_error(pengine, ID), _),
  599          retractall(child(_,ID))).
  600
  601
  602/*================= pengines administration =======================
  603*/
  604
  605%!  current_pengine(?Id, ?Parent, ?Location)
  606%
  607%   Dynamic predicate that registers our known pengines.  Id is
  608%   an atomic unique datatype.  Parent is the id of our parent
  609%   pengine.  Location is one of
  610%
  611%     - thread(ThreadId)
  612%     - remote(URL)
  613
  614:- dynamic
  615    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  616    pengine_queue/4,                % Id, Queue, TimeOut, Time
  617    output_queue/3,                 % Id, Queue, Time
  618    pengine_user/2,                 % Id, User
  619    pengine_data/2.                 % Id, Data
  620:- volatile
  621    current_pengine/6,
  622    pengine_queue/4,
  623    output_queue/3,
  624    pengine_user/2,
  625    pengine_data/2.  626
  627:- thread_local
  628    child/2.                        % ?Name, ?Child
  629
  630%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det.
  631%!  pengine_register_remote(+Id, +URL, +Queue, +App, +Destroy) is det.
  632%!  pengine_unregister(+Id) is det.
  633
  634pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  635    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  636
  637pengine_register_remote(Id, URL, Application, Destroy) :-
  638    thread_self(Queue),
  639    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
  640
  641%!  pengine_unregister(+Id)
  642%
  643%   Called by the pengine thread  destruction.   If  we are a remote
  644%   pengine thread, our URL  equals  =http=   and  the  queue is the
  645%   message queue used to send events to the HTTP workers.
  646
  647pengine_unregister(Id) :-
  648    thread_self(Me),
  649    (   current_pengine(Id, Queue, Me, http, _, _)
  650    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  651    ;   true
  652    ),
  653    retractall(current_pengine(Id, _, Me, _, _, _)),
  654    retractall(pengine_user(Id, _)),
  655    retractall(pengine_data(Id, _)).
  656
  657pengine_unregister_remote(Id) :-
  658    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
  659
  660%!  pengine_self(-Id) is det.
  661%
  662%   True if the current thread is a pengine with Id.
  663
  664pengine_self(Id) :-
  665    thread_self(Thread),
  666    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  667
  668pengine_parent(Parent) :-
  669    nb_getval(pengine_parent, Parent).
  670
  671pengine_thread(Pengine, Thread) :-
  672    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  673    Thread \== 0,
  674    !.
  675
  676pengine_remote(Pengine, URL) :-
  677    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  678
  679get_pengine_application(Pengine, Application) :-
  680    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  681    !.
  682
  683get_pengine_module(Pengine, Pengine).
  684
  685:- if(current_predicate(uuid/2)).  686pengine_uuid(Id) :-
  687    uuid(Id, [version(4)]).             % Version 4 is random.
  688:- else.  689:- use_module(library(random)).  690pengine_uuid(Id) :-
  691    Max is 1<<128,
  692    random_between(0, Max, Num),
  693    atom_number(Id, Num).
  694:- endif.  695
  696/** pengine_application(+Application) is det.
  697
  698Directive that must be used to declare  a pengine application
  699module. The module may not  be  associated   to  any  file.  The default
  700application is =pengine_sandbox=.  The  example   below  creates  a  new
  701application =address_book= and imports the  API   defined  in the module
  702file =adress_book_api.pl= into the application.
  703
  704  ==
  705  :- pengine_application(address_book).
  706  :- use_module(address_book:adress_book_api).
  707  ==
  708*/
  709
  710pengine_application(Application) :-
  711    throw(error(context_error(nodirective,
  712                             pengine_application(Application)), _)).
  713
  714:- multifile
  715    system:term_expansion/2,
  716    current_application/1.  717
  718%!  current_pengine_application(?Application) is nondet.
  719%
  720%   True when Application is a currently defined application.
  721%
  722%   @see pengine_application/1
  723
  724current_pengine_application(Application) :-
  725    current_application(Application).
  726
  727
  728% Default settings for all applications
  729
  730:- setting(thread_pool_size, integer, 100,
  731           'Maximum number of pengines this application can run.').  732:- setting(thread_pool_stacks, list(compound), [],
  733           'Maximum stack sizes for pengines this application can run.').  734:- setting(slave_limit, integer, 3,
  735           'Maximum number of slave pengines a master pengine can create.').  736:- setting(time_limit, number, 300,
  737           'Maximum time to wait for output').  738:- setting(idle_limit, number, 300,
  739           'Pengine auto-destroys when idle for this time').  740:- setting(safe_goal_limit, number, 10,
  741           'Maximum time to try proving safity of the goal').  742:- setting(program_space, integer, 100_000_000,
  743           'Maximum memory used by predicates').  744:- setting(allow_from, list(atom), [*],
  745           'IP addresses from which remotes are allowed to connect').  746:- setting(deny_from, list(atom), [],
  747           'IP addresses from which remotes are NOT allowed to connect').  748:- setting(debug_info, boolean, false,
  749           'Keep information to support source-level debugging').  750
  751
  752system:term_expansion((:- pengine_application(Application)), Expanded) :-
  753    must_be(atom, Application),
  754    (   module_property(Application, file(_))
  755    ->  permission_error(create, pengine_application, Application)
  756    ;   true
  757    ),
  758    expand_term((:- setting(Application:thread_pool_size, integer,
  759                            setting(pengines:thread_pool_size),
  760                            'Maximum number of pengines this \c
  761                            application can run.')),
  762                ThreadPoolSizeSetting),
  763    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  764                            setting(pengines:thread_pool_stacks),
  765                            'Maximum stack sizes for pengines \c
  766                            this application can run.')),
  767                ThreadPoolStacksSetting),
  768    expand_term((:- setting(Application:slave_limit, integer,
  769                            setting(pengines:slave_limit),
  770                            'Maximum number of local slave pengines \c
  771                            a master pengine can create.')),
  772                SlaveLimitSetting),
  773    expand_term((:- setting(Application:time_limit, number,
  774                            setting(pengines:time_limit),
  775                            'Maximum time to wait for output')),
  776                TimeLimitSetting),
  777    expand_term((:- setting(Application:idle_limit, number,
  778                            setting(pengines:idle_limit),
  779                            'Pengine auto-destroys when idle for this time')),
  780                IdleLimitSetting),
  781    expand_term((:- setting(Application:safe_goal_limit, number,
  782                            setting(pengines:safe_goal_limit),
  783                            'Maximum time to try proving safity of the goal')),
  784                SafeGoalLimitSetting),
  785    expand_term((:- setting(Application:program_space, integer,
  786                            setting(pengines:program_space),
  787                            'Maximum memory used by predicates')),
  788                ProgramSpaceSetting),
  789    expand_term((:- setting(Application:allow_from, list(atom),
  790                            setting(pengines:allow_from),
  791                            'IP addresses from which remotes are allowed \c
  792                            to connect')),
  793                AllowFromSetting),
  794    expand_term((:- setting(Application:deny_from, list(atom),
  795                            setting(pengines:deny_from),
  796                            'IP addresses from which remotes are NOT \c
  797                            allowed to connect')),
  798                DenyFromSetting),
  799    expand_term((:- setting(Application:debug_info, boolean,
  800                            setting(pengines:debug_info),
  801                            'Keep information to support source-level \c
  802                            debugging')),
  803                DebugInfoSetting),
  804    flatten([ pengines:current_application(Application),
  805              ThreadPoolSizeSetting,
  806              ThreadPoolStacksSetting,
  807              SlaveLimitSetting,
  808              TimeLimitSetting,
  809              IdleLimitSetting,
  810              SafeGoalLimitSetting,
  811              ProgramSpaceSetting,
  812              AllowFromSetting,
  813              DenyFromSetting,
  814              DebugInfoSetting
  815            ], Expanded).
  816
  817% Register default application
  818
  819:- pengine_application(pengine_sandbox).  820
  821
  822/** pengine_property(?Pengine, ?Property) is nondet.
  823
  824True when Property is a property of   the  given Pengine. Enumerates all
  825pengines  that  are  known  to  the   calling  Prolog  process.  Defined
  826properties are:
  827
  828  * self(ID)
  829    Identifier of the pengine.  This is the same as the first argument,
  830    and can be used to enumerate all known pengines.
  831  * alias(Name)
  832    Name is the alias name of the pengine, as provided through the
  833    `alias` option when creating the pengine.
  834  * thread(Thread)
  835    If the pengine is a local pengine, Thread is the Prolog thread
  836    identifier of the pengine.
  837  * remote(Server)
  838    If the pengine is remote, the URL of the server.
  839  * application(Application)
  840    Pengine runs the given application
  841  * module(Module)
  842    Temporary module used for running the Pengine.
  843  * destroy(Destroy)
  844    Destroy is =true= if the pengines is destroyed automatically
  845    after completing the query.
  846  * parent(Queue)
  847    Message queue to which the (local) pengine reports.
  848  * source(?SourceID, ?Source)
  849    Source is the source code with the given SourceID. May be present if
  850    the setting `debug_info` is present.
  851*/
  852
  853
  854pengine_property(Id, Prop) :-
  855    nonvar(Id), nonvar(Prop),
  856    pengine_property2(Id, Prop),
  857    !.
  858pengine_property(Id, Prop) :-
  859    pengine_property2(Id, Prop).
  860
  861pengine_property2(Id, self(Id)) :-
  862    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  863pengine_property2(Id, module(Id)) :-
  864    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  865pengine_property2(Id, alias(Alias)) :-
  866    child(Alias, Id),
  867    Alias \== Id.
  868pengine_property2(Id, thread(Thread)) :-
  869    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
  870    Thread \== 0.
  871pengine_property2(Id, remote(Server)) :-
  872    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
  873pengine_property2(Id, application(Application)) :-
  874    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
  875pengine_property2(Id, destroy(Destroy)) :-
  876    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
  877pengine_property2(Id, parent(Parent)) :-
  878    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
  879pengine_property2(Id, source(SourceID, Source)) :-
  880    pengine_data(Id, source(SourceID, Source)).
  881
  882/** pengine_output(+Term) is det
  883
  884Sends Term to the parent pengine or thread.
  885*/
  886
  887pengine_output(Term) :-
  888    pengine_self(Me),
  889    pengine_reply(output(Me, Term)).
  890
  891
  892/** pengine_debug(+Format, +Args) is det
  893
  894Create a message using format/3 from Format   and  Args and send this to
  895the    client.    The    default    JavaScript    client    will    call
  896=|console.log(Message)|=  if  there  is   a    console.   The  predicate
  897pengine_rpc/3 calls debug(pengine(debug), '~w',   [Message]).  The debug
  898topic pengine(debug) is enabled by default.
  899
  900@see debug/1 and nodebug/1 for controlling the pengine(debug) topic
  901@see format/2 for format specifications
  902*/
  903
  904pengine_debug(Format, Args) :-
  905    pengine_parent(Queue),
  906    pengine_self(Self),
  907    catch(safe_goal(format(atom(_), Format, Args)), E, true),
  908    (   var(E)
  909    ->  format(atom(Message), Format, Args)
  910    ;   message_to_string(E, Message)
  911    ),
  912    pengine_reply(Queue, debug(Self, Message)).
  913
  914
  915/*================= Local pengine =======================
  916*/
  917
  918%!  local_pengine_create(+Options)
  919%
  920%   Creates  a  local   Pengine,   which    is   a   thread  running
  921%   pengine_main/2.  It maintains two predicates:
  922%
  923%     - The global dynamic predicate id/2 relates Pengines to their
  924%       childs.
  925%     - The local predicate id/2 maps named childs to their ids.
  926
  927local_pengine_create(Options) :-
  928    thread_self(Self),
  929    option(application(Application), Options, pengine_sandbox),
  930    create(Self, Child, Options, local, Application),
  931    option(alias(Name), Options, Child),
  932    assert(child(Name, Child)).
  933
  934
  935%!  thread_pool:create_pool(+Application) is det.
  936%
  937%   On demand creation of a thread pool for a pengine application.
  938
  939thread_pool:create_pool(Application) :-
  940    current_application(Application),
  941    setting(Application:thread_pool_size, Size),
  942    setting(Application:thread_pool_stacks, Stacks),
  943    thread_pool_create(Application, Size, Stacks).
  944
  945%!  create(+Queue, -Child, +Options, +URL, +Application) is det.
  946%
  947%   Create a new pengine thread.
  948%
  949%   @arg Queue is the queue (or thread handle) to report to
  950%   @arg Child is the identifier of the created pengine.
  951%   @arg URL is one of =local= or =http=
  952
  953create(Queue, Child, Options, local, Application) :-
  954    !,
  955    pengine_child_id(Child),
  956    create0(Queue, Child, Options, local, Application).
  957create(Queue, Child, Options, URL, Application) :-
  958    pengine_child_id(Child),
  959    catch(create0(Queue, Child, Options, URL, Application),
  960          Error,
  961          create_error(Queue, Child, Error)).
  962
  963pengine_child_id(Child) :-
  964    (   nonvar(Child)
  965    ->  true
  966    ;   pengine_uuid(Child)
  967    ).
  968
  969create_error(Queue, Child, Error) :-
  970    pengine_reply(Queue, error(Child, Error)).
  971
  972create0(Queue, Child, Options, URL, Application) :-
  973    (  current_application(Application)
  974    -> true
  975    ;  existence_error(pengine_application, Application)
  976    ),
  977    (   URL \== http                    % pengine is _not_ a child of the
  978                                        % HTTP server thread
  979    ->  aggregate_all(count, child(_,_), Count),
  980        setting(Application:slave_limit, Max),
  981        (   Count >= Max
  982        ->  throw(error(resource_error(max_pengines), _))
  983        ;   true
  984        )
  985    ;   true
  986    ),
  987    partition(pengine_create_option, Options, PengineOptions, RestOptions),
  988    thread_create_in_pool(
  989        Application,
  990        pengine_main(Queue, PengineOptions, Application), ChildThread,
  991        [ at_exit(pengine_done)
  992        | RestOptions
  993        ]),
  994    option(destroy(Destroy), PengineOptions, true),
  995    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
  996    thread_send_message(ChildThread, pengine_registered(Child)),
  997    (   option(id(Id), Options)
  998    ->  Id = Child
  999    ;   true
 1000    ).
 1001
 1002pengine_create_option(src_text(_)).
 1003pengine_create_option(src_url(_)).
 1004pengine_create_option(application(_)).
 1005pengine_create_option(destroy(_)).
 1006pengine_create_option(ask(_)).
 1007pengine_create_option(template(_)).
 1008pengine_create_option(bindings(_)).
 1009pengine_create_option(chunk(_)).
 1010pengine_create_option(alias(_)).
 1011pengine_create_option(user(_)).
 1012
 1013
 1014%!  pengine_done is det.
 1015%
 1016%   Called  from  the  pengine  thread  =at_exit=  option.  Destroys
 1017%   _child_ pengines using pengine_destroy/1.
 1018
 1019:- public
 1020    pengine_done/0. 1021
 1022pengine_done :-
 1023    thread_self(Me),
 1024    (   thread_property(Me, status(exception('$aborted')))
 1025    ->  pengine_self(Pengine),
 1026        pengine_reply(destroy(Pengine, abort(Pengine))),
 1027        thread_detach(Me)
 1028    ;   true
 1029    ),
 1030    forall(child(_Name, Child),
 1031           pengine_destroy(Child)),
 1032    pengine_self(Id),
 1033    pengine_unregister(Id).
 1034
 1035
 1036%!  pengine_main(+Parent, +Options, +Application)
 1037%
 1038%   Run a pengine main loop. First acknowledges its creation and run
 1039%   pengine_main_loop/1.
 1040
 1041:- thread_local wrap_first_answer_in_create_event/2. 1042
 1043:- meta_predicate
 1044    pengine_prepare_source(:, +). 1045
 1046pengine_main(Parent, Options, Application) :-
 1047    fix_streams,
 1048    thread_get_message(pengine_registered(Self)),
 1049    nb_setval(pengine_parent, Parent),
 1050    pengine_register_user(Options),
 1051    catch(in_temporary_module(
 1052              Self,
 1053              pengine_prepare_source(Application, Options),
 1054              pengine_create_and_loop(Self, Application, Options)),
 1055          prepare_source_failed,
 1056          pengine_terminate(Self)).
 1057
 1058pengine_create_and_loop(Self, Application, Options) :-
 1059    setting(Application:slave_limit, SlaveLimit),
 1060    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1061    (   option(ask(Query), Options)
 1062    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1063        option(template(Template), Options, Query),
 1064        option(chunk(Chunk), Options, 1),
 1065        option(bindings(Bindings), Options, []),
 1066        pengine_ask(Self, Query,
 1067                    [ template(Template),
 1068                      chunk(Chunk),
 1069                      bindings(Bindings)
 1070                    ])
 1071    ;   Extra = [],
 1072        pengine_reply(CreateEvent)
 1073    ),
 1074    pengine_main_loop(Self).
 1075
 1076
 1077%!  fix_streams is det.
 1078%
 1079%   If we are a pengine that is   created  from a web server thread,
 1080%   the current output points to a CGI stream.
 1081
 1082fix_streams :-
 1083    fix_stream(current_output).
 1084
 1085fix_stream(Name) :-
 1086    is_cgi_stream(Name),
 1087    !,
 1088    debug(pengine(stream), '~w is a CGI stream!', [Name]),
 1089    set_stream(user_output, alias(Name)).
 1090fix_stream(_).
 1091
 1092%!  pengine_prepare_source(:Application, +Options) is det.
 1093%
 1094%   Load the source into the pengine's module.
 1095%
 1096%   @throws =prepare_source_failed= if it failed to prepare the
 1097%           sources.
 1098
 1099pengine_prepare_source(Module:Application, Options) :-
 1100    setting(Application:program_space, SpaceLimit),
 1101    set_module(Module:program_space(SpaceLimit)),
 1102    delete_import_module(Module, user),
 1103    add_import_module(Module, Application, start),
 1104    catch(prep_module(Module, Application, Options), Error, true),
 1105    (   var(Error)
 1106    ->  true
 1107    ;   send_error(Error),
 1108        throw(prepare_source_failed)
 1109    ).
 1110
 1111prep_module(Module, Application, Options) :-
 1112    maplist(copy_flag(Module, Application), [var_prefix]),
 1113    forall(prepare_module(Module, Application, Options), true),
 1114    setup_call_cleanup(
 1115        '$set_source_module'(OldModule, Module),
 1116        maplist(process_create_option(Module), Options),
 1117        '$set_source_module'(OldModule)).
 1118
 1119copy_flag(Module, Application, Flag) :-
 1120    current_prolog_flag(Application:Flag, Value),
 1121    !,
 1122    set_prolog_flag(Module:Flag, Value).
 1123copy_flag(_, _, _).
 1124
 1125process_create_option(Application, src_text(Text)) :-
 1126    !,
 1127    pengine_src_text(Text, Application).
 1128process_create_option(Application, src_url(URL)) :-
 1129    !,
 1130    pengine_src_url(URL, Application).
 1131process_create_option(_, _).
 1132
 1133
 1134%!  prepare_module(+Module, +Application, +Options) is semidet.
 1135%
 1136%   Hook, called to initialize  the   temporary  private module that
 1137%   provides the working context of a pengine. This hook is executed
 1138%   by the pengine's thread.  Preparing the source consists of three
 1139%   steps:
 1140%
 1141%     1. Add Application as (first) default import module for Module
 1142%     2. Call this hook
 1143%     3. Compile the source provided by the the `src_text` and
 1144%        `src_url` options
 1145%
 1146%   @arg    Module is a new temporary module (see
 1147%           in_temporary_module/3) that may be (further) prepared
 1148%           by this hook.
 1149%   @arg    Application (also a module) associated to the pengine.
 1150%   @arg    Options is passed from the environment and should
 1151%           (currently) be ignored.
 1152
 1153
 1154pengine_main_loop(ID) :-
 1155    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
 1156
 1157pengine_aborted(ID) :-
 1158    thread_self(Self),
 1159    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
 1160    empty_queue,
 1161    destroy_or_continue(abort(ID)).
 1162
 1163
 1164%!  guarded_main_loop(+Pengine) is det.
 1165%
 1166%   Executes state `2' of  the  pengine,   where  it  waits  for two
 1167%   events:
 1168%
 1169%     - destroy
 1170%     Terminate the pengine
 1171%     - ask(:Goal, +Options)
 1172%     Solve Goal.
 1173
 1174guarded_main_loop(ID) :-
 1175    pengine_request(Request),
 1176    (   Request = destroy
 1177    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
 1178        pengine_terminate(ID)
 1179    ;   Request = ask(Goal, Options)
 1180    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
 1181        ask(ID, Goal, Options)
 1182    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
 1183        pengine_reply(error(ID, error(protocol_error, _))),
 1184        guarded_main_loop(ID)
 1185    ).
 1186
 1187
 1188pengine_terminate(ID) :-
 1189    pengine_reply(destroy(ID)),
 1190    thread_self(Me),            % Make the thread silently disappear
 1191    thread_detach(Me).
 1192
 1193
 1194%!  solve(+Chunk, +Template, :Goal, +ID) is det.
 1195%
 1196%   Solve Goal. Note that because we can ask for a new goal in state
 1197%   `6', we must provide for an ancesteral cut (prolog_cut_to/1). We
 1198%   need to be sure to  have  a   choice  point  before  we can call
 1199%   prolog_current_choice/1. This is the reason   why this predicate
 1200%   has two clauses.
 1201
 1202solve(Chunk, Template, Goal, ID) :-
 1203    prolog_current_choice(Choice),
 1204    State = count(Chunk),
 1205    statistics(cputime, Epoch),
 1206    Time = time(Epoch),
 1207    (   call_cleanup(catch(findnsols_no_empty(State, Template, Goal, Result),
 1208                           Error, true),
 1209                     Det = true),
 1210        arg(1, Time, T0),
 1211        statistics(cputime, T1),
 1212        CPUTime is T1-T0,
 1213        (   var(Error)
 1214        ->  projection(Projection),
 1215            (   var(Det)
 1216            ->  pengine_reply(success(ID, Result, Projection,
 1217                                      CPUTime, true)),
 1218                more_solutions(ID, Choice, State, Time)
 1219            ;   !,                      % commit
 1220                destroy_or_continue(success(ID, Result, Projection,
 1221                                            CPUTime, false))
 1222            )
 1223        ;   !,                          % commit
 1224            (   Error == abort_query
 1225            ->  throw(Error)
 1226            ;   destroy_or_continue(error(ID, Error))
 1227            )
 1228        )
 1229    ;   !,                              % commit
 1230        arg(1, Time, T0),
 1231        statistics(cputime, T1),
 1232        CPUTime is T1-T0,
 1233        destroy_or_continue(failure(ID, CPUTime))
 1234    ).
 1235solve(_, _, _, _).                      % leave a choice point
 1236
 1237projection(Projection) :-
 1238    nb_current('$variable_names', Bindings),
 1239    !,
 1240    maplist(var_name, Bindings, Projection).
 1241projection([]).
 1242
 1243
 1244findnsols_no_empty(N, Template, Goal, List) :-
 1245    findnsols(N, Template, Goal, List),
 1246    List \== [].
 1247
 1248destroy_or_continue(Event) :-
 1249    arg(1, Event, ID),
 1250    (   pengine_property(ID, destroy(true))
 1251    ->  thread_self(Me),
 1252        thread_detach(Me),
 1253        pengine_reply(destroy(ID, Event))
 1254    ;   pengine_reply(Event),
 1255        garbage_collect,                % minimise our footprint
 1256        trim_stacks,
 1257        guarded_main_loop(ID)
 1258    ).
 1259
 1260%!  more_solutions(+Pengine, +Choice, +State, +Time)
 1261%
 1262%   Called after a solution was found while  there can be more. This
 1263%   is state `6' of the state machine. It processes these events:
 1264%
 1265%     * stop
 1266%     Go back via state `7' to state `2' (guarded_main_loop/1)
 1267%     * next
 1268%     Fail.  This causes solve/3 to backtrack on the goal asked,
 1269%     providing at most the current `chunk` solutions.
 1270%     * next(Count)
 1271%     As `next`, but sets the new chunk-size to Count.
 1272%     * ask(Goal, Options)
 1273%     Ask another goal.  Note that we must commit the choice point
 1274%     of the previous goal asked for.
 1275
 1276more_solutions(ID, Choice, State, Time) :-
 1277    pengine_request(Event),
 1278    more_solutions(Event, ID, Choice, State, Time).
 1279
 1280more_solutions(stop, ID, _Choice, _State, _Time) :-
 1281    !,
 1282    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1283    destroy_or_continue(stop(ID)).
 1284more_solutions(next, ID, _Choice, _State, Time) :-
 1285    !,
 1286    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1287    statistics(cputime, T0),
 1288    nb_setarg(1, Time, T0),
 1289    fail.
 1290more_solutions(next(Count), ID, _Choice, State, Time) :-
 1291    Count > 0,
 1292    !,
 1293    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1294    nb_setarg(1, State, Count),
 1295    statistics(cputime, T0),
 1296    nb_setarg(1, Time, T0),
 1297    fail.
 1298more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1299    !,
 1300    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1301    prolog_cut_to(Choice),
 1302    ask(ID, Goal, Options).
 1303more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1304    !,
 1305    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1306    pengine_terminate(ID).
 1307more_solutions(Event, ID, Choice, State, Time) :-
 1308    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1309    pengine_reply(error(ID, error(protocol_error, _))),
 1310    more_solutions(ID, Choice, State, Time).
 1311
 1312%!  ask(+Pengine, :Goal, +Options)
 1313%
 1314%   Migrate from state `2' to `3'.  This predicate validates that it
 1315%   is safe to call Goal using safe_goal/1 and then calls solve/3 to
 1316%   prove the goal. It takes care of the chunk(N) option.
 1317
 1318ask(ID, Goal, Options) :-
 1319    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
 1320    !,
 1321    (   var(Error)
 1322    ->  option(template(Template), Options, Goal),
 1323        option(chunk(N), Options, 1),
 1324        solve(N, Template, Goal1, ID)
 1325    ;   pengine_reply(error(ID, Error)),
 1326        guarded_main_loop(ID)
 1327    ).
 1328
 1329%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
 1330%
 1331%   Prepare GoalIn for execution in Pengine.   This  implies we must
 1332%   perform goal expansion and, if the   system  is sandboxed, check
 1333%   the sandbox.
 1334%
 1335%   Note that expand_goal(Module:GoalIn, GoalOut) is  what we'd like
 1336%   to write, but this does not work correctly if the user wishes to
 1337%   expand `X:Y` while interpreting `X` not   as the module in which
 1338%   to run `Y`. This happens in the  CQL package. Possibly we should
 1339%   disallow this reinterpretation?
 1340
 1341prepare_goal(ID, Goal0, Module:Goal, Options) :-
 1342    option(bindings(Bindings), Options, []),
 1343    b_setval('$variable_names', Bindings),
 1344    (   prepare_goal(Goal0, Goal1, Options)
 1345    ->  true
 1346    ;   Goal1 = Goal0
 1347    ),
 1348    get_pengine_module(ID, Module),
 1349    setup_call_cleanup(
 1350        '$set_source_module'(Old, Module),
 1351        expand_goal(Goal1, Goal),
 1352        '$set_source_module'(_, Old)),
 1353    (   pengine_not_sandboxed(ID)
 1354    ->  true
 1355    ;   get_pengine_application(ID, App),
 1356        setting(App:safe_goal_limit, Limit),
 1357        catch(call_with_time_limit(
 1358                  Limit,
 1359                  safe_goal(Module:Goal)), E, true)
 1360    ->  (   var(E)
 1361        ->  true
 1362        ;   E = time_limit_exceeded
 1363        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
 1364        ;   throw(E)
 1365        )
 1366    ).
 1367
 1368
 1369%%  prepare_goal(+Goal0, -Goal1, +Options) is semidet.
 1370%
 1371%   Pre-preparation hook for running Goal0. The hook runs in the context
 1372%   of the pengine. Goal is the raw   goal  given to _ask_. The returned
 1373%   Goal1 is subject  to  goal   expansion  (expand_goal/2)  and sandbox
 1374%   validation (safe_goal/1) prior to  execution.   If  this goal fails,
 1375%   Goal0 is used for further processing.
 1376%
 1377%   @arg Options provides the options as given to _ask_
 1378
 1379
 1380%%  pengine_not_sandboxed(+Pengine) is semidet.
 1381%
 1382%   True when pengine does not operate in sandboxed mode. This implies a
 1383%   user must be  registered  by   authentication_hook/3  and  the  hook
 1384%   pengines:not_sandboxed(User, Application) must succeed.
 1385
 1386pengine_not_sandboxed(ID) :-
 1387    pengine_user(ID, User),
 1388    pengine_property(ID, application(App)),
 1389    not_sandboxed(User, App),
 1390    !.
 1391
 1392%%  not_sandboxed(+User, +Application) is semidet.
 1393%
 1394%   This hook is called to see whether the Pengine must be executed in a
 1395%   protected environment. It is only called after authentication_hook/3
 1396%   has confirmed the authentity  of  the   current  user.  If this hook
 1397%   succeeds, both loading the code and  executing the query is executed
 1398%   without enforcing sandbox security.  Typically, one should:
 1399%
 1400%     1. Provide a safe user authentication hook.
 1401%     2. Enable HTTPS in the server or put it behind an HTTPS proxy and
 1402%        ensure that the network between the proxy and the pengine
 1403%        server can be trusted.
 1404
 1405
 1406/** pengine_pull_response(+Pengine, +Options) is det
 1407
 1408Pulls a response (an event term) from the  slave Pengine if Pengine is a
 1409remote process, else does nothing at all.
 1410*/
 1411
 1412pengine_pull_response(Pengine, Options) :-
 1413    pengine_remote(Pengine, Server),
 1414    !,
 1415    remote_pengine_pull_response(Server, Pengine, Options).
 1416pengine_pull_response(_ID, _Options).
 1417
 1418
 1419/** pengine_input(+Prompt, -Term) is det
 1420
 1421Sends Prompt to the parent pengine and waits for input. Note that Prompt may be
 1422any term, compound as well as atomic.
 1423*/
 1424
 1425pengine_input(Prompt, Term) :-
 1426    pengine_self(Self),
 1427    pengine_parent(Parent),
 1428    pengine_reply(Parent, prompt(Self, Prompt)),
 1429    pengine_request(Request),
 1430    (   Request = input(Input)
 1431    ->  Term = Input
 1432    ;   Request == destroy
 1433    ->  abort
 1434    ;   throw(error(protocol_error,_))
 1435    ).
 1436
 1437
 1438/** pengine_respond(+Pengine, +Input, +Options) is det
 1439
 1440Sends a response in the form of the term Input to a slave pengine
 1441that has prompted its master for input.
 1442
 1443Defined in terms of pengine_send/3, as follows:
 1444
 1445==
 1446pengine_respond(Pengine, Input, Options) :-
 1447    pengine_send(Pengine, input(Input), Options).
 1448==
 1449
 1450*/
 1451
 1452pengine_respond(Pengine, Input, Options) :-
 1453    pengine_send(Pengine, input(Input), Options).
 1454
 1455
 1456%!  send_error(+Error) is det.
 1457%
 1458%   Send an error to my parent.   Remove non-readable blobs from the
 1459%   error term first using replace_blobs/2. If  the error contains a
 1460%   stack-trace, this is resolved to a string before sending.
 1461
 1462send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
 1463    is_list(Frames),
 1464    !,
 1465    with_output_to(string(Stack),
 1466                   print_prolog_backtrace(current_output, Frames)),
 1467    pengine_self(Self),
 1468    replace_blobs(Formal, Formal1),
 1469    replace_blobs(Message, Message1),
 1470    pengine_reply(error(Self, error(Formal1,
 1471                                    context(prolog_stack(Stack), Message1)))).
 1472send_error(Error) :-
 1473    pengine_self(Self),
 1474    replace_blobs(Error, Error1),
 1475    pengine_reply(error(Self, Error1)).
 1476
 1477%!  replace_blobs(Term0, Term) is det.
 1478%
 1479%   Copy Term0 to Term, replacing non-text   blobs. This is required
 1480%   for error messages that may hold   streams  and other handles to
 1481%   non-readable objects.
 1482
 1483replace_blobs(Blob, Atom) :-
 1484    blob(Blob, Type), Type \== text,
 1485    !,
 1486    format(atom(Atom), '~p', [Blob]).
 1487replace_blobs(Term0, Term) :-
 1488    compound(Term0),
 1489    !,
 1490    compound_name_arguments(Term0, Name, Args0),
 1491    maplist(replace_blobs, Args0, Args),
 1492    compound_name_arguments(Term, Name, Args).
 1493replace_blobs(Term, Term).
 1494
 1495
 1496/*================= Remote pengines =======================
 1497*/
 1498
 1499
 1500remote_pengine_create(BaseURL, Options) :-
 1501    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1502        (       option(ask(Query), PengineOptions0),
 1503                \+ option(template(_Template), PengineOptions0)
 1504        ->      PengineOptions = [template(Query)|PengineOptions0]
 1505        ;       PengineOptions = PengineOptions0
 1506        ),
 1507    options_to_dict(PengineOptions, PostData),
 1508    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1509    arg(1, Reply, ID),
 1510    (   option(id(ID2), Options)
 1511    ->  ID = ID2
 1512    ;   true
 1513    ),
 1514    option(alias(Name), Options, ID),
 1515    assert(child(Name, ID)),
 1516    (   (   functor(Reply, create, _)   % actually created
 1517        ;   functor(Reply, output, _)   % compiler messages
 1518        )
 1519    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1520        option(destroy(Destroy), PengineOptions, true),
 1521        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1522    ;   true
 1523    ),
 1524    thread_self(Queue),
 1525    pengine_reply(Queue, Reply).
 1526
 1527options_to_dict(Options, Dict) :-
 1528    select_option(ask(Ask), Options, Options1),
 1529    select_option(template(Template), Options1, Options2),
 1530    !,
 1531    no_numbered_var_in(Ask+Template),
 1532    findall(AskString-TemplateString,
 1533            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1534            [ AskString-TemplateString ]),
 1535    options_to_dict(Options2, Dict0),
 1536    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1537options_to_dict(Options, Dict) :-
 1538    maplist(prolog_option, Options, Options1),
 1539    dict_create(Dict, _, Options1).
 1540
 1541no_numbered_var_in(Term) :-
 1542    sub_term(Sub, Term),
 1543    subsumes_term('$VAR'(_), Sub),
 1544    !,
 1545    domain_error(numbered_vars_free_term, Term).
 1546no_numbered_var_in(_).
 1547
 1548ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1549    numbervars(Ask+Template, 0, _),
 1550    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1551    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1552                                            Template, WOpts
 1553                                          ]),
 1554    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1555
 1556prolog_option(Option0, Option) :-
 1557    create_option_type(Option0, term),
 1558    !,
 1559    Option0 =.. [Name,Value],
 1560    format(string(String), '~k', [Value]),
 1561    Option =.. [Name,String].
 1562prolog_option(Option, Option).
 1563
 1564create_option_type(ask(_),         term).
 1565create_option_type(template(_),    term).
 1566create_option_type(application(_), atom).
 1567
 1568remote_pengine_send(BaseURL, ID, Event, Options) :-
 1569    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
 1570    thread_self(Queue),
 1571    pengine_reply(Queue, Reply).
 1572
 1573remote_pengine_pull_response(BaseURL, ID, Options) :-
 1574    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
 1575    thread_self(Queue),
 1576    pengine_reply(Queue, Reply).
 1577
 1578remote_pengine_abort(BaseURL, ID, Options) :-
 1579    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
 1580    thread_self(Queue),
 1581    pengine_reply(Queue, Reply).
 1582
 1583%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
 1584%
 1585%   Issue a GET request on Server and   unify Reply with the replied
 1586%   term.
 1587
 1588remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
 1589    !,
 1590    server_url(Server, Action, [id=ID], URL),
 1591    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
 1592              [ post(prolog(Event))     % makes it impossible to interrupt.
 1593              | Options
 1594              ]),
 1595    call_cleanup(
 1596        read_prolog_reply(Stream, Reply),
 1597        close(Stream)).
 1598remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
 1599    server_url(Server, Action, [id=ID|Params], URL),
 1600    http_open(URL, Stream, Options),
 1601    call_cleanup(
 1602        read_prolog_reply(Stream, Reply),
 1603        close(Stream)).
 1604
 1605remote_post_rec(Server, Action, Data, Reply, Options) :-
 1606    server_url(Server, Action, [], URL),
 1607    probe(Action, URL),
 1608    http_open(URL, Stream,
 1609              [ post(json(Data))
 1610              | Options
 1611              ]),
 1612    call_cleanup(
 1613        read_prolog_reply(Stream, Reply),
 1614        close(Stream)).
 1615
 1616%!  probe(+Action, +URL) is det.
 1617%
 1618%   Probe the target. This is a  good   idea  before posting a large
 1619%   document and be faced with an authentication challenge. Possibly
 1620%   we should make this an option for simpler scenarios.
 1621
 1622probe(create, URL) :-
 1623    !,
 1624    http_open(URL, Stream, [method(options)]),
 1625    close(Stream).
 1626probe(_, _).
 1627
 1628read_prolog_reply(In, Reply) :-
 1629    set_stream(In, encoding(utf8)),
 1630    read(In, Reply0),
 1631    rebind_cycles(Reply0, Reply).
 1632
 1633rebind_cycles(@(Reply, Bindings), Reply) :-
 1634    is_list(Bindings),
 1635    !,
 1636    maplist(bind, Bindings).
 1637rebind_cycles(Reply, Reply).
 1638
 1639bind(Var = Value) :-
 1640    Var = Value.
 1641
 1642server_url(Server, Action, Params, URL) :-
 1643    uri_components(Server, Components0),
 1644    uri_query_components(Query, Params),
 1645    uri_data(path, Components0, Path0),
 1646    atom_concat('pengine/', Action, PAction),
 1647    directory_file_path(Path0, PAction, Path),
 1648    uri_data(path, Components0, Path, Components),
 1649    uri_data(search, Components, Query),
 1650    uri_components(URL, Components).
 1651
 1652
 1653/** pengine_event(?EventTerm) is det.
 1654    pengine_event(?EventTerm, +Options) is det.
 1655
 1656Examines the pengine's event queue  and   if  necessary blocks execution
 1657until a term that unifies to Term  arrives   in  the queue. After a term
 1658from the queue has been unified to Term,   the  term is deleted from the
 1659queue.
 1660
 1661   Valid options are:
 1662
 1663   * timeout(+Time)
 1664     Time is a float or integer and specifies the maximum time to wait
 1665     in seconds. If no event has arrived before the time is up EventTerm
 1666     is bound to the atom =timeout=.
 1667   * listen(+Id)
 1668     Only listen to events from the pengine identified by Id.
 1669*/
 1670
 1671pengine_event(Event) :-
 1672    pengine_event(Event, []).
 1673
 1674pengine_event(Event, Options) :-
 1675    thread_self(Self),
 1676    option(listen(Id), Options, _),
 1677    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1678    ->  true
 1679    ;   Event = timeout
 1680    ),
 1681    update_remote_destroy(Event).
 1682
 1683update_remote_destroy(Event) :-
 1684    destroy_event(Event),
 1685    arg(1, Event, Id),
 1686    pengine_remote(Id, _Server),
 1687    !,
 1688    pengine_unregister_remote(Id).
 1689update_remote_destroy(_).
 1690
 1691destroy_event(destroy(_)).
 1692destroy_event(destroy(_,_)).
 1693destroy_event(create(_,Features)) :-
 1694    memberchk(answer(Answer), Features),
 1695    !,
 1696    nonvar(Answer),
 1697    destroy_event(Answer).
 1698
 1699
 1700/** pengine_event_loop(:Closure, +Options) is det
 1701
 1702Starts an event loop accepting event terms   sent to the current pengine
 1703or thread. For each such  event   E,  calls  ignore(call(Closure, E)). A
 1704closure thus acts as a _handler_  for   the  event. Some events are also
 1705treated specially:
 1706
 1707   * create(ID, Term)
 1708     The ID is placed in a list of active pengines.
 1709
 1710   * destroy(ID)
 1711     The ID is removed from the list of active pengines. When the last
 1712     pengine ID is removed, the loop terminates.
 1713
 1714   * output(ID, Term)
 1715     The predicate pengine_pull_response/2 is called.
 1716
 1717Valid options are:
 1718
 1719   * autoforward(+To)
 1720     Forwards received event terms to slaves. To is either =all=,
 1721     =all_but_sender= or a Prolog list of NameOrIDs. [not yet
 1722     implemented]
 1723
 1724*/
 1725
 1726pengine_event_loop(Closure, Options) :-
 1727    child(_,_),
 1728    !,
 1729    pengine_event(Event),
 1730    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1731    ->  forall(child(_,ID), pengine_send(ID, Event))
 1732    ;   true
 1733    ),
 1734    pengine_event_loop(Event, Closure, Options).
 1735pengine_event_loop(_, _).
 1736
 1737:- meta_predicate
 1738    pengine_process_event(+, 1, -, +). 1739
 1740pengine_event_loop(Event, Closure, Options) :-
 1741    pengine_process_event(Event, Closure, Continue, Options),
 1742    (   Continue == true
 1743    ->  pengine_event_loop(Closure, Options)
 1744    ;   true
 1745    ).
 1746
 1747pengine_process_event(create(ID, T), Closure, Continue, Options) :-
 1748    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
 1749    (   select(answer(First), T, T1)
 1750    ->  ignore(call(Closure, create(ID, T1))),
 1751        pengine_process_event(First, Closure, Continue, Options)
 1752    ;   ignore(call(Closure, create(ID, T))),
 1753        Continue = true
 1754    ).
 1755pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
 1756    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
 1757    ignore(call(Closure, output(ID, Msg))),
 1758    pengine_pull_response(ID, []).
 1759pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
 1760    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
 1761    ignore(call(Closure, debug(ID, Msg))),
 1762    pengine_pull_response(ID, []).
 1763pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
 1764    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
 1765    ignore(call(Closure, prompt(ID, Term))).
 1766pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
 1767    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
 1768    ignore(call(Closure, success(ID, Sol, More))).
 1769pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
 1770    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
 1771    ignore(call(Closure, failure(ID))).
 1772pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
 1773    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
 1774    (   call(Closure, error(ID, Error))
 1775    ->  Continue = true
 1776    ;   forall(child(_,Child), pengine_destroy(Child)),
 1777        throw(Error)
 1778    ).
 1779pengine_process_event(stop(ID), Closure, true, _Options) :-
 1780    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
 1781    ignore(call(Closure, stop(ID))).
 1782pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
 1783    pengine_process_event(Event, Closure, _, Options),
 1784    pengine_process_event(destroy(ID), Closure, Continue, Options).
 1785pengine_process_event(destroy(ID), Closure, true, _Options) :-
 1786    retractall(child(_,ID)),
 1787    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
 1788    ignore(call(Closure, destroy(ID))).
 1789
 1790
 1791/** pengine_rpc(+URL, +Query) is nondet.
 1792    pengine_rpc(+URL, +Query, +Options) is nondet.
 1793
 1794Semantically equivalent to the sequence below,  except that the query is
 1795executed in (and in the Prolog context   of) the pengine server referred
 1796to by URL, rather than locally.
 1797
 1798  ==
 1799    copy_term(Query, Copy),
 1800    call(Copy),                 % executed on server at URL
 1801    Query = Copy.
 1802  ==
 1803
 1804Valid options are:
 1805
 1806    - chunk(+Integer)
 1807      Can be used to reduce the number of network roundtrips being made.
 1808      See pengine_ask/3.
 1809    - timeout(+Time)
 1810      Wait at most Time seconds for the next event from the server.
 1811      The default is defined by the setting `pengines:time_limit`.
 1812
 1813Remaining  options  (except   the   server    option)   are   passed  to
 1814pengine_create/1.
 1815*/
 1816
 1817pengine_rpc(URL, Query) :-
 1818    pengine_rpc(URL, Query, []).
 1819
 1820pengine_rpc(URL, Query, M:Options0) :-
 1821    translate_local_sources(Options0, Options1, M),
 1822    (  option(timeout(_), Options1)
 1823    -> Options = Options1
 1824    ;  setting(time_limit, Limit),
 1825       Options = [timeout(Limit)|Options1]
 1826    ),
 1827    term_variables(Query, Vars),
 1828    Template =.. [v|Vars],
 1829    State = destroy(true),              % modified by process_event/4
 1830    setup_call_catcher_cleanup(
 1831        pengine_create([ ask(Query),
 1832                         template(Template),
 1833                         server(URL),
 1834                         id(Id)
 1835                       | Options
 1836                       ]),
 1837        wait_event(Template, State, [listen(Id)|Options]),
 1838        Why,
 1839        pengine_destroy_and_wait(State, Id, Why)).
 1840
 1841pengine_destroy_and_wait(destroy(true), Id, Why) :-
 1842    !,
 1843    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
 1844    pengine_destroy(Id),
 1845    wait_destroy(Id, 10).
 1846pengine_destroy_and_wait(_, _, Why) :-
 1847    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).
 1848
 1849wait_destroy(Id, _) :-
 1850    \+ child(_, Id),
 1851    !.
 1852wait_destroy(Id, N) :-
 1853    pengine_event(Event, [listen(Id),timeout(10)]),
 1854    !,
 1855    (   destroy_event(Event)
 1856    ->  retractall(child(_,Id))
 1857    ;   succ(N1, N)
 1858    ->  wait_destroy(Id, N1)
 1859    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
 1860        pengine_unregister_remote(Id),
 1861        retractall(child(_,Id))
 1862    ).
 1863
 1864wait_event(Template, State, Options) :-
 1865    pengine_event(Event, Options),
 1866    debug(pengine(event), 'Received ~p', [Event]),
 1867    process_event(Event, Template, State, Options).
 1868
 1869process_event(create(_ID, Features), Template, State, Options) :-
 1870    memberchk(answer(First), Features),
 1871    process_event(First, Template, State, Options).
 1872process_event(error(_ID, Error), _Template, _, _Options) :-
 1873    throw(Error).
 1874process_event(failure(_ID, _Time), _Template, _, _Options) :-
 1875    fail.
 1876process_event(prompt(ID, Prompt), Template, State, Options) :-
 1877    pengine_rpc_prompt(ID, Prompt, Reply),
 1878    pengine_send(ID, input(Reply)),
 1879    wait_event(Template, State, Options).
 1880process_event(output(ID, Term), Template, State, Options) :-
 1881    pengine_rpc_output(ID, Term),
 1882    pengine_pull_response(ID, Options),
 1883    wait_event(Template, State, Options).
 1884process_event(debug(ID, Message), Template, State, Options) :-
 1885    debug(pengine(debug), '~w', [Message]),
 1886    pengine_pull_response(ID, Options),
 1887    wait_event(Template, State, Options).
 1888process_event(success(_ID, Solutions, _Proj, _Time, false),
 1889              Template, _, _Options) :-
 1890    !,
 1891    member(Template, Solutions).
 1892process_event(success(ID, Solutions, _Proj, _Time, true),
 1893              Template, State, Options) :-
 1894    (   member(Template, Solutions)
 1895    ;   pengine_next(ID, Options),
 1896        wait_event(Template, State, Options)
 1897    ).
 1898process_event(destroy(ID, Event), Template, State, Options) :-
 1899    !,
 1900    retractall(child(_,ID)),
 1901    nb_setarg(1, State, false),
 1902    debug(pengine(destroy), 'State: ~p~n', [State]),
 1903    process_event(Event, Template, State, Options).
 1904% compatibility with older versions of the protocol.
 1905process_event(success(ID, Solutions, Time, More),
 1906              Template, State, Options) :-
 1907    process_event(success(ID, Solutions, _Proj, Time, More),
 1908                  Template, State, Options).
 1909
 1910
 1911pengine_rpc_prompt(ID, Prompt, Term) :-
 1912    prompt(ID, Prompt, Term0),
 1913    !,
 1914    Term = Term0.
 1915pengine_rpc_prompt(_ID, Prompt, Term) :-
 1916    setup_call_cleanup(
 1917        prompt(Old, Prompt),
 1918        read(Term),
 1919        prompt(_, Old)).
 1920
 1921pengine_rpc_output(ID, Term) :-
 1922    output(ID, Term),
 1923    !.
 1924pengine_rpc_output(_ID, Term) :-
 1925    print(Term).
 1926
 1927%%  prompt(+ID, +Prompt, -Term) is semidet.
 1928%
 1929%   Hook to handle pengine_input/2 from the remote pengine. If the hooks
 1930%   fails, pengine_rpc/3 calls read/1 using the current prompt.
 1931
 1932:- multifile prompt/3. 1933
 1934%%  output(+ID, +Term) is semidet.
 1935%
 1936%   Hook to handle pengine_output/1 from the remote pengine. If the hook
 1937%   fails, it calls print/1 on Term.
 1938
 1939:- multifile output/2. 1940
 1941
 1942/*================= HTTP handlers =======================
 1943*/
 1944
 1945%   Declare  HTTP  locations  we  serve  and   how.  Note  that  we  use
 1946%   time_limit(inifinite) because pengines have their  own timeout. Also
 1947%   note that we use spawn. This  is   needed  because we can easily get
 1948%   many clients waiting for  some  action   on  a  pengine to complete.
 1949%   Without spawning, we would quickly exhaust   the  worker pool of the
 1950%   HTTP server.
 1951%
 1952%   FIXME: probably we should wait for a   short time for the pengine on
 1953%   the default worker thread. Only if  that   time  has expired, we can
 1954%   call http_spawn/2 to continue waiting on   a  new thread. That would
 1955%   improve the performance and reduce the usage of threads.
 1956
 1957:- http_handler(root(pengine),               http_404([]),
 1958                [ id(pengines) ]). 1959:- http_handler(root(pengine/create),        http_pengine_create,
 1960                [ time_limit(infinite), spawn([]) ]). 1961:- http_handler(root(pengine/send),          http_pengine_send,
 1962                [ time_limit(infinite), spawn([]) ]). 1963:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
 1964                [ time_limit(infinite), spawn([]) ]). 1965:- http_handler(root(pengine/abort),         http_pengine_abort,         []). 1966:- http_handler(root(pengine/ping),          http_pengine_ping,          []). 1967:- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []). 1968
 1969:- http_handler(root(pengine/'pengines.js'),
 1970                http_reply_file(library('http/web/js/pengines.js'), []), []). 1971:- http_handler(root(pengine/'plterm.css'),
 1972                http_reply_file(library('http/web/css/plterm.css'), []), []). 1973
 1974
 1975%%  http_pengine_create(+Request)
 1976%
 1977%   HTTP POST handler  for  =/pengine/create=.   This  API  accepts  the
 1978%   pengine  creation  parameters  both  as  =application/json=  and  as
 1979%   =www-form-encoded=.  Accepted parameters:
 1980%
 1981%     | **Parameter** | **Default**       | **Comment**                |
 1982%     |---------------|-------------------|----------------------------|
 1983%     | format        | `prolog`          | Output format              |
 1984%     | application   | `pengine_sandbox` | Pengine application        |
 1985%     | chunk         | 1                 | Chunk-size for results     |
 1986%     | solutions     | chunked           | If `all`, emit all results |
 1987%     | ask           | -                 | The query                  |
 1988%     | template      | -                 | Output template            |
 1989%     | src_text      | ""                | Program                    |
 1990%     | src_url       | -                 | Program to download        |
 1991%     | disposition   | -                 | Download location          |
 1992%
 1993%     Note that solutions=all internally  uses   chunking  to obtain the
 1994%     results from the pengine, but the results are combined in a single
 1995%     HTTP reply. This is currently only  implemented by the CSV backend
 1996%     that is part of SWISH for   downloading unbounded result sets with
 1997%     limited memory resources.
 1998
 1999http_pengine_create(Request) :-
 2000    reply_options(Request, [post]),
 2001    !.
 2002http_pengine_create(Request) :-
 2003    memberchk(content_type(CT), Request),
 2004    sub_atom(CT, 0, _, _, 'application/json'),
 2005    !,
 2006    http_read_json_dict(Request, Dict),
 2007    dict_atom_option(format, Dict, Format, prolog),
 2008    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2009    http_pengine_create(Request, Application, Format, Dict).
 2010http_pengine_create(Request) :-
 2011    Optional = [optional(true)],
 2012    OptString = [string|Optional],
 2013    Form = [ format(Format, [default(prolog)]),
 2014             application(Application, [default(pengine_sandbox)]),
 2015             chunk(_, [integer, default(1)]),
 2016             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2017             ask(_, OptString),
 2018             template(_, OptString),
 2019             src_text(_, OptString),
 2020             disposition(_, OptString),
 2021             src_url(_, Optional)
 2022           ],
 2023    http_parameters(Request, Form),
 2024    form_dict(Form, Dict),
 2025    http_pengine_create(Request, Application, Format, Dict).
 2026
 2027dict_atom_option(Key, Dict, Atom, Default) :-
 2028    (   get_dict(Key, Dict, String)
 2029    ->  atom_string(Atom, String)
 2030    ;   Atom = Default
 2031    ).
 2032
 2033form_dict(Form, Dict) :-
 2034    form_values(Form, Pairs),
 2035    dict_pairs(Dict, _, Pairs).
 2036
 2037form_values([], []).
 2038form_values([H|T], Pairs) :-
 2039    arg(1, H, Value),
 2040    nonvar(Value),
 2041    !,
 2042    functor(H, Name, _),
 2043    Pairs = [Name-Value|PairsT],
 2044    form_values(T, PairsT).
 2045form_values([_|T], Pairs) :-
 2046    form_values(T, Pairs).
 2047
 2048%!  http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2049
 2050
 2051http_pengine_create(Request, Application, Format, Dict) :-
 2052    current_application(Application),
 2053    !,
 2054    allowed(Request, Application),
 2055    authenticate(Request, Application, UserOptions),
 2056    dict_to_options(Dict, Application, CreateOptions0),
 2057    append(UserOptions, CreateOptions0, CreateOptions),
 2058    pengine_uuid(Pengine),
 2059    message_queue_create(Queue, [max_size(25)]),
 2060    setting(Application:time_limit, TimeLimit),
 2061    get_time(Now),
 2062    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2063    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2064    create(Queue, Pengine, CreateOptions, http, Application),
 2065    create_wait_and_output_result(Pengine, Queue, Format,
 2066                                  TimeLimit, Dict),
 2067    gc_abandoned_queues.
 2068http_pengine_create(_Request, Application, Format, _Dict) :-
 2069    Error = existence_error(pengine_application, Application),
 2070    pengine_uuid(ID),
 2071    output_result(Format, error(ID, error(Error, _))).
 2072
 2073
 2074dict_to_options(Dict, Application, CreateOptions) :-
 2075    dict_pairs(Dict, _, Pairs),
 2076    pairs_create_options(Pairs, Application, CreateOptions).
 2077
 2078pairs_create_options([], _, []) :- !.
 2079pairs_create_options(T0, App, CreateOpts) :-
 2080    selectchk(ask-Ask, T0, T1),
 2081    selectchk(template-Template, T1, T2),
 2082    !,
 2083    CreateOpts = [ ask(Ask1), template(Template1), bindings(Bindings) | T ],
 2084    format(string(AskTemplate), 't((~s),(~s))', [Ask, Template]),
 2085    term_string(t(Ask1,Template1), AskTemplate,
 2086                [ variable_names(Bindings),
 2087                  module(App)
 2088                ]),
 2089    pairs_create_options(T2, App, T).
 2090pairs_create_options([ask-String|T0], App,
 2091                     [ask(Ask),template(Template),bindings(Bindings1)|T]) :-
 2092    !,
 2093    term_string(Ask, String,
 2094                [ variable_names(Bindings),
 2095                  module(App)
 2096                ]),
 2097    exclude(anon, Bindings, Bindings1),
 2098    dict_create(Template, json, Bindings1),
 2099    pairs_create_options(T0, App, T).
 2100pairs_create_options([N-V0|T0], App, [Opt|T]) :-
 2101    Opt =.. [N,V],
 2102    pengine_create_option(Opt), N \== user,
 2103    !,
 2104    (   create_option_type(Opt, Type)
 2105    ->  (   Type == term
 2106        ->  atom_to_term(V0, V, _)
 2107        ;   Type == atom
 2108        ->  atom_string(V, V0)
 2109        ;   assertion(false)
 2110        )
 2111    ;   V = V0
 2112    ),
 2113    pairs_create_options(T0, App, T).
 2114pairs_create_options([_|T0], App, T) :-
 2115    pairs_create_options(T0, App, T).
 2116
 2117
 2118%!  wait_and_output_result(+Pengine, +Queue,
 2119%!                         +Format, +TimeLimit) is det.
 2120%
 2121%   Wait for the Pengine's Queue and if  there is a message, send it
 2122%   to the requester using  output_result/1.   If  Pengine  does not
 2123%   answer within the time specified   by  the setting =time_limit=,
 2124%   Pengine is aborted and the  result is error(time_limit_exceeded,
 2125%   _).
 2126
 2127wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
 2128    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2129                                 [ timeout(TimeLimit)
 2130                                 ]),
 2131              Error, true)
 2132    ->  (   var(Error)
 2133        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2134            ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2135            output_result(Format, Event)
 2136        ;   output_result(Format, died(Pengine))
 2137        )
 2138    ;   time_limit_exceeded(Pengine, Format)
 2139    ).
 2140
 2141%!  create_wait_and_output_result(+Pengine, +Queue, +Format,
 2142%!                                +TimeLimit, +Dict) is det.
 2143%
 2144%   Intercepts  the  `solutions=all'  case    used  for  downloading
 2145%   results. Dict may contain a  `disposition`   key  to  denote the
 2146%   download location.
 2147
 2148create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2149    get_dict(solutions, Dict, all),
 2150    !,
 2151    between(1, infinite, Page),
 2152    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2153                                 [ timeout(TimeLimit)
 2154                                 ]),
 2155              Error, true)
 2156    ->  (   var(Error)
 2157        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 2158            (   destroy_queue_from_http(Pengine, Event, Queue)
 2159            ->  output_result(Format, page(Page, Event))
 2160            ;   pengine_thread(Pengine, Thread),
 2161                thread_send_message(Thread, pengine_request(next)),
 2162                output_result(Format, page(Page, Event), Dict),
 2163                fail
 2164            )
 2165        ;   output_result(Format, died(Pengine))
 2166        )
 2167    ;   time_limit_exceeded(Pengine, Format)
 2168    ),
 2169    !.
 2170create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
 2171    wait_and_output_result(Pengine, Queue, Format, TimeLimit).
 2172
 2173%!  time_limit_exceeded(+Pengine, +Format)
 2174%
 2175%   The Pengine did not reply within its time limit. Send a reply to the
 2176%   client in the requested format and interrupt the Pengine.
 2177%
 2178%   @bug Ideally, if the Pengine has `destroy` set to `false`, we should
 2179%   get the Pengine back to its main   loop.  Unfortunately we only have
 2180%   normal exceptions that may be  caught   by  the  Pengine and `abort`
 2181%   which cannot be caught and thus destroys the Pengine.
 2182
 2183time_limit_exceeded(Pengine, Format) :-
 2184    call_cleanup(
 2185        pengine_destroy(Pengine, [force(true)]),
 2186        output_result(Format,
 2187                      destroy(Pengine,
 2188                              error(Pengine, time_limit_exceeded)))).
 2189
 2190
 2191%!  destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
 2192%
 2193%   Consider destroying the output queue   for Pengine after sending
 2194%   Event back to the HTTP client. We can destroy the queue if
 2195%
 2196%     - The pengine already died (output_queue/3 is present) and
 2197%       the queue is empty.
 2198%     - This is a final (destroy) event.
 2199%
 2200%   @tbd    If the client did not request all output, the queue will
 2201%           not be destroyed.  We need some timeout and GC for that.
 2202
 2203destroy_queue_from_http(ID, _, Queue) :-
 2204    output_queue(ID, Queue, _),
 2205    !,
 2206    destroy_queue_if_empty(Queue).
 2207destroy_queue_from_http(ID, Event, Queue) :-
 2208    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2209    is_destroy_event(Event),
 2210    !,
 2211    message_queue_property(Queue, size(Waiting)),
 2212    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2213    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2214
 2215is_destroy_event(destroy(_)).
 2216is_destroy_event(destroy(_,_)).
 2217is_destroy_event(create(_, Options)) :-
 2218    memberchk(answer(Event), Options),
 2219    is_destroy_event(Event).
 2220
 2221destroy_queue_if_empty(Queue) :-
 2222    thread_peek_message(Queue, _),
 2223    !.
 2224destroy_queue_if_empty(Queue) :-
 2225    retractall(output_queue(_, Queue, _)),
 2226    message_queue_destroy(Queue).
 2227
 2228%!  gc_abandoned_queues
 2229%
 2230%   Check whether there are queues  that   have  been abadoned. This
 2231%   happens if the stream contains output events and not all of them
 2232%   are read by the client.
 2233
 2234:- dynamic
 2235    last_gc/1. 2236
 2237gc_abandoned_queues :-
 2238    consider_queue_gc,
 2239    !,
 2240    get_time(Now),
 2241    (   output_queue(_, Queue, Time),
 2242        Now-Time > 15*60,
 2243        retract(output_queue(_, Queue, Time)),
 2244        message_queue_destroy(Queue),
 2245        fail
 2246    ;   retractall(last_gc(_)),
 2247        asserta(last_gc(Now))
 2248    ).
 2249gc_abandoned_queues.
 2250
 2251consider_queue_gc :-
 2252    predicate_property(output_queue(_,_,_), number_of_clauses(N)),
 2253    N > 100,
 2254    (   last_gc(Time),
 2255        get_time(Now),
 2256        Now-Time > 5*60
 2257    ->  true
 2258    ;   \+ last_gc(_)
 2259    ).
 2260
 2261%!  sync_destroy_queue_from_http(+Pengine, +Queue) is det.
 2262%!  sync_delay_destroy_queue(+Pengine, +Queue) is det.
 2263%
 2264%   Handle destruction of the message queue connecting the HTTP side
 2265%   to the pengine. We cannot delete the queue when the pengine dies
 2266%   because the queue may contain output  events. Termination of the
 2267%   pengine and finishing the  HTTP  exchange   may  happen  in both
 2268%   orders. This means we need handle this using synchronization.
 2269%
 2270%     * sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2271%     Called (indirectly) from pengine_done/1 if the pengine's
 2272%     thread dies.
 2273%     * sync_destroy_queue_from_http(+Pengine, +Queue)
 2274%     Called from destroy_queue/3, from wait_and_output_result/4,
 2275%     i.e., from the HTTP side.
 2276
 2277:- dynamic output_queue_destroyed/1. 2278
 2279sync_destroy_queue_from_http(ID, Queue) :-
 2280    (   output_queue(ID, Queue, _)
 2281    ->  destroy_queue_if_empty(Queue)
 2282    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2283    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2284              [Queue]),
 2285        get_time(Now),
 2286        asserta(output_queue(ID, Queue, Now))
 2287    ;   message_queue_destroy(Queue),
 2288        asserta(output_queue_destroyed(Queue))
 2289    ).
 2290
 2291%!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2292%
 2293%   Called  from  pengine_unregister/1  when    the  pengine  thread
 2294%   terminates. It is called while the mutex `pengine` held.
 2295
 2296sync_destroy_queue_from_pengine(ID, Queue) :-
 2297    (   retract(output_queue_destroyed(Queue))
 2298    ->  true
 2299    ;   get_time(Now),
 2300        asserta(output_queue(ID, Queue, Now))
 2301    ),
 2302    retractall(pengine_queue(ID, Queue, _, _)).
 2303
 2304
 2305http_pengine_send(Request) :-
 2306    reply_options(Request, [get,post]),
 2307    !.
 2308http_pengine_send(Request) :-
 2309    http_parameters(Request,
 2310                    [ id(ID, [ type(atom) ]),
 2311                      event(EventString, [optional(true)]),
 2312                      format(Format, [default(prolog)])
 2313                    ]),
 2314    get_pengine_module(ID, Module),
 2315    (   current_module(Module)          % avoid re-creating the module
 2316    ->  catch(( read_event(Request, EventString, Module, Event0, Bindings),
 2317                fix_bindings(Format, Event0, Bindings, Event1)
 2318              ),
 2319              Error,
 2320              true),
 2321        (   var(Error)
 2322        ->  debug(pengine(event), 'HTTP send: ~p', [Event1]),
 2323            (   pengine_thread(ID, Thread)
 2324            ->  pengine_queue(ID, Queue, TimeLimit, _),
 2325                random_delay,
 2326                broadcast(pengine(send(ID, Event1))),
 2327                thread_send_message(Thread, pengine_request(Event1)),
 2328                wait_and_output_result(ID, Queue, Format, TimeLimit)
 2329            ;   atom(ID)
 2330            ->  pengine_died(Format, ID)
 2331            ;   http_404([], Request)
 2332            )
 2333        ;   output_result(Format, error(ID, Error))
 2334        )
 2335    ;   debug(pengine(event), 'Pengine module ~q vanished', [Module]),
 2336        discard_post_data(Request),
 2337        pengine_died(Format, ID)
 2338    ).
 2339
 2340pengine_died(Format, Pengine) :-
 2341    output_result(Format, error(Pengine,
 2342                                error(existence_error(pengine, Pengine),_))).
 2343
 2344
 2345%%  read_event(+Request, +EventString, +Module, -Event, -Bindings)
 2346%
 2347%   Read the sent event. The event is a   Prolog  term that is either in
 2348%   the =event= parameter or as a posted document.
 2349
 2350read_event(_Request, EventString, Module, Event, Bindings) :-
 2351    nonvar(EventString),
 2352    !,
 2353    term_string(Event, EventString,
 2354                [ variable_names(Bindings),
 2355                  module(Module)
 2356                ]).
 2357read_event(Request, _EventString, Module, Event, Bindings) :-
 2358    option(method(post), Request),
 2359    http_read_data(Request,     Event,
 2360                   [ content_type('application/x-prolog'),
 2361                     module(Module),
 2362                     variable_names(Bindings)
 2363                   ]).
 2364
 2365%%  discard_post_data(+Request) is det.
 2366%
 2367%   If this is a POST request, discard the posted data.
 2368
 2369discard_post_data(Request) :-
 2370    option(method(post), Request),
 2371    !,
 2372    setup_call_cleanup(
 2373        open_null_stream(NULL),
 2374        http_read_data(Request, _, [to(stream(NULL))]),
 2375        close(NULL)).
 2376discard_post_data(_).
 2377
 2378%!  fix_bindings(+Format, +EventIn, +Bindings, -Event) is det.
 2379%
 2380%   Generate the template for json(-s) Format  from the variables in
 2381%   the asked Goal. Variables starting  with an underscore, followed
 2382%   by an capital letter are ignored from the template.
 2383
 2384fix_bindings(Format,
 2385             ask(Goal, Options0), Bindings,
 2386             ask(Goal, NewOptions)) :-
 2387    json_lang(Format),
 2388    !,
 2389    exclude(anon, Bindings, NamedBindings),
 2390    template(NamedBindings, Template, Options0, Options1),
 2391    select_option(chunk(Paging), Options1, Options2, 1),
 2392    NewOptions = [ template(Template),
 2393                   chunk(Paging),
 2394                   bindings(NamedBindings)
 2395                 | Options2
 2396                 ].
 2397fix_bindings(_, Command, _, Command).
 2398
 2399template(_, Template, Options0, Options) :-
 2400    select_option(template(Template), Options0, Options),
 2401    !.
 2402template(Bindings, Template, Options, Options) :-
 2403    dict_create(Template, json, Bindings).
 2404
 2405anon(Name=_) :-
 2406    sub_atom(Name, 0, _, _, '_'),
 2407    sub_atom(Name, 1, 1, _, Next),
 2408    char_type(Next, prolog_var_start).
 2409
 2410var_name(Name=_, Name).
 2411
 2412
 2413%!  json_lang(+Format) is semidet.
 2414%
 2415%   True if Format is a JSON variation.
 2416
 2417json_lang(json) :- !.
 2418json_lang(Format) :-
 2419    sub_atom(Format, 0, _, _, 'json-').
 2420
 2421%!  http_pengine_pull_response(+Request)
 2422%
 2423%   HTTP handler for /pengine/pull_response.  Pulls possible pending
 2424%   messages from the pengine.
 2425
 2426http_pengine_pull_response(Request) :-
 2427    reply_options(Request, [get]),
 2428    !.
 2429http_pengine_pull_response(Request) :-
 2430    http_parameters(Request,
 2431            [   id(ID, []),
 2432                format(Format, [default(prolog)])
 2433            ]),
 2434    (   (   pengine_queue(ID, Queue, TimeLimit, _)
 2435        ->  true
 2436        ;   output_queue(ID, Queue, _),
 2437            TimeLimit = 0
 2438        )
 2439    ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
 2440    ;   http_404([], Request)
 2441    ).
 2442
 2443%!  http_pengine_abort(+Request)
 2444%
 2445%   HTTP handler for /pengine/abort. Note that  abort may be sent at
 2446%   any time and the reply may  be   handled  by a pull_response. In
 2447%   that case, our  pengine  has  already   died  before  we  get to
 2448%   wait_and_output_result/4.
 2449
 2450http_pengine_abort(Request) :-
 2451    reply_options(Request, [get]),
 2452    !.
 2453http_pengine_abort(Request) :-
 2454    http_parameters(Request,
 2455            [   id(ID, []),
 2456                format(Format, [default(prolog)])
 2457            ]),
 2458    (   pengine_thread(ID, _Thread),
 2459        pengine_queue(ID, Queue, TimeLimit, _)
 2460    ->  broadcast(pengine(abort(ID))),
 2461        abort_pending_output(ID),
 2462        pengine_abort(ID),
 2463        wait_and_output_result(ID, Queue, Format, TimeLimit)
 2464    ;   http_404([], Request)
 2465    ).
 2466
 2467http_pengine_destroy_all(Request) :-
 2468    reply_options(Request, [get]),
 2469    !.
 2470http_pengine_destroy_all(Request) :-
 2471    http_parameters(Request,
 2472                    [ ids(IDsAtom, [])
 2473                    ]),
 2474    atomic_list_concat(IDs, ',', IDsAtom),
 2475    forall(member(ID, IDs),
 2476           pengine_destroy(ID, [force(true)])),
 2477    reply_json("ok").
 2478
 2479%!  http_pengine_ping(+Request)
 2480%
 2481%   HTTP handler for /pengine/ping.  If   the  requested  Pengine is
 2482%   alive and event status(Pengine, Stats) is created, where `Stats`
 2483%   is the return of thread_statistics/2.
 2484
 2485http_pengine_ping(Request) :-
 2486    reply_options(Request, [get]),
 2487    !.
 2488http_pengine_ping(Request) :-
 2489    http_parameters(Request,
 2490                    [ id(Pengine, []),
 2491                      format(Format, [default(prolog)])
 2492                    ]),
 2493    (   pengine_thread(Pengine, Thread),
 2494        catch(thread_statistics(Thread, Stats), _, fail)
 2495    ->  output_result(Format, ping(Pengine, Stats))
 2496    ;   output_result(Format, died(Pengine))
 2497    ).
 2498
 2499
 2500%!  output_result(+Format, +EventTerm) is det.
 2501%!  output_result(+Format, +EventTerm, +OptionsDict) is det.
 2502%
 2503%   Formulate an HTTP response from a pengine event term. Format is
 2504%   one of =prolog=, =json= or =json-s=.
 2505
 2506:- dynamic
 2507    pengine_replying/2.             % +Pengine, +Thread
 2508
 2509output_result(Format, Event) :-
 2510    arg(1, Event, Pengine),
 2511    thread_self(Thread),
 2512    setup_call_cleanup(
 2513        asserta(pengine_replying(Pengine, Thread), Ref),
 2514        catch(output_result(Format, Event, _{}),
 2515              pengine_abort_output,
 2516              true),
 2517        erase(Ref)).
 2518
 2519output_result(prolog, Event, _) :-
 2520    !,
 2521    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
 2522    write_term(Event,
 2523               [ quoted(true),
 2524                 ignore_ops(true),
 2525                 fullstop(true),
 2526                 blobs(portray),
 2527                 portray_goal(portray_blob),
 2528                 nl(true)
 2529               ]).
 2530output_result(Lang, Event, Dict) :-
 2531    write_result(Lang, Event, Dict),
 2532    !.
 2533output_result(Lang, Event, _) :-
 2534    json_lang(Lang),
 2535    !,
 2536    (   event_term_to_json_data(Event, JSON, Lang)
 2537    ->  cors_enable,
 2538        disable_client_cache,
 2539        reply_json(JSON)
 2540    ;   assertion(event_term_to_json_data(Event, _, Lang))
 2541    ).
 2542output_result(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
 2543    domain_error(pengine_format, Lang).
 2544
 2545%!  portray_blob(+Blob, +Options) is det.
 2546%
 2547%   Portray non-text blobs that may  appear   in  output  terms. Not
 2548%   really sure about that. Basically such  terms need to be avoided
 2549%   as they are meaningless outside the process. The generated error
 2550%   is hard to debug though, so now we send them as `'$BLOB'(Type)`.
 2551%   Future versions may include more info, depending on `Type`.
 2552
 2553:- public portray_blob/2.               % called from write-term
 2554portray_blob(Blob, _Options) :-
 2555    blob(Blob, Type),
 2556    writeq('$BLOB'(Type)).
 2557
 2558%!  abort_pending_output(+Pengine) is det.
 2559%
 2560%   If we get an abort, it is possible that output is being produced
 2561%   for the client.  This predicate aborts these threads.
 2562
 2563abort_pending_output(Pengine) :-
 2564    forall(pengine_replying(Pengine, Thread),
 2565           abort_output_thread(Thread)).
 2566
 2567abort_output_thread(Thread) :-
 2568    catch(thread_signal(Thread, throw(pengine_abort_output)),
 2569          error(existence_error(thread, _), _),
 2570          true).
 2571
 2572%!  write_result(+Lang, +Event, +Dict) is semidet.
 2573%
 2574%   Hook that allows for different output formats. The core Pengines
 2575%   library supports `prolog` and various   JSON  dialects. The hook
 2576%   event_to_json/3 can be used to refine   the  JSON dialects. This
 2577%   hook must be used if  a   completely  different output format is
 2578%   desired.
 2579
 2580%!  disable_client_cache
 2581%
 2582%   Make sure the client will not cache our page.
 2583%
 2584%   @see http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 2585
 2586disable_client_cache :-
 2587    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
 2588            Pragma: no-cache\r\n\c
 2589            Expires: 0\r\n').
 2590
 2591event_term_to_json_data(Event, JSON, Lang) :-
 2592    event_to_json(Event, JSON, Lang),
 2593    !.
 2594event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
 2595                        json{event:success, id:ID, time:Time,
 2596                             data:Bindings, more:More, projection:Projection},
 2597                        json) :-
 2598    !,
 2599    term_to_json(Bindings0, Bindings).
 2600event_term_to_json_data(destroy(ID, Event),
 2601                        json{event:destroy, id:ID, data:JSON},
 2602                        Style) :-
 2603    !,
 2604    event_term_to_json_data(Event, JSON, Style).
 2605event_term_to_json_data(create(ID, Features0), JSON, Style) :-
 2606    !,
 2607    (   select(answer(First0), Features0, Features1)
 2608    ->  event_term_to_json_data(First0, First, Style),
 2609        Features = [answer(First)|Features1]
 2610    ;   Features = Features0
 2611    ),
 2612    dict_create(JSON, json, [event(create), id(ID)|Features]).
 2613event_term_to_json_data(destroy(ID, Event),
 2614                        json{event:destroy, id:ID, data:JSON}, Style) :-
 2615    !,
 2616    event_term_to_json_data(Event, JSON, Style).
 2617event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
 2618    !,
 2619    Error0 = json{event:error, id:ID, data:Message},
 2620    add_error_details(ErrorTerm, Error0, Error),
 2621    message_to_string(ErrorTerm, Message).
 2622event_term_to_json_data(failure(ID, Time),
 2623                        json{event:failure, id:ID, time:Time}, _).
 2624event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
 2625    functor(EventTerm, F, 1),
 2626    !,
 2627    arg(1, EventTerm, ID).
 2628event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
 2629    functor(EventTerm, F, 2),
 2630    arg(1, EventTerm, ID),
 2631    arg(2, EventTerm, Data),
 2632    term_to_json(Data, JSON).
 2633
 2634:- public add_error_details/3. 2635
 2636%%  add_error_details(+Error, +JSON0, -JSON)
 2637%
 2638%   Add format error code and  location   information  to an error. Also
 2639%   used by pengines_io.pl.
 2640
 2641add_error_details(Error, JSON0, JSON) :-
 2642    add_error_code(Error, JSON0, JSON1),
 2643    add_error_location(Error, JSON1, JSON).
 2644
 2645%%  add_error_code(+Error, +JSON0, -JSON) is det.
 2646%
 2647%   Add a =code= field to JSON0 of Error is an ISO error term. The error
 2648%   code is the functor name of  the   formal  part  of the error, e.g.,
 2649%   =syntax_error=,  =type_error=,  etc.   Some    errors   carry   more
 2650%   information:
 2651%
 2652%     - existence_error(Type, Obj)
 2653%     {arg1:Type, arg2:Obj}, where Obj is stringified of it is not
 2654%     atomic.
 2655
 2656add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
 2657    atom(Type),
 2658    !,
 2659    to_atomic(Obj, Value),
 2660    Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
 2661add_error_code(error(Formal, _), Error0, Error) :-
 2662    callable(Formal),
 2663    !,
 2664    functor(Formal, Code, _),
 2665    Error = Error0.put(code, Code).
 2666add_error_code(_, Error, Error).
 2667
 2668% What to do with large integers?
 2669to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 2670to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 2671to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 2672to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
 2673
 2674
 2675%%  add_error_location(+Error, +JSON0, -JSON) is det.
 2676%
 2677%   Add a =location= property if the  error   can  be  associated with a
 2678%   source location. The location is an   object  with properties =file=
 2679%   and =line= and, if available, the character location in the line.
 2680
 2681add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
 2682    atom(Path), integer(Line),
 2683    !,
 2684    Term = Term0.put(_{location:_{file:Path, line:Line}}).
 2685add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
 2686    atom(Path), integer(Line), integer(Ch),
 2687    !,
 2688    Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
 2689add_error_location(_, Term, Term).
 2690
 2691
 2692%!  event_to_json(+Event, -JSONTerm, +Lang) is semidet.
 2693%
 2694%   Hook that translates a Pengine event  structure into a term suitable
 2695%   for reply_json/1, according to the language specification Lang. This
 2696%   can be used to massage general Prolog terms, notably associated with
 2697%   `success(ID, Bindings, Projection,  Time,   More)`  and  `output(ID,
 2698%   Term)` into a format suitable for processing at the client side.
 2699
 2700%:- multifile pengines:event_to_json/3.
 2701
 2702
 2703                 /*******************************
 2704                 *        ACCESS CONTROL        *
 2705                 *******************************/
 2706
 2707%!  allowed(+Request, +Application) is det.
 2708%
 2709%   Check whether the peer is allowed to connect.  Returns a
 2710%   =forbidden= header if contact is not allowed.
 2711
 2712allowed(Request, Application) :-
 2713    setting(Application:allow_from, Allow),
 2714    match_peer(Request, Allow),
 2715    setting(Application:deny_from, Deny),
 2716    \+ match_peer(Request, Deny),
 2717    !.
 2718allowed(Request, _Application) :-
 2719    memberchk(request_uri(Here), Request),
 2720    throw(http_reply(forbidden(Here))).
 2721
 2722match_peer(_, Allowed) :-
 2723    memberchk(*, Allowed),
 2724    !.
 2725match_peer(_, []) :- !, fail.
 2726match_peer(Request, Allowed) :-
 2727    http_peer(Request, Peer),
 2728    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
 2729    (   memberchk(Peer, Allowed)
 2730    ->  true
 2731    ;   member(Pattern, Allowed),
 2732        match_peer_pattern(Pattern, Peer)
 2733    ).
 2734
 2735match_peer_pattern(Pattern, Peer) :-
 2736    ip_term(Pattern, IP),
 2737    ip_term(Peer, IP),
 2738    !.
 2739
 2740ip_term(Peer, Pattern) :-
 2741    split_string(Peer, ".", "", PartStrings),
 2742    ip_pattern(PartStrings, Pattern).
 2743
 2744ip_pattern([], []).
 2745ip_pattern([*], _) :- !.
 2746ip_pattern([S|T0], [N|T]) :-
 2747    number_string(N, S),
 2748    ip_pattern(T0, T).
 2749
 2750
 2751%%  authenticate(+Request, +Application, -UserOptions:list) is det.
 2752%
 2753%   Call authentication_hook/3, returning either `[user(User)]`, `[]` or
 2754%   an exception.
 2755
 2756authenticate(Request, Application, UserOptions) :-
 2757    authentication_hook(Request, Application, User),
 2758    !,
 2759    must_be(ground, User),
 2760    UserOptions = [user(User)].
 2761authenticate(_, _, []).
 2762
 2763%%  authentication_hook(+Request, +Application, -User) is semidet.
 2764%
 2765%   This hook is called  from  the   =/pengine/create=  HTTP  handler to
 2766%   discover whether the server is accessed   by  an authorized user. It
 2767%   can react in three ways:
 2768%
 2769%     - Succeed, binding User to a ground term.  The authentity of the
 2770%       user is available through pengine_user/1.
 2771%     - Fail.  The =/create= succeeds, but the pengine is not associated
 2772%       with a user.
 2773%     - Throw an exception to prevent creation of the pengine.  Two
 2774%       meaningful exceptions are:
 2775%         - throw(http_reply(authorise(basic(Realm))))
 2776%         Start a normal HTTP login challenge (reply 401)
 2777%         - throw(http_reply(forbidden(Path))))
 2778%         Reject the request using a 403 repply.
 2779%
 2780%   @see http_authenticate/3 can be used to implement this hook using
 2781%        default HTTP authentication data.
 2782
 2783pengine_register_user(Options) :-
 2784    option(user(User), Options),
 2785    !,
 2786    pengine_self(Me),
 2787    asserta(pengine_user(Me, User)).
 2788pengine_register_user(_).
 2789
 2790
 2791%%  pengine_user(-User) is semidet.
 2792%
 2793%   True when the pengine was create by  an HTTP request that authorized
 2794%   User.
 2795%
 2796%   @see authentication_hook/3 can be used to extract authorization from
 2797%        the HTTP header.
 2798
 2799pengine_user(User) :-
 2800    pengine_self(Me),
 2801    pengine_user(Me, User).
 2802
 2803%!  reply_options(+Request, +Methods) is semidet.
 2804%
 2805%   Reply the HTTP OPTIONS request
 2806
 2807reply_options(Request, Allowed) :-
 2808    option(method(options), Request),
 2809    !,
 2810    cors_enable(Request,
 2811                [ methods(Allowed)
 2812                ]),
 2813    format('Content-type: text/plain\r\n'),
 2814    format('~n').                   % empty body
 2815
 2816
 2817                 /*******************************
 2818                 *        COMPILE SOURCE        *
 2819                 *******************************/
 2820
 2821/** pengine_src_text(+SrcText, +Module) is det
 2822
 2823Asserts the clauses defined in SrcText in   the  private database of the
 2824current Pengine. This  predicate  processes   the  `src_text'  option of
 2825pengine_create/1.
 2826*/
 2827
 2828pengine_src_text(Src, Module) :-
 2829    pengine_self(Self),
 2830    format(atom(ID), 'pengine://~w/src', [Self]),
 2831    extra_load_options(Self, Options),
 2832    setup_call_cleanup(
 2833        open_chars_stream(Src, Stream),
 2834        load_files(Module:ID,
 2835                   [ stream(Stream),
 2836                     module(Module),
 2837                     silent(true)
 2838                   | Options
 2839                   ]),
 2840        close(Stream)),
 2841    keep_source(Self, ID, Src).
 2842
 2843system:'#file'(File, _Line) :-
 2844    prolog_load_context(stream, Stream),
 2845    set_stream(Stream, file_name(File)),
 2846    set_stream(Stream, record_position(false)),
 2847    set_stream(Stream, record_position(true)).
 2848
 2849%%   pengine_src_url(+URL, +Module) is det
 2850%
 2851%    Asserts the clauses defined in URL in   the private database of the
 2852%    current Pengine. This predicate processes   the `src_url' option of
 2853%    pengine_create/1.
 2854%
 2855%    @tbd: make a sensible guess at the encoding.
 2856
 2857pengine_src_url(URL, Module) :-
 2858    pengine_self(Self),
 2859    uri_encoded(path, URL, Path),
 2860    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
 2861    extra_load_options(Self, Options),
 2862    (   get_pengine_application(Self, Application),
 2863        setting(Application:debug_info, false)
 2864    ->  setup_call_cleanup(
 2865            http_open(URL, Stream, []),
 2866            ( set_stream(Stream, encoding(utf8)),
 2867              load_files(Module:ID,
 2868                         [ stream(Stream),
 2869                           module(Module)
 2870                         | Options
 2871                         ])
 2872            ),
 2873            close(Stream))
 2874    ;   setup_call_cleanup(
 2875            http_open(URL, TempStream, []),
 2876            ( set_stream(TempStream, encoding(utf8)),
 2877              read_string(TempStream, _, Src)
 2878            ),
 2879            close(TempStream)),
 2880        setup_call_cleanup(
 2881            open_chars_stream(Src, Stream),
 2882            load_files(Module:ID,
 2883                       [ stream(Stream),
 2884                         module(Module)
 2885                       | Options
 2886                       ]),
 2887            close(Stream)),
 2888        keep_source(Self, ID, Src)
 2889    ).
 2890
 2891
 2892extra_load_options(Pengine, Options) :-
 2893    pengine_not_sandboxed(Pengine),
 2894    !,
 2895    Options = [].
 2896extra_load_options(_, [sandboxed(true)]).
 2897
 2898
 2899keep_source(Pengine, ID, SrcText) :-
 2900    get_pengine_application(Pengine, Application),
 2901    setting(Application:debug_info, true),
 2902    !,
 2903    to_string(SrcText, SrcString),
 2904    assertz(pengine_data(Pengine, source(ID, SrcString))).
 2905keep_source(_, _, _).
 2906
 2907to_string(String, String) :-
 2908    string(String),
 2909    !.
 2910to_string(Atom, String) :-
 2911    atom_string(Atom, String),
 2912    !.
 2913
 2914
 2915                 /*******************************
 2916                 *            MESSAGES          *
 2917                 *******************************/
 2918
 2919prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 2920    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 2921      'This is normally caused by an insufficiently instantiated'-[], nl,
 2922      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 2923      'find all possible instantations of Var.'-[]
 2924    ]