1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2016, University of Amsterdam 7 VU University Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 
34*/ 35 36:- module(thread_httpd, 37 [ http_current_server/2, % ?:Goal, ?Port 38 http_server_property/2, % ?Port, ?Property 39 http_server/2, % :Goal, +Options 40 http_workers/2, % +Port, ?WorkerCount 41 http_add_worker/2, % +Port, +Options 42 http_current_worker/2, % ?Port, ?ThreadID 43 http_stop_server/2, % +Port, +Options 44 http_spawn/2, % :Goal, +Options 45 46 http_requeue/1, % +Request 47 http_close_connection/1, % +Request 48 http_enough_workers/3 % +Queue, +Why, +Peer 49 ]). 50:- use_module(library(debug)). 51:- use_module(library(error)). 52:- use_module(library(option)). 53:- use_module(library(socket)). 54:- use_module(library(thread_pool)). 55:- use_module(library(gensym)). 56:- use_module(http_wrapper). 57:- use_module(http_path). 58 59 60:- predicate_options(http_server/2, 2, 61 [ port(any), 62 tcp_socket(any), 63 workers(positive_integer), 64 timeout(number), 65 keep_alive_timeout(number), 66 ssl(list(any)), % if http/http_ssl_plugin is loaded 67 pass_to(system:thread_create/3, 3) 68 ]). 69:- predicate_options(http_spawn/2, 2, 70 [ pool(atom), 71 pass_to(system:thread_create/3, 3), 72 pass_to(thread_pool:thread_create_in_pool/4, 4) 73 ]). 74:- predicate_options(http_add_worker/2, 2, 75 [ timeout(number), 76 keep_alive_timeout(number), 77 max_idle_time(number), 78 pass_to(system:thread_create/3, 3) 79 ]).
%   Meta-argument declarations.  http_server/2 receives its options as a
%   module-qualified term (its first clause matches M:Options0), hence
%   the `:` specifier.
%   NOTE(review): the goal arities (1 for the server goal that is handed
%   to http_wrapper/5, 0 for the goal of http_spawn/2) were missing from
%   this listing and have been restored from the way the goals are used;
%   confirm against http_wrapper.

:- meta_predicate
    http_server(1, :),
    http_current_server(1, ?),
    http_spawn(0, +).

%   Server bookkeeping, maintained with assert/retract by the predicates
%   in this file.

:- dynamic
    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
    queue_worker/2,         % Queue, ThreadID
    queue_options/2.        % Queue, Options

%   Hooks that may be defined outside this file.

:- multifile
    make_socket_hook/3,
    accept_hook/2,
    close_hook/1,
    open_client_hook/6,
    http:create_pool/1,
    http:schedule_workers/1.
main
thread. As of version 5.9 stacks are no longer
pre-allocated and the given sizes only act as a limit.
If you need to control resource usage look at the spawn
option of http_handler/3 and library(thread_pool).
A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:
:- use_module(library(http/thread_httpd)). :- use_module(library(http/http_dispatch)). start_server(Port) :- http_server(http_dispatch, [port(Port)]).
Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.
%!  http_server(:Goal, :Options)
%
%   Start a server that handles requests with Goal.  Options must hold
%   port(Port); the socket is prepared by make_socket/3, the worker
%   threads by create_workers/1 and the accept thread by
%   create_server/3.  An informational message is printed once the
%   server is running.
%
%   @error existence_error(option, port) if no port option is given.

http_server(Goal, QOptions) :-
    (   QOptions = Module:UserOptions,
        option(port(Port), UserOptions)
    ->  make_socket(Port, Module:UserOptions, ServerOptions),
        create_workers(ServerOptions),
        create_server(Goal, Port, ServerOptions),
        print_message(informational,
                      httpd_started_server(Port))
    ;   existence_error(option, port)
    ).
queue(QueueId)
.
%!  make_socket(?Port, :OptionsIn, -OptionsOut)
%
%   Prepare the listening socket and extend the option list with
%   queue(Queue), where Queue is the atom built by make_addr_atom/3
%   from `httpd` and Port.
%
%     1. If make_socket_hook/3 succeeds, its result is used as is.
%     2. If the caller supplied tcp_socket(_), only queue(Queue) is
%        added.
%     3. Otherwise a TCP socket is created, bound to Port and put in
%        listening mode; tcp_socket(Socket) is added as well.
%
%   If configuring, binding or listening raises an exception (e.g.,
%   the port is in use), the freshly created socket is closed before
%   the exception is re-thrown, so it does not leak.

make_socket(Port, Options0, Options) :-
    make_socket_hook(Port, Options0, Options),
    !.
make_socket(Port, _:Options0, Options) :-
    option(tcp_socket(_), Options0),
    !,
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue)
              | Options0
              ].
make_socket(Port, _:Options0, Options) :-
    tcp_socket(Socket),
    catch(( tcp_setopt(Socket, reuseaddr),
            tcp_bind(Socket, Port),
            tcp_listen(Socket, 5)
          ),
          Error,
          (   % best-effort close; always report the original error
              catch(tcp_close_socket(Socket), _, true),
              throw(Error)
          )),
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].
%!  make_addr_atom(+Scheme, +Address, -Atom) is semidet.
%
%   Atom is Scheme, an `@` and the textual form of Address glued
%   together.  Address is an atomic value, Host:Port or
%   ip(A,B,C,D); it fails for any other shape.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), AddressTokens),
    atomic_list_concat([Scheme, '@' | AddressTokens], Atom).

%!  address_parts(+Address)//
%
%   Emit the list of atomic parts that make up Address.

address_parts(Address) -->
    (   { atomic(Address) }
    ->  [Address]
    ;   { Address = Host:Port }
    ->  address_parts(Host),
        [':'],
        address_parts(Port)
    ;   { Address = ip(A,B,C,D) }
    ->  [ A, '.', B, '.', C, '.', D ]
    ).
%!  create_server(:Goal, +Address, +Options)
%
%   Start the thread that runs accept_server/2 and register the
%   server in current_server/6.  The thread alias is derived from
%   the scheme and port using make_addr_atom/3.  Options must
%   contain queue(Queue).

create_server(Goal, Address, Options) :-
    get_time(Started),
    memberchk(queue(Queue), Options),
    scheme(Scheme, Options),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, AcceptAlias),
    thread_create(accept_server(Goal, Options), _,
                  [ alias(AcceptAlias)
                  ]),
    ServerFact = current_server(Port, Goal, AcceptAlias,
                                Queue, Scheme, Started),
    assert(ServerFact).

%!  scheme(?Scheme, +Options)
%
%   Scheme is taken from the scheme(Scheme) option if present.
%   Otherwise it is `https` when an ssl(_) or ssl_instance(_) option
%   is present and `http` in all other cases.

scheme(Scheme, Options) :-
    (   option(scheme(Scheme), Options)
    ->  true
    ;   (   option(ssl(_), Options)
        ;   option(ssl_instance(_), Options)
        )
    ->  Scheme = https
    ;   Scheme = http
    ).

%!  address_port(+Address, -Port)
%
%   Port is the port part of a Host:Port address, or Address itself
%   if it does not have that shape.

address_port(Address, Port) :-
    (   Address = _Host:Port
    ->  true
    ;   Port = Address
    ).
%!  http_current_server(:Goal, ?Port) is nondet.
%
%   True when the server registered for Port (see current_server/6)
%   handles requests with Goal.

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
http
or https
%!  http_server_property(?Port, ?Property) is nondet.
%
%   True when Property is a property of the server at Port.  Port
%   may be given as Host:Port with an integer port, in which case
%   only the port number is used.  Properties are goal(Goal),
%   scheme(Scheme) and start_time(Time), all read from
%   current_server/6.

http_server_property(Address, Property) :-
    (   Address = _Host:Port,
        integer(Port)
    ->  true
    ;   Port = Address
    ),
    server_property(Property, Port).

%!  server_property(?Property, ?Port) is nondet.
%
%   Enumerate the properties stored in current_server/6: first all
%   goals, then all schemes and finally all start times.

server_property(Property, Port) :-
    (   Property = goal(Goal),
        current_server(Port, Goal, _, _, _, _)
    ;   Property = scheme(Scheme),
        current_server(Port, _, _, _, Scheme, _)
    ;   Property = start_time(Time),
        current_server(Port, _, _, _, _, Time)
    ).
%!  http_workers(+Port, ?Workers)
%
%   Query or change the number of workers of the server at Port.  If
%   Workers is an integer the pool is resized using resize_pool/2;
%   otherwise Workers is unified with the number of queue_worker/2
%   facts recorded for the server's queue.
%
%   @error instantiation_error if Port is not ground.
%   @error existence_error(http_server, Port) if no server is
%          registered for Port.

http_workers(Port, Workers) :-
    must_be(ground, Port),
    (   current_server(Port, _, _, Queue, _, _)
    ->  (   integer(Workers)
        ->  resize_pool(Queue, Workers)
        ;   findall(Id, queue_worker(Queue, Id), Ids),
            length(Ids, Workers)
        )
    ;   existence_error(http_server, Port)
    ).
%!  http_add_worker(+Port, +Options)
%
%   Add one worker to the server at Port.  Options are merged with
%   the options stored for the server's queue (queue_options/2)
%   using merge_options/3 and passed to create_workers/5.
%
%   @error instantiation_error if Port is not ground.
%   @error existence_error(http_server, Port) if no server is
%          registered for Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    (   current_server(Port, _, _, Queue, _, _)
    ->  queue_options(Queue, StoredOptions),
        merge_options(Options, StoredOptions, WorkerOptions),
        atom_concat(Queue, '_', AliasPrefix),
        create_workers(1, 1, Queue, AliasPrefix, WorkerOptions)
    ;   existence_error(http_server, Port)
    ).
%!  http_current_worker(?Port, ?ThreadID) is nondet.
%
%   True when ThreadID is recorded in queue_worker/2 as a worker of
%   the queue that belongs to the server at Port.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _Goal, _Thread, WorkerQueue, _Scheme, _StartTime),
    queue_worker(WorkerQueue, ThreadID).
%!  accept_server(:Goal, +Options)
%
%   Body of the accept thread.  Runs the accept loop until the
%   exception `http_stop` is raised, then removes the
%   current_server/6 fact registered for this thread and closes the
%   server socket.

accept_server(Goal, Options) :-
    catch(accept_server2(Goal, Options), http_stop, true),
    thread_self(Me),
    retract(current_server(_Port, _Goal, Me, _Queue, _Scheme, _StartTime)),
    close_server_socket(Options).

%!  accept_server2(:Goal, +Options)
%
%   The accept loop proper.  Each iteration calls accept_server3/2
%   once.  Exceptions accepted by accept_rethrow_error/1 terminate
%   the loop by being re-thrown; other exceptions and failure are
%   printed as errors and the loop continues.

accept_server2(Goal, Options) :-
    repeat,
    (   catch(accept_server3(Goal, Options), Error, true)
    ->  nonvar(Error),                  % plain success: next iteration
        (   accept_rethrow_error(Error)
        ->  throw(Error)
        ;   print_message(error, Error)
        )
    ;   print_message(error,            % internal error
                      goal_failed(accept_server3(Goal, Options)))
    ),
    fail.

%!  accept_server3(:Goal, +Options)
%
%   Accept one connection.  If accept_hook/2 succeeds it has done
%   the work.  Otherwise a client is accepted on the tcp_socket from
%   Options, http_enough_workers/3 is notified and the client is
%   posted on the worker queue as tcp_client(Client, Goal, Peer).

accept_server3(Goal, Options) :-
    (   accept_hook(Goal, Options)
    ->  true
    ;   memberchk(tcp_socket(Listen), Options),
        memberchk(queue(Queue), Options),
        tcp_accept(Listen, Client, Peer),
        debug(http(connection), 'New HTTP connection from ~p', [Peer]),
        http_enough_workers(Queue, accept, Peer),
        thread_send_message(Queue, tcp_client(Client, Goal, Peer))
    ).

%!  accept_rethrow_error(?Error)
%
%   Exceptions that must terminate the accept loop.

accept_rethrow_error(http_stop).
accept_rethrow_error('$aborted').
%!  close_server_socket(+Options)
%
%   Close the listening socket.  close_hook/1 is tried first; if it
%   does not succeed the tcp_socket(Socket) from Options is closed.
%   Fails if neither applies.

close_server_socket(Options) :-
    (   close_hook(Options)
    ->  true
    ;   memberchk(tcp_socket(Listen), Options),
        tcp_close_socket(Listen)
    ).
%!  http_stop_server(+Port, +Options)
%
%   Stop the server at Port.  Port may be given as Host:Port (e.g.,
%   localhost:4000) with a ground Host, in which case only the port
%   part is used.  The worker pool is first resized to 0, after
%   which `http_stop` is signalled to the accept thread.  A
%   connection to localhost:Port is attempted (errors ignored)
%   before joining the accept thread and destroying the queue.
%   Options is currently not used.

http_stop_server(Address, Options) :-
    (   Address = Host:Port,
        ground(Host)
    ->  http_stop_server(Port, Options)
    ;   http_workers(Address, 0),       % checks Port is ground
        current_server(Address, _, AcceptThread, Queue, _Scheme, _Start),
        retractall(queue_options(Queue, _)),
        thread_signal(AcceptThread, throw(http_stop)),
        catch(connect(localhost:Address), _, true),
        thread_join(AcceptThread, _),
        message_queue_destroy(Queue)
    ).

%!  connect(+Address)
%
%   Open a TCP connection to Address; the socket is always closed
%   again by the cleanup handler.

connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Probe),
        tcp_connect(Probe, Address),
        tcp_close_socket(Probe)).
%!  http_enough_workers(+Queue, +Why, +Peer)
%
%   Check whether the jobs waiting in Queue can be handled.  If
%   enough/2 is not satisfied for the current queue size, the hook
%   http:schedule_workers/1 is called with a dict holding the keys
%   `port`, `reason`, `peer` and `waiting`.  Exceptions of the hook
%   are printed as errors; failure of the hook is ignored.

http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Waiting)),
    (   enough(Waiting, Why)
    ->  true
    ;   ignore(( current_server(Port, _, _, Queue, _, _),
                 catch(http:schedule_workers(_{port:Port,
                                               reason:Why,
                                               peer:Peer,
                                               waiting:Waiting}),
                       Error,
                       print_message(error, Error))
               ))
    ).

%!  enough(?Size, ?Why)
%
%   Queue sizes that need no extra workers.

enough(0, _).
enough(1, keep_alive).                  % I will be ready myself
accept
for a new connection or keep_alive
if a
worker tries to reschedule itself.
Note that, when called with reason:accept
, we are called in
the time critical main accept loop. An implementation of this
hook shall typically send the event to a thread dedicated to
dynamic worker-pool management.
489 /******************************* 490 * WORKER QUEUE OPERATIONS * 491 *******************************/
%!  create_workers(+Options)
%
%   Create the worker queue named by the queue(Queue) option (errors
%   from message_queue_create/1 are ignored), start the number of
%   workers given by workers(N) (default 5) and remember Options in
%   queue_options/2.

create_workers(Options) :-
    option(workers(Count), Options, 5),
    option(queue(Queue), Options),
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, Count, Queue, AliasPrefix, Options),
    assert(queue_options(Queue, Options)).

%!  create_workers(+I, +N, +Queue, +AliasBase, +Options)
%
%   Start workers numbered I..N.  Each worker runs http_worker/1,
%   gets an alias generated from AliasBase by gensym/2 and is
%   recorded in queue_worker/2.  Options is also passed on to
%   thread_create/3.

create_workers(I, N, Queue, AliasBase, Options) :-
    (   I > N
    ->  true
    ;   gensym(AliasBase, Alias),
        thread_create(http_worker(Options), WorkerId,
                      [ alias(Alias)
                      | Options
                      ]),
        assertz(queue_worker(Queue, WorkerId)),
        Next is I + 1,
        create_workers(Next, N, Queue, AliasBase, Options)
    ).
%!  resize_pool(+Queue, +Size)
%
%   Make the number of workers attached to Queue equal to Size.
%   Missing workers are started with the options stored in
%   queue_options/2.  Excess workers are asked to stop by posting
%   quit(Me) on Queue once per worker, after which we wait for the
%   same number of quitted(_) messages on our own queue.

resize_pool(Queue, Size) :-
    findall(Id, queue_worker(Queue, Id), Ids),
    length(Ids, Now),
    (   Now > Size
    ->  Excess is Now - Size,
        thread_self(Me),
        forall(between(1, Excess, _),
               thread_send_message(Queue, quit(Me))),
        forall(between(1, Excess, _),
               thread_get_message(quitted(_)))
    ;   Now < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasPrefix),
        First is Now+1,
        create_workers(First, Size, Queue, AliasPrefix, Options)
    ;   Now == Size
    ).
If the message quit(Sender)
is read from the queue, the worker
stops.
%!  http_worker(+Options)
%
%   Body of a worker thread.  The worker repeatedly reads a job from
%   the queue named in Options and processes it.  With the option
%   max_idle_time(Seconds) the wait is limited; a timeout is treated
%   as the job quit(idle).
%
%   On quit(Sender) the loop is left, the thread detaches itself and,
%   unless Sender is `idle`, reports quitted(Self) to Sender.  Any
%   other job is opened using open_client/7 and served by
%   http_process/4.  If that raises an exception or fails, the event
%   is printed at the level given by current_message_level/2 and the
%   connection is closed.  In all cases the worker then waits for
%   the next job.

http_worker(Options) :-
    thread_at_exit(done_worker),
    option(queue(Queue), Options),
    option(max_idle_time(MaxIdle), Options, infinite),
    repeat,
    garbage_collect,
    trim_stacks,
    debug(http(worker), 'Waiting for a job ...', []),
    (   MaxIdle == infinite
    ->  thread_get_message(Queue, Job)
    ;   thread_get_message(Queue, Job, [timeout(MaxIdle)])
    ->  true
    ;   Job = quit(idle)
    ),
    debug(http(worker), 'Got job ~p', [Job]),
    (   Job = quit(Requester)
    ->  true
    ;   open_client(Job, Queue, Goal, In, Out,
                    Options, ClientOptions),
        (   catch(http_process(Goal, In, Out, ClientOptions),
                  Error, true)
        ->  nonvar(Error)               % handled normally: next job
        ;   Error = goal_failed(http_process/4)
        ),
        current_message_level(Error, Level),
        print_message(Level, Error),
        memberchk(peer(Peer), ClientOptions),
        close_connection(Peer, In, Out),
        fail                            % back to repeat
    ),
    !,
    thread_self(Self),
    thread_detach(Self),
    (   Requester == idle
    ->  true
    ;   thread_send_message(Requester, quitted(Self))
    ).
%!  open_client(+Message, +Queue, -Goal, -In, -Out,
%!              +Options, -ClientOptions)
%
%   Turn a queue message into the streams and options for serving a
%   request.
%
%   A requeue(In, Out, Goal, ClientOptions) message describes a
%   keep-alive connection: we wait at most keep_alive_timeout
%   seconds (default 2) for new input, see
%   check_keep_alive_connection/5.  For any other message the
%   streams are opened by open_client/6 and ClientOptions is
%   extended with pool(client(Queue, Goal, In, Out)) and
%   timeout(Timeout) (default 60).  Errors while opening are
%   printed, after which the predicate fails.

open_client(requeue(In, Out, Goal, KeptOptions),
            _, Goal, In, Out, ServerOptions, KeptOptions) :-
    !,
    memberchk(peer(Peer), KeptOptions),
    option(keep_alive_timeout(KeepAlive), ServerOptions, 2),
    check_keep_alive_connection(In, KeepAlive, Peer, In, Out).
open_client(Message, Queue, Goal, In, Out, ServerOptions,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | OpenedOptions
            ]) :-
    catch(open_client(Message, Goal, In, Out, OpenedOptions, ServerOptions),
          Error, report_error(Error)),
    option(timeout(Timeout), ServerOptions, 60),
    (   \+ debugging(http(connection))
    ->  true
    ;   memberchk(peer(Peer), OpenedOptions),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ).
%!  open_client(+Message, -Goal, -In, -Out, -ClientOptions, +Options)
%
%   Open the streams for a new client.  open_client_hook/6 is tried
%   first.  Otherwise Message must be tcp_client(Socket, Goal, Peer):
%   the socket is opened as a stream pair and ClientOptions becomes
%   [peer(Peer), protocol(http)].

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    (   open_client_hook(Message, Goal, In, Out, ClientOptions, Options)
    ->  true
    ;   Message = tcp_client(Socket, Goal, Peer),
        ClientOptions = [ peer(Peer),
                          protocol(http)
                        ],
        tcp_open_socket(Socket, In, Out)
    ).

%!  report_error(+Error)
%
%   Print Error as an error message and fail.

report_error(Error) :-
    print_message(error, Error),
    fail.
%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out)
%
%   Wait for at most TimeOut seconds for input on a keep-alive
%   connection.  Succeeds, with the original stream timeout
%   restored, if a character can be peeked.  If peeking raises an
%   exception or the remote end closed the connection, the
%   connection is closed and the predicate fails.

check_keep_alive_connection(In, TMO, Peer, In, Out) :-
    stream_property(In, timeout(Saved)),
    set_stream(In, timeout(TMO)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Code), Error, true),
    (   nonvar(Error)                   % exception while waiting
    ->  debug(http(keep_alive), '\tTimeout on keep-alive connection', []),
        close_connection(Peer, In, Out),
        fail
    ;   Code == -1                      % end-of-file
    ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', []),
        close_connection(Peer, In, Out),
        fail
    ;   set_stream(In, timeout(Saved)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ).
%!  done_worker
%
%   Exit hook of a worker (registered by http_worker/1 using
%   thread_at_exit/1).  Removes the worker from queue_worker/2 and
%   tries recreate_worker/2 on its exit status.  If a replacement
%   was started the thread detaches itself and a restart message is
%   printed; otherwise the stop is reported at the level given by
%   done_status_message_level/2.

done_worker :-
    thread_self(Me),
    thread_property(Me, status(ExitStatus)),
    retract(queue_worker(Queue, Me)),
    (   catch(recreate_worker(ExitStatus, Queue), _, fail)
    ->  thread_detach(Me),
        print_message(informational,
                      httpd_restarted_worker(Me))
    ;   done_status_message_level(ExitStatus, Level),
        print_message(Level,
                      httpd_stopped_worker(Me, ExitStatus))
    ).

%!  done_status_message_level(+Status, -Level)
%
%   Level is `silent` for the statuses `true` and
%   exception('$aborted') and `informational` for all others.

done_status_message_level(Status, Level) :-
    (   Status = true,
        Level = silent
    ->  true
    ;   Status = exception('$aborted'),
        Level = silent
    ->  true
    ;   Level = informational
    ).
The first clause deals with the possibility that we cannot write to
user_error
. This is possible when Prolog is started as a service
using some service managers. Would be nice if we could write an
error, but where?
%!  recreate_worker(+Status, +Queue)
%
%   Start a replacement for a worker that terminated with Status.
%   A worker that died because it could not write to `user_error`
%   halts the process with exit code 2.  For exceptions listed in
%   recreate_on_error/1 a single new worker is created from the
%   options stored in queue_options/2.  Fails for any other status.

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Ball), Queue) :-
    recreate_on_error(Ball),
    queue_options(Queue, StoredOptions),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, StoredOptions).

%!  recreate_on_error(?Exception)
%
%   Exceptions after which a worker is replaced.

recreate_on_error('$aborted').
recreate_on_error(time_limit_exceeded).

%!  message_level(+Exception, -Level)
%
%   Level at which an exception raised while a worker processes a
%   request is printed (see current_message_level/2).  Being
%   multifile, clauses can be added by the application to refine
%   error handling.  See also message_hook/3 for further programming
%   error handling.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),      silent).
message_level(error(timeout_error(read, _), _), informational).
message_level(keep_alive_timeout,               silent).

%!  current_message_level(+Term, -Level)
%
%   Level is the first answer of message_level/2 for Term, or
%   `error` if message_level/2 has no answer.

current_message_level(Term, Level) :-
    message_level(Term, Level),
    !.
current_message_level(_, error).
%!  http_requeue(+Header) is semidet.
%
%   Put the connection described by Header back on the worker queue
%   as a requeue(In, Out, Goal, ClientOptions) message, where
%   ClientOptions are the fields of Header selected by
%   requeue_keep/1.  http_enough_workers/3 is notified with reason
%   `keep_alive` first.  If any step fails, a debug message on topic
%   http(error) is emitted and the predicate fails.

http_requeue(Header) :-
    (   requeue_header(Header, ClientOptions),
        memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
        memberchk(peer(Peer), ClientOptions),
        http_enough_workers(Queue, keep_alive, Peer),
        thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions))
    ->  true
    ;   debug(http(error), 'Re-queue failed: ~p', [Header]),
        fail
    ).

%!  requeue_header(+Header, -ClientOptions)
%
%   ClientOptions holds, in order, the elements of Header accepted
%   by requeue_keep/1.

requeue_header([], []).
requeue_header([Field|Fields], Kept) :-
    (   Kept = [Field|KeptTail],        % tentatively keep Field ...
        requeue_keep(Field)             % ... undone if it is not wanted
    ->  true
    ;   Kept = KeptTail
    ),
    requeue_header(Fields, KeptTail).

%!  requeue_keep(?Field)
%
%   Header fields that survive a requeue.

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
%!  http_process(:Goal, +In, +Out, +Options)
%
%   Serve one request on the streams In and Out.  Both streams get
%   the timeout from the timeout(TMO) option (default 60), after
%   which the request is handled by http_wrapper/5 running Goal.
%   What happens to the connection afterwards is decided by next/2.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(TMO), Options, 60),
    set_stream(In, timeout(TMO)),
    set_stream(Out, timeout(TMO)),
    http_wrapper(Goal, In, Out, Connection,
                 [ request(Request)
                 | Options
                 ]),
    next(Connection, Request).

%!  next(+Connection, +Request)
%
%   Act on the connection status returned for a request:
%
%     - switch_protocol(SwitchGoal, _)
%       Call SwitchGoal on the streams of the connection.  If that
%       fails or raises an exception (which is printed), the
%       connection is closed.
%     - spawned(ThreadId)
%       The request was handed to another thread; nothing to do
%       here.
%     - Any value that downcases to `keep-alive`
%       Requeue the connection using http_requeue/1.
%
%   In all other cases, including a failed requeue, the connection
%   is closed.

next(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), E,
              (   print_message(error, E),
                  fail))
    ->  true
    ;   http_close_connection(Request)
    ).
next(spawned(ThreadId), _) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
next(Connection, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    http_requeue(Request),
    !.
next(_, Request) :-
    http_close_connection(Request).
%!  http_close_connection(+Request)
%
%   Close the streams of the connection that Request belongs to.
%   The streams are taken from the pool(client(_,_,In,Out)) field
%   and the peer from the peer(Peer) field of Request.

http_close_connection(Request) :-
    memberchk(pool(client(_Queue, _Goal, Input, Output)), Request),
    memberchk(peer(Remote), Request),
    close_connection(Remote, Input, Output).
%!  close_connection(+Peer, +In, +Out)
%
%   Close In and then Out using force(true).  Exceptions raised
%   while closing are ignored.  Peer is only used for the debug
%   message on topic http(connection).

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    forall(member(Stream, [In, Out]),
           catch(close(Stream, [force(true)]), _, true)).
library(thread_pool)
, starting the thread
on the given pool. If a pool does not exist, this predicate calls the multifile hook http:create_pool/1 to create it. If this predicate succeeds the operation is retried.
%!  http_spawn(:Goal, +Options)
%
%   Continue handling the current request in a new, detached
%   thread.  The current output stream is handed to the new thread.
%
%   With the option pool(Pool) the thread is created using
%   thread_create_in_pool/4; the remaining options are passed on.
%   If this raises resource_error(threads_in_pool(_)), the exception
%   http_reply(busy) is thrown.  If the pool does not exist and
%   create_pool/1 succeeds, the spawn is retried with the same Goal
%   and Options.  All other exceptions are re-thrown.
%
%   Without a pool option the thread is created using
%   thread_create/3, passing Options.

http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), Id,
                                [ detached(true)
                                | ThreadOptions
                                ]),
          Error,
          true),
    (   var(Error)
    ->  http_spawned(Id)
    ;   Error = error(resource_error(threads_in_pool(_)), _)
    ->  throw(http_reply(busy))
    ;   Error = error(existence_error(thread_pool, Pool), _),
        create_pool(Pool)
    ->  http_spawn(Goal, Options)       % retry now that the pool exists
    ;   throw(Error)
    ).
http_spawn(Goal, Options) :-
    current_output(CGI),
    thread_create(wrap_spawned(CGI, Goal), Id,
                  [ detached(true)
                  | Options
                  ]),
    http_spawned(Id).

%!  wrap_spawned(+CGI, :Goal)
%
%   Body of a spawned thread: make CGI the current output, run Goal
%   through http_wrap_spawned/3 and act on the resulting connection
%   status using next/2.

wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, Request).
%!  create_pool(+Pool)
%
%   Create the thread pool Pool.  The hook http:create_pool/1 is
%   tried first; a permission_error(create, thread_pool, Pool)
%   raised by the hook counts as success.  The second clause prints
%   an informational message and creates a pool of 10 threads with
%   default options.

create_pool(Pool) :-
    E = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), E, true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).



                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3.

%   Message texts for the terms printed by this file through
%   print_message/2.

prolog:message(httpd_started_server(Port)) -->
    [ 'Started server at '-[] ],
    http_root(Port).
prolog:message(httpd_stopped_worker(Self, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
prolog:message(httpd_restarted_worker(Self)) -->
    [ 'Replaced aborted worker ~p'-[Self] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].

%!  http_root(+Address)//
%
%   Emit the URL of the server root.  The host is taken from a
%   Host:Port address and is `localhost` if only a port is given.

http_root(Host:Port) -->
    !,
    http_scheme(Port),
    { http_absolute_location(root(.), URI, []) },
    [ '~w:~w~w'-[Host, Port, URI] ].
http_root(Port) -->
    http_scheme(Port),
    { http_absolute_location(root(.), URI, []) },
    [ 'localhost:~w~w'-[Port, URI] ].

%!  http_scheme(+Port)//
%
%   Emit the scheme prefix (e.g. `http://`) of the server at Port,
%   as reported by http_server_property/2.

http_scheme(Port) -->
    { http_server_property(Port, scheme(Scheme)) },
    [ '~w://'-[Scheme] ].
/** <module> Threaded HTTP server
This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with
library(thread_pool)
to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially use a lot of memory and 8 requests that use a lot of CPU resources. On Unix systems, this library can be combined with
library(http/http_unix_daemon)
to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc. Combined with
library(http/http_ssl_plugin)
from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */