35
36:- module(thread_httpd,
37 [ http_current_server/2, 38 http_server_property/2, 39 http_server/2, 40 http_workers/2, 41 http_add_worker/2, 42 http_current_worker/2, 43 http_stop_server/2, 44 http_spawn/2, 45
46 http_requeue/1, 47 http_close_connection/1, 48 http_enough_workers/3 49 ]). 50:- use_module(library(debug)). 51:- use_module(library(error)). 52:- use_module(library(option)). 53:- use_module(library(socket)). 54:- use_module(library(thread_pool)). 55:- use_module(library(gensym)). 56:- use_module(http_wrapper). 57:- use_module(http_path). 58
59
60:- predicate_options(http_server/2, 2,
61 [ port(any),
62 tcp_socket(any),
63 workers(positive_integer),
64 timeout(number),
65 keep_alive_timeout(number),
66 ssl(list(any)), 67 pass_to(system:thread_create/3, 3)
68 ]). 69:- predicate_options(http_spawn/2, 2,
70 [ pool(atom),
71 pass_to(system:thread_create/3, 3),
72 pass_to(thread_pool:thread_create_in_pool/4, 4)
73 ]). 74:- predicate_options(http_add_worker/2, 2,
75 [ timeout(number),
76 keep_alive_timeout(number),
77 max_idle_time(number),
78 pass_to(system:thread_create/3, 3)
79 ]). 80
102
103:- meta_predicate
104 http_server(1, :),
105 http_current_server(1, ?),
106 http_spawn(0, +). 107
108:- dynamic
109 current_server/6, 110 queue_worker/2, 111 queue_options/2. 112
113:- multifile
114 make_socket_hook/3,
115 accept_hook/2,
116 close_hook/1,
117 open_client_hook/6,
118 http:create_pool/1,
119 http:schedule_workers/1. 120
174
%   http_server(:Goal, :Options)
%
%   Start an HTTP server. Options must contain port(Port); the
%   listening socket, the worker threads and the accept thread are
%   then created in that order and an informational message is
%   printed. Without a port option an existence_error(option, port)
%   is raised.

http_server(Goal, QOptions) :-
    (   QOptions = Module:UserOptions,
        option(port(Address), UserOptions)
    ->  make_socket(Address, Module:UserOptions, ServerOptions),
        create_workers(ServerOptions),
        create_server(Goal, Address, ServerOptions),
        print_message(informational,
                      httpd_started_server(Address))
    ;   existence_error(option, port)
    ).
185
186
194
%   make_socket(?Port, :OptionsIn, -OptionsOut)
%
%   Prepare the listening socket and extend the option list.
%
%     1. If the multifile hook make_socket_hook/3 succeeds it is
%        responsible for the whole job.
%     2. If the caller already supplied tcp_socket(Socket), only the
%        queue(Queue) option is added.
%     3. Otherwise a TCP socket is created, bound to Port and put in
%        listen mode; queue(Queue) and tcp_socket(Socket) are added.
%
%   In case 3 the socket is closed again if configuring, binding or
%   listening raises an exception, so a failed start (for example
%   because the port is in use) does not leak the socket. The original
%   exception is re-thrown unchanged.

make_socket(Port, Options0, Options) :-
    make_socket_hook(Port, Options0, Options),
    !.
make_socket(Port, _:Options0, Options) :-
    option(tcp_socket(_), Options0),
    !,
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue)
              | Options0
              ].
make_socket(Port, _:Options0, Options) :-
    tcp_socket(Socket),
    catch(( tcp_setopt(Socket, reuseaddr),
            tcp_bind(Socket, Port),
            tcp_listen(Socket, 5)
          ),
          Error,
          ( % Best-effort close; never mask the original error.
            catch(tcp_close_socket(Socket), _, true),
            throw(Error)
          )),
    make_addr_atom('httpd', Port, Queue),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].
215
220
%   make_addr_atom(+Scheme, +Address, -Atom)
%
%   Build an atom of the form Scheme@Address. Address is an atomic
%   value, a Host:Port pair or an ip(A,B,C,D) term; the pieces are
%   collected by address_parts//1 and concatenated.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), AddressTokens),
    atomic_list_concat([Scheme, '@' | AddressTokens], Atom).

%   address_parts(+Address)//
%
%   Emit the list of atomic tokens that spell out Address.

address_parts(Value) -->
    { atomic(Value) },
    !,
    [Value].
address_parts(Host:Port) -->
    !,
    address_parts(Host),
    [':'],
    address_parts(Port).
address_parts(ip(A,B,C,D)) -->
    !,
    [A, '.', B, '.', C, '.', D].
235
240
%   create_server(:Goal, +Address, +Options)
%
%   Start the accept thread for a server and register it in
%   current_server/6. The thread alias is built from the scheme and
%   the port part of Address; the queue is taken from the queue(Queue)
%   option, which must be present.

create_server(Goal, Address, Options) :-
    get_time(Started),
    memberchk(queue(WorkQueue), Options),
    scheme(Scheme, Options),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, ThreadAlias),
    thread_create(accept_server(Goal, Options), _ThreadId,
                  [ alias(ThreadAlias) ]),
    assert(current_server(Port, Goal, ThreadAlias, WorkQueue,
                          Scheme, Started)).
251
%   scheme(-Scheme, +Options)
%
%   Determine the URL scheme of a server: an explicit scheme(Scheme)
%   option wins, otherwise the presence of ssl(_) or ssl_instance(_)
%   selects https, and http is used as the fallback.

scheme(Scheme, Options) :-
    (   option(scheme(Scheme), Options)
    ->  true
    ;   (   option(ssl(_), Options)
        ;   option(ssl_instance(_), Options)
        )
    ->  Scheme = https
    ;   Scheme = http
    ).
262
%   address_port(+Address, -Port)
%
%   Port is the port part of a Host:Port address, or Address itself
%   when it is not of that form.

address_port(Address, Port) :-
    (   Address = _Host:Port
    ->  true
    ;   Port = Address
    ).
265
266
272
%   http_current_server(:Goal, ?Port)
%
%   True when the current_server/6 table holds a server on Port that
%   was started with Goal.

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _Alias, _Queue, _Scheme, _StartTime).
275
276
289
%   http_server_property(?Port, ?Property)
%
%   True when Property holds for the server at Port. A Host:Port term
%   with an integer port is reduced to the port first. The supported
%   properties are goal(Goal), scheme(Scheme) and start_time(Time).

http_server_property(Address, Property) :-
    (   Address = _Host:PortNumber,
        integer(PortNumber)
    ->  server_property(Property, PortNumber)
    ;   server_property(Property, Address)
    ).

%   server_property(?Property, ?Port)
%
%   Project a single column out of current_server/6.

server_property(goal(Goal), Port) :-
    current_server(Port, Goal, _Alias, _Queue, _Scheme, _Start).
server_property(scheme(Scheme), Port) :-
    current_server(Port, _Goal, _Alias, _Queue, Scheme, _Start).
server_property(start_time(Start), Port) :-
    current_server(Port, _Goal, _Alias, _Queue, _Scheme, Start).
303
304
311
%   http_workers(+Port, ?Workers)
%
%   Query or set the number of workers of the server at Port. When
%   Workers is an integer the pool is resized to that size; otherwise
%   Workers is unified with the number of registered queue_worker/2
%   entries. Port must be ground. Raises
%   existence_error(http_server, Port) if no server is known.

http_workers(Port, Workers) :-
    must_be(ground, Port),
    (   current_server(Port, _Goal, _Alias, Queue, _Scheme, _Start)
    ->  (   integer(Workers)
        ->  resize_pool(Queue, Workers)
        ;   findall(Id, queue_worker(Queue, Id), Ids),
            length(Ids, Workers)
        )
    ;   existence_error(http_server, Port)
    ).
323
324
334
%   http_add_worker(+Port, +Options)
%
%   Add one worker thread to the server at Port. Options are merged
%   over the options stored for the server's queue, so values given
%   here take precedence. Port must be ground. Raises
%   existence_error(http_server, Port) if no server is known.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    (   current_server(Port, _Goal, _Alias, Queue, _Scheme, _Start)
    ->  queue_options(Queue, StoredOptions),
        merge_options(Options, StoredOptions, EffectiveOptions),
        atom_concat(Queue, '_', AliasPrefix),
        create_workers(1, 1, Queue, AliasPrefix, EffectiveOptions)
    ;   existence_error(http_server, Port)
    ).
345
346
353
%   http_current_worker(?Port, ?ThreadID)
%
%   True when ThreadID is registered in queue_worker/2 for the queue
%   of the server at Port.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _Goal, _Alias, WorkQueue, _Scheme, _Start),
    queue_worker(WorkQueue, ThreadID).
357
358
363
%   accept_server(:Goal, +Options)
%
%   Body of the accept thread. Runs the accept loop until it is
%   terminated by the exception http_stop, then removes this thread's
%   entry from current_server/6 and closes the server socket.

accept_server(Goal, Options) :-
    catch(accept_server2(Goal, Options), http_stop, true),
    thread_self(Me),
    retract(current_server(_Port, _Goal, Me, _Queue, _Scheme, _Start)),
    close_server_socket(Options).
369
%   accept_server2(:Goal, +Options)
%
%   Failure-driven accept loop. Each iteration runs accept_server3/2.
%   Exceptions listed by accept_rethrow_error/1 are re-thrown and so
%   end the loop; any other exception, or failure of the accept step,
%   is reported as an error message after which the loop continues.

accept_server2(Goal, Options) :-
    repeat,
    (   catch(accept_server3(Goal, Options), Error, true)
    ->  (   var(Error)
        ->  fail
        ;   accept_rethrow_error(Error)
        ->  throw(Error)
        ;   print_message(error, Error),
            fail
        )
    ;   print_message(error,
                      goal_failed(accept_server3(Goal, Options))),
        fail
    ).

%   accept_server3(:Goal, +Options)
%
%   Accept one connection. The multifile hook accept_hook/2 takes
%   precedence. Otherwise a client is accepted on the tcp_socket from
%   Options, http_enough_workers/3 is consulted, and a
%   tcp_client(Client, Goal, Peer) message is posted on the queue.

accept_server3(Goal, Options) :-
    accept_hook(Goal, Options),
    !.
accept_server3(Goal, Options) :-
    memberchk(tcp_socket(ServerSocket), Options),
    memberchk(queue(WorkQueue), Options),
    tcp_accept(ServerSocket, ClientSocket, Peer),
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    http_enough_workers(WorkQueue, accept, Peer),
    thread_send_message(WorkQueue,
                        tcp_client(ClientSocket, Goal, Peer)).

%   accept_rethrow_error(?Exception)
%
%   Exceptions that must terminate the accept loop.

accept_rethrow_error(http_stop).
accept_rethrow_error('$aborted').
398
399
403
%   close_server_socket(+Options)
%
%   Close the listening socket. The multifile hook close_hook/1 is
%   tried first; otherwise the socket found in the tcp_socket(Socket)
%   option is closed. Fails if neither applies.

close_server_socket(Options) :-
    (   close_hook(Options)
    ->  true
    ;   memberchk(tcp_socket(ServerSocket), Options)
    ->  tcp_close_socket(ServerSocket)
    ).
411
412
419
%   http_stop_server(+Port, +Options)
%
%   Stop the server at Port. A Host:Port term with ground Host is
%   reduced to Port. The worker pool is first resized to zero, the
%   stored queue options are removed, and http_stop is thrown into the
%   accept thread. A connection to localhost:Port is then attempted
%   (errors ignored) -- presumably to wake an accept thread blocked in
%   tcp_accept/3 -- before joining the thread and destroying the queue.

http_stop_server(Host:Port, Options) :-
    ground(Host),
    !,
    http_stop_server(Port, Options).
http_stop_server(Port, _Options) :-
    http_workers(Port, 0),
    current_server(Port, _Goal, AcceptThread, WorkQueue, _Scheme, _Start),
    retractall(queue_options(WorkQueue, _)),
    thread_signal(AcceptThread, throw(http_stop)),
    catch(connect(localhost:Port), _, true),
    thread_join(AcceptThread, _Status),
    message_queue_destroy(WorkQueue).
432
%   connect(+Address)
%
%   Open a TCP connection to Address and close the socket again; the
%   socket is closed even when connecting fails or raises.

connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Sock),
        tcp_connect(Sock, Address),
        tcp_close_socket(Sock)).
438
444
%   http_enough_workers(+Queue, +Why, +Peer)
%
%   Check the backlog of Queue. If the number of waiting messages is
%   acceptable for Why (see enough/2) nothing happens. Otherwise the
%   hook http:schedule_workers/1 is called with a dict holding the
%   server port, the reason, the peer and the queue size. Exceptions
%   from the hook are printed as errors. Always succeeds once the
%   queue size has been obtained.

http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Waiting)),
    (   enough(Waiting, Why)
    ->  true
    ;   ignore(( current_server(Port, _, _, Queue, _, _),
                 catch(http:schedule_workers(_{port:Port,
                                               reason:Why,
                                               peer:Peer,
                                               waiting:Waiting}),
                       Error,
                       print_message(error, Error))
               ))
    ).

%   enough(+QueueSize, +Why)
%
%   An empty queue is always fine; one waiting message is tolerated
%   when the reason is keep_alive.

enough(0, _).
enough(1, keep_alive).
463
487
488
489 492
497
%   create_workers(+Options)
%
%   Create the initial worker pool: workers(N) threads (default 5)
%   reading from the message queue named by the queue(Queue) option.
%   Errors from creating the queue are ignored. The options are
%   stored in queue_options/2 afterwards.

create_workers(Options) :-
    option(workers(Count), Options, 5),
    option(queue(WorkQueue), Options),
    catch(message_queue_create(WorkQueue), _, true),
    atom_concat(WorkQueue, '_', AliasPrefix),
    create_workers(1, Count, WorkQueue, AliasPrefix, Options),
    assert(queue_options(WorkQueue, Options)).
505
%   create_workers(+I, +N, +Queue, +AliasBase, +Options)
%
%   Create worker threads for the indexes I..N. Each thread runs
%   http_worker(Options), gets an alias generated from AliasBase with
%   gensym/2 and is recorded in queue_worker/2. Options is also passed
%   on to thread_create/3.

create_workers(I, N, Queue, AliasBase, Options) :-
    (   I > N
    ->  true
    ;   gensym(AliasBase, WorkerAlias),
        thread_create(http_worker(Options), WorkerId,
                      [ alias(WorkerAlias)
                      | Options
                      ]),
        assertz(queue_worker(Queue, WorkerId)),
        Next is I + 1,
        create_workers(Next, N, Queue, AliasBase, Options)
    ).
518
519
524
%   resize_pool(+Queue, +Size)
%
%   Bring the number of workers on Queue to Size. Missing workers are
%   created using the stored queue options. Excess workers are asked
%   to stop by posting quit(Me) messages, after which the same number
%   of quitted(_) replies is awaited on the calling thread's queue.

resize_pool(Queue, Size) :-
    findall(Id, queue_worker(Queue, Id), Current),
    length(Current, Now),
    (   Now < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasPrefix),
        First is Now+1,
        create_workers(First, Size, Queue, AliasPrefix, Options)
    ;   Now == Size
    ->  true
    ;   Now > Size
    ->  Surplus is Now - Size,
        thread_self(Me),
        forall(between(1, Surplus, _),
               thread_send_message(Queue, quit(Me))),
        forall(between(1, Surplus, _),
               thread_get_message(quitted(_)))
    ).
541
542
550
%   http_worker(+Options)
%
%   Body of a worker thread. The worker registers done_worker/0 as
%   exit hook and then runs a failure-driven loop: it waits for a
%   message on the queue (with a timeout when max_idle_time is given,
%   in which case a timeout is treated as quit(idle)) and handles it.
%
%   A quit(Sender) message ends the loop: the thread detaches itself
%   and, unless the sender is `idle`, confirms with quitted(Self).
%   Any other message is opened as a client connection and processed
%   by http_process/4. Exceptions and failure of the handler are
%   reported at the level given by current_message_level/2 and the
%   connection is closed.

http_worker(Options) :-
    thread_at_exit(done_worker),
    option(queue(WorkQueue), Options),
    option(max_idle_time(IdleLimit), Options, infinite),
    repeat,
      garbage_collect,
      trim_stacks,
      debug(http(worker), 'Waiting for a job ...', []),
      (   IdleLimit == infinite
      ->  thread_get_message(WorkQueue, Job)
      ;   thread_get_message(WorkQueue, Job, [timeout(IdleLimit)])
      ->  true
      ;   Job = quit(idle)
      ),
      debug(http(worker), 'Got job ~p', [Job]),
      (   Job = quit(Sender)
      ->  !,
          thread_self(Self),
          thread_detach(Self),
          (   Sender == idle
          ->  true
          ;   thread_send_message(Sender, quitted(Self))
          )
      ;   open_client(Job, WorkQueue, Goal, In, Out,
                      Options, ClientOptions),
          (   catch(http_process(Goal, In, Out, ClientOptions),
                    Error, true)
          ->  true
          ;   Error = goal_failed(http_process/4)
          ),
          (   var(Error)
          ->  fail
          ;   current_message_level(Error, Level),
              print_message(Level, Error),
              memberchk(peer(Peer), ClientOptions),
              close_connection(Peer, In, Out),
              fail
          )
      ).
590
591
597
%   open_client(+Message, +Queue, -Goal, -In, -Out,
%               +Options, -ClientOptions)
%
%   Turn a queue message into a connection. A requeue/4 message
%   carries existing streams; the connection is only reused if the
%   keep-alive check succeeds within keep_alive_timeout (default 2).
%   Any other message is opened through open_client/6, with errors
%   printed and turned into failure, and ClientOptions is extended
%   with pool(client(...)) and timeout(Timeout) (default 60).

open_client(requeue(In, Out, Goal, ClientOptions),
            _Queue, Goal, In, Out, Options, ClientOptions) :-
    !,
    memberchk(peer(Peer), ClientOptions),
    option(keep_alive_timeout(KeepAlive), Options, 2),
    check_keep_alive_connection(In, KeepAlive, Peer, In, Out).
open_client(Message, Queue, Goal, In, Out, Options,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | ConnOptions
            ]) :-
    catch(open_client(Message, Goal, In, Out, ConnOptions, Options),
          Error, report_error(Error)),
    option(timeout(Timeout), Options, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Peer), ConnOptions),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ;   true
    ).
617
618
621
%   open_client(+Message, -Goal, -In, -Out, -ClientOptions, +Options)
%
%   Open the streams for a new client. The multifile hook
%   open_client_hook/6 takes precedence; the default handles
%   tcp_client(Socket, Goal, Peer) by opening the socket and reporting
%   peer(Peer) and protocol(http) as client options.

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(ClientSocket, Goal, Peer), Goal, In, Out,
            [ peer(Peer),
              protocol(http)
            ], _Options) :-
    tcp_open_socket(ClientSocket, In, Out).
630
%   report_error(+Error)
%
%   Print Error as an error message and fail, so that a catch/3
%   recovery using this predicate turns the exception into failure.

report_error(Error) :-
    print_message(error, Error),
    fail.
634
635
641
%   check_keep_alive_connection(+In, +TMO, +Peer, +In, +Out)
%
%   Wait at most TMO for input on a kept-alive connection by peeking
%   one character code. When input is available the original stream
%   timeout is restored and the predicate succeeds. On end-of-file or
%   an exception while peeking the connection is closed and the
%   predicate fails.

check_keep_alive_connection(In, TMO, Peer, In, Out) :-
    stream_property(In, timeout(SavedTimeout)),
    set_stream(In, timeout(TMO)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Code), Error, true),
    (   var(Error),
        Code \== -1
    ->  set_stream(In, timeout(SavedTimeout)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   (   Code == -1
        ->  debug(http(keep_alive),
                  '\tRemote closed keep-alive connection', [])
        ;   debug(http(keep_alive),
                  '\tTimeout on keep-alive connection', [])
        ),
        close_connection(Peer, In, Out),
        fail
    ).
658
659
665
%   done_worker
%
%   Exit hook of a worker thread. Removes the thread from
%   queue_worker/2 and, depending on its exit status, tries to start
%   a replacement via recreate_worker/2. If a replacement was created
%   the thread detaches itself and a restart message is printed;
%   otherwise a stop message is printed at the level given by
%   done_status_message_level/2.

done_worker :-
    thread_self(Me),
    thread_property(Me, status(ExitStatus)),
    retract(queue_worker(WorkQueue, Me)),
    (   catch(recreate_worker(ExitStatus, WorkQueue), _, fail)
    ->  thread_detach(Me),
        print_message(informational,
                      httpd_restarted_worker(Me))
    ;   done_status_message_level(ExitStatus, Level),
        print_message(Level,
                      httpd_stopped_worker(Me, ExitStatus))
    ).
678
%   done_status_message_level(+Status, -Level)
%
%   Message level used to report a stopped worker: silent for a
%   normal exit (true) and for exception('$aborted'), informational
%   for every other status.

done_status_message_level(Status, Level) :-
    (   Status = true,
        Level = silent
    ->  true
    ;   Status = exception('$aborted'),
        Level = silent
    ->  true
    ;   Level = informational
    ).
682
683
695
%   recreate_worker(+Status, +Queue)
%
%   Decide what to do with a worker that stopped with Status. A write
%   I/O error on user_error halts the process with exit code 2. For
%   exceptions accepted by recreate_on_error/1 one new worker is
%   created on Queue from the stored queue options. Fails for any
%   other status.

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Exception), Queue) :-
    recreate_on_error(Exception),
    queue_options(Queue, StoredOptions),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, StoredOptions).

%   recreate_on_error(?Exception)
%
%   Exceptions after which a worker is replaced.

recreate_on_error('$aborted').
recreate_on_error(time_limit_exceeded).
706
713
%   message_level(+Term, -Level)
%
%   Multifile table mapping exception terms raised while serving a
%   request onto the level at which they are printed.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),      silent).
message_level(error(timeout_error(read, _), _), informational).
message_level(keep_alive_timeout,               silent).

%   current_message_level(+Term, -Level)
%
%   Level is the first level registered for Term in message_level/2,
%   or `error` when there is none.

current_message_level(Message, Level) :-
    (   message_level(Message, Level)
    ->  true
    ;   Level = error
    ).
726
727
732
%   http_requeue(+Header)
%
%   Put the connection described by Header back on its worker queue
%   as a requeue/4 message, keeping only the options selected by
%   requeue_header/2. http_enough_workers/3 is consulted with reason
%   keep_alive first. If any step fails, a debug message on the
%   http(error) topic is emitted and the predicate fails.

http_requeue(Header) :-
    (   requeue_header(Header, ClientOptions),
        memberchk(pool(client(WorkQueue, Goal, In, Out)), ClientOptions),
        memberchk(peer(Peer), ClientOptions),
        http_enough_workers(WorkQueue, keep_alive, Peer),
        thread_send_message(WorkQueue,
                            requeue(In, Out, Goal, ClientOptions))
    ->  true
    ;   debug(http(error), 'Re-queue failed: ~p', [Header]),
        fail
    ).
743
%   requeue_header(+Header, -ClientOptions)
%
%   ClientOptions holds, in their original order, the elements of
%   Header that requeue_keep/1 accepts; all other fields are dropped.
%   The first clause is the base case for the empty list (its head
%   had been lost, leaving a syntax error and no terminating case).

requeue_header([], []).
requeue_header([H|T0], [H|T]) :-
    requeue_keep(H),
    !,
    requeue_header(T0, T).
requeue_header([_|T0], T) :-
    requeue_header(T0, T).

%   requeue_keep(?Field)
%
%   Fields that must survive when a connection is re-queued.

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
755
756
760
%   http_process(:Goal, +In, +Out, +Options)
%
%   Handle one request on the stream pair In/Out. Both streams get
%   the timeout from Options (default 60), http_wrapper/5 runs Goal,
%   and next/2 decides what happens with the connection afterwards.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(Timeout), Options, 60),
    set_stream(In, timeout(Timeout)),
    set_stream(Out, timeout(Timeout)),
    WrapperOptions = [ request(Request) | Options ],
    http_wrapper(Goal, In, Out, Connection, WrapperOptions),
    next(Connection, Request).
772
%   next(+Connection, +Request)
%
%   Decide what to do with a connection after a request was handled:
%
%     - switch_protocol(Goal, _): call Goal on the connection's
%       streams; if it fails or raises (the error is printed) the
%       connection is closed.
%     - spawned(ThreadId): nothing to do here but emit a debug message.
%     - a value that down-cases to 'keep-alive': try to re-queue the
%       connection.
%     - anything else, or a failed re-queue: close the connection.

next(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), Error,
              ( print_message(error, Error),
                fail
              ))
    ->  true
    ;   http_close_connection(Request)
    ).
next(spawned(ThreadId), _Request) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
next(Connection, Request) :-
    (   downcase_atom(Connection, 'keep-alive'),
        http_requeue(Request)
    ->  true
    ;   http_close_connection(Request)
    ).
791
792
796
%   http_close_connection(+Request)
%
%   Close the streams of the connection that Request belongs to. The
%   streams come from the pool(client(...)) field and the peer from
%   the peer(Peer) field; both must be present.

http_close_connection(Request) :-
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    memberchk(peer(RemotePeer), Request),
    close_connection(RemotePeer, In, Out).
801
806
%   close_connection(+Peer, +In, +Out)
%
%   Force-close both streams of a connection. Errors from closing
%   either stream are ignored; Peer is only used for the debug
%   message.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    ForceClose = [force(true)],
    catch(close(In, ForceClose), _, true),
    catch(close(Out, ForceClose), _, true).
811
827
%   http_spawn(:Goal, +Options)
%
%   Continue handling the current request in a new detached thread
%   that writes to the current output stream.
%
%   With a pool(Pool) option the thread is created in that thread
%   pool. If the pool is full, http_reply(busy) is thrown. If the
%   pool does not exist and create_pool/1 manages to create it, the
%   spawn is retried. Any other error is re-thrown. Without a pool
%   option a plain thread is created with Options.

http_spawn(Goal, Options) :-
    (   select_option(pool(Pool), Options, ThreadOptions)
    ->  current_output(CGI),
        catch(thread_create_in_pool(Pool,
                                    wrap_spawned(CGI, Goal), PoolThread,
                                    [ detached(true)
                                    | ThreadOptions
                                    ]),
              Error,
              true),
        (   var(Error)
        ->  http_spawned(PoolThread)
        ;   Error = error(resource_error(threads_in_pool(_)), _)
        ->  throw(http_reply(busy))
        ;   Error = error(existence_error(thread_pool, Pool), _),
            create_pool(Pool)
        ->  http_spawn(Goal, Options)
        ;   throw(Error)
        )
    ;   current_output(CGI),
        thread_create(wrap_spawned(CGI, Goal), Thread,
                      [ detached(true)
                      | Options
                      ]),
        http_spawned(Thread)
    ).
855
%   wrap_spawned(+CGI, :Goal)
%
%   Entry point of a spawned handler thread: make CGI the current
%   output, run Goal through http_wrap_spawned/3 and let next/2 deal
%   with the connection afterwards.

wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    http_wrap_spawned(Goal, Request, ConnectionState),
    next(ConnectionState, Request).
860
868
%   create_pool(+Pool)
%
%   Lazily create thread pool Pool. The hook http:create_pool/1 is
%   tried first; a permission error on creating the pool is treated
%   as success. If the hook fails, a pool of size 10 is created with
%   default options and an informational message is printed.

create_pool(Pool) :-
    AlreadyThere = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), AlreadyThere, true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).
875
876
877
878 881
882:- multifile
883 prolog:message/3. 884
%   prolog:message(+Term)//
%
%   Message texts for the terms this library passes to
%   print_message/2.

prolog:message(httpd_started_server(Address)) -->
    [ 'Started server at '-[] ],
    http_root(Address).
prolog:message(httpd_stopped_worker(Worker, ExitStatus)) -->
    [ 'Stopped worker ~p: ~p'-[Worker, ExitStatus] ].
prolog:message(httpd_restarted_worker(Worker)) -->
    [ 'Replaced aborted worker ~p'-[Worker] ].
prolog:message(httpd(created_pool(PoolName))) -->
    [ 'Created thread-pool ~p of size 10'-[PoolName], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].
898
%   http_root(+Address)//
%
%   Emit the root URL of the server at Address: the scheme, then
%   either Host:Port or localhost:Port, then the absolute location of
%   root(.).

http_root(Host:Port) -->
    !,
    http_scheme(Port),
    { http_absolute_location(root(.), RootPath, []) },
    [ '~w:~w~w'-[Host, Port, RootPath] ].
http_root(Port) -->
    http_scheme(Port),
    { http_absolute_location(root(.), RootPath, []) },
    [ 'localhost:~w~w'-[Port, RootPath] ].
908
909http_scheme(Port) -->
910 { http_server_property(Port, scheme(Scheme)) },
911 [ '~w://'-[Scheme] ]