View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2014-2016, VU University Amsterdam
    7                              CWI Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(hub,
   37          [ hub_create/3,               % +HubName, -Hub, +Options
   38            hub_add/3,                  % +HubName, +Websocket, ?Id
   39            hub_send/2,                 % +ClientId, +Message
   40            hub_broadcast/2,            % +HubName, +Message
   41            hub_broadcast/3,            % +HubName, +Message, +Condition
   42            current_hub/2               % ?HubName, ?Hub
   43          ]).   44:- use_module(library(debug)).   45:- use_module(library(error)).   46:- use_module(library(apply)).   47:- use_module(library(gensym)).   48:- use_module(library(uuid)).   49:- use_module(library(ordsets)).   50:- use_module(library(http/websocket)).   51
   52:- meta_predicate
   53    hub_broadcast(+,+,1).   54
   55/** <module> Manage a hub for websockets
   56
   57This library manages a hub that consists   of clients that are connected
   58using a websocket. Messages arriving at any   of the websockets are sent
   59to the _event_ queue  of  the  hub.   In  addition,  the  hub provides a
   60_broadcast_ interface. A typical usage scenario  for   a  hub is a _chat
   61server_ A scenario for realizing an chat server is:
   62
   63  1. Create a new hub using hub_create/3.
   64  2. Create one or more threads that listen to Hub.queues.event from
   65     the created hub.  These threads can update the shared view of the
   66     world. A message is a dict as returned by ws_receive/2 or a
   67     hub control message. Currently, the following control messages
   68     are defined:
   69
   70       - hub{left:ClientId, reason:Reason, error:Error}
   71       A client left us because of an I/O error.  Reason is =read=
   72       or =write= and Error is the Prolog I/O exception.
   73
   74       - hub{joined:ClientId}
   75       A new client has joined the chatroom.
   76
   77     The thread(s) can talk to clients using two predicates:
   78
   79       - hub_send/2 sends a message to a specific client
   80       - hub_broadcast/2 sends a message to all clients of the
   81         hub.
   82
   83A hub consists of (currenty) four message   queues  and a simple dynamic
   84fact. Threads that are needed for the communication tasks are created on
   85demand and die if no more work needs to be done.
   86
   87@tbd    The current design does not use threads to perform tasks for
   88        multiple hubs.  This implies that the design scales rather
   89        poorly for hosting many hubs with few users.
   90*/
   91
   92:- dynamic
   93    hub/2,                          % Hub, Queues ...
   94    websocket/5.                    % Hub, Socket, Queue, Lock, Id
   95    
   96:- volatile hub/2, websocket/5.    
   97
   98%!  hub_create(+Name, -Hub, +Options) is det.
   99%
  100%   Create a new hub. Hub is a  dict containing the following public
  101%   information:
  102%
  103%     - Hub.name
  104%       The name of the hub (the Name argument)
  105%     - queues.event
  106%       Message queue to which the hub thread(s) can listen.
  107%
  108%   After creating a hub, the application  normally creates a thread
  109%   that listens to Hub.queues.event and  exposes some mechanisms to
  110%   establish websockets and add them to the hub using hub_add/3.
  111%
  112%   @see    http_upgrade_to_websocket/3 establishes a websocket from
  113%           the SWI-Prolog webserver.
  114
  115hub_create(HubName, Hub, _Options) :-
  116    must_be(atom, HubName),
  117    message_queue_create(WaitQueue),
  118    message_queue_create(ReadyQueue),
  119    message_queue_create(EventQueue),
  120    message_queue_create(BroadcastQueue),
  121    Hub = hub{name:HubName,
  122              queues:_{wait:WaitQueue,
  123                       ready:ReadyQueue,
  124                       event:EventQueue,
  125                       broadcast:BroadcastQueue
  126                      }},
  127    assertz(hub(HubName, Hub)).
  128
  129
  130%!  current_hub(?Name, ?Hub) is nondet.
  131%
  132%   True when there exists a hub Hub with Name.
  133
  134current_hub(HubName, Hub) :-
  135    hub(HubName, Hub).
  136
  137
  138                 /*******************************
  139                 *            WAITERS           *
  140                 *******************************/
  141
  142/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  143The task of this layer is to wait   for  (a potentially large number of)
  144websockets. Whenever there is data on one   of these sockets, the socket
  145is handed to Hub.queues.ready. This is realised using wait_for_input/3,
  146which allows a single thread  to  wait   for  many  sockets.  But ... on
  147Windows it allows to wait for at most  64 sockets. In addition, there is
  148no way to add an additional input   for control messages because Windows
  149select() can only wait for sockets. On Unix   we could use pipe/2 to add
  150the control channal. On Windows  we   would  need  an additional network
  151service, giving rise its own  problems   with  allocation, firewalls and
  152security.
  153
  154So, instead we keep a queue of websockets   that  need to be waited for.
  155Whenever we add a  websocket,  we  create   a  waiter  thread  that will
  156typically start waiting for this socket.   In  addition, we schedule any
  157waiting thread that has less  than  the   maximum  number  of sockets to
  158timeout at as good as we can the same   time.  All of them will hunt for
  159the same set of queues,  but  they  have   to  wait  for  each other and
  160therefore most of the time one thread will walk away with all websockets
  161and the others commit suicide because there is nothing to wait for.
  162- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  163
  164:- meta_predicate
  165    hub_thread(0, +, +).  166
  167%!  hub_add(+Hub, +WebSocket, ?Id) is det.
  168%
  169%   Add a WebSocket to the hub. Id is used to identify this user. It may
  170%   be provided (as a ground term) or is generated as a UUID.
  171
  172hub_add(HubName, WebSocket, Id) :-
  173    must_be(atom, HubName),
  174    hub(HubName, Hub),
  175    (   var(Id)
  176    ->  uuid(Id)
  177    ;   true
  178    ),
  179    message_queue_create(OutputQueue),
  180    mutex_create(Lock),
  181                                         % asserta/1 allows for reuse of Id
  182    asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)),
  183    thread_send_message(Hub.queues.wait, WebSocket),
  184    thread_send_message(Hub.queues.event,
  185                        hub{joined:Id}),
  186    debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
  187    create_wait_thread(Hub).
  188
  189create_wait_thread(Hub) :-
  190    hub_thread(wait_for_sockets(Hub), Hub, hub_wait_).
  191
  192wait_for_sockets(Hub) :-
  193    wait_for_sockets(Hub, 64).
  194
  195wait_for_sockets(Hub, Max) :-
  196    Queues = Hub.queues,
  197    repeat,
  198      get_messages(Queues.wait, Max, List),
  199      (   List \== []
  200      ->  create_new_waiter_if_needed(Hub),
  201          sort(List, Set),
  202          length(Set, Len),
  203          debug(hub(wait), 'Waiting for ~d queues', [Len]),
  204          wait_for_set(Set, Left, ReadySet, Max),
  205          (   ReadySet \== []
  206          ->  debug(hub(ready), 'Data on ~p', [ReadySet]),
  207              Ready = Queues.ready,
  208              maplist(thread_send_message(Ready), ReadySet),
  209              create_reader_threads(Hub),
  210              ord_subtract(Set, ReadySet, NotReadySet)
  211          ;   NotReadySet = Left             % timeout
  212          ),
  213          (   NotReadySet \== []
  214          ->  debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]),
  215              Wait = Queues.wait,
  216              maplist(thread_send_message(Wait), NotReadySet),
  217              fail
  218          ;   true
  219          )
  220      ;   !
  221      ).
  222
  223create_new_waiter_if_needed(Hub) :-
  224    message_queue_property(Hub.queues.wait, size(0)),
  225    !.
  226create_new_waiter_if_needed(Hub) :-
  227    create_wait_thread(Hub).
  228
  229%!  wait_for_set(+Set0, -Left, -Ready, +Max) is det.
  230%
  231%   Wait for input from Set0.  Note that Set0 may contain closed
  232%   websockets.
  233
  234wait_for_set([], [], [], _) :-
  235    !.
  236wait_for_set(Set0, Set, ReadySet, Max) :-
  237    wait_timeout(Set0, Max, Timeout),
  238    catch(wait_for_input(Set0, ReadySet, Timeout),
  239          error(existence_error(stream, S), _), true),
  240    (   var(S)
  241    ->  Set = Set0
  242    ;   delete(Set0, S, Set1),
  243        wait_for_set(Set1, Set, ReadySet, Max)
  244    ).
  245
  246
  247%!  wait_timeout(+WaitForList, +Max, -TimeOut) is det.
  248%
  249%   Determine the timeout, such that   multiple  threads waiting for
  250%   less than the maximum number of  sockets   time  out at the same
  251%   moment and we can combine them on a single thread.
  252
  253:- dynamic
  254    scheduled_timeout/1.  255
  256wait_timeout(List, Max, Timeout) :-
  257    length(List, Max),
  258    !,
  259    Timeout = infinite.
  260wait_timeout(_, _, Timeout) :-
  261    get_time(Now),
  262    (   scheduled_timeout(SchedAt)
  263    ->  (   SchedAt > Now
  264        ->  At = SchedAt
  265        ;   retractall(scheduled_timeout(_)),
  266            At is ceiling(Now) + 1,
  267            asserta(scheduled_timeout(At))
  268        )
  269    ;   At is ceiling(Now) + 1,
  270        asserta(scheduled_timeout(At))
  271    ),
  272    Timeout is At - Now.
  273
  274
  275%!  get_messages(+Queue, +Max, -List) is det.
  276%
  277%   Get the next Max messages from  Queue   or  as many as there are
  278%   available without blocking very long.   This routine is designed
  279%   such that if multiple threads are running for messages, one gets
  280%   all of them and the others nothing.
  281
  282get_messages(Q, N, List) :-
  283    with_mutex(hub_wait,
  284               get_messages_sync(Q, N, List)).
  285
  286get_messages_sync(Q, N, [H|T]) :-
  287    succ(N2, N),
  288    thread_get_message(Q, H, [timeout(0.01)]),
  289    !,
  290    get_messages_sync(Q, N2, T).
  291get_messages_sync(_, _, []).
  292
  293
  294                 /*******************************
  295                 *            READERS           *
  296                 *******************************/
  297
  298/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  299The next layer consists of `readers'.   Whenever  one or more websockets
  300have   data,   the   socket   is    added   to   Hub.queues.ready   and
  301create_reader_threads/1 is called. This  examines   the  number of ready
  302sockets and fires a number  of  threads   to  handle  the read requests.
  303Multiple threads are mainly needed for the case that a client signals to
  304be  ready,  but  only  provides  an   incomplete  message,  causing  the
  305ws_receive/2 to block.
  306
  307Each  of  the  threads  reads  the  next   message  and  sends  this  to
  308Hub.queues.event. The websocket is then rescheduled   to listen for new
  309events. This read either fires a thread   to  listen for the new waiting
  310socket using create_wait_thread/1 or, if there   are no more websockets,
  311does this job itself. This  deals  with   the  common  scenario that one
  312client wakes up, starts a thread to  read   its  event and waits for new
  313messages on the same websockets.
  314- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  315
  316create_reader_threads(Hub) :-
  317    message_queue_property(Hub.queues.ready, size(Ready)),
  318    Threads is ceiling(sqrt(Ready)),
  319    forall(between(1, Threads, _),
  320           create_reader_thread(Hub)).
  321
  322create_reader_thread(Hub) :-
  323    hub_thread(read_message(Hub), Hub, hub_read_ws_).
  324
  325read_message(Hub) :-
  326    Queues = Hub.queues,
  327    thread_get_message(Queues.ready, WS, [timeout(0)]),
  328    !,
  329    catch(ws_receive(WS, Message), Error, true),
  330    (   var(Error),
  331        websocket(HubName, WS, _, _, Id)
  332    ->  (   _{opcode:close, data:end_of_file} :< Message
  333        ->  eof(WS)
  334        ;   Event = Message.put(_{client:Id, hub:HubName}),
  335            debug(hub(event), 'Event: ~p', [Event]),
  336            thread_send_message(Queues.event, Event),
  337            thread_send_message(Queues.wait, WS),
  338            (   message_queue_property(Queues.ready, size(0))
  339            ->  !,
  340                wait_for_sockets(Hub)
  341            ;   create_wait_thread(Hub),
  342                read_message(Hub)
  343            )
  344        )
  345    ;   websocket(_, WS, _, _, _)
  346    ->  io_read_error(WS, Error),
  347        read_message(Hub)
  348    ;   read_message(Hub)                   % already destroyed
  349    ).
  350read_message(_).
  351
  352
  353%!  io_read_error(+WebSocket, +Error)
  354%
  355%   Called on a read error from WebSocket.   We  close the websocket and
  356%   send the hub an event that we   lost the connection to the specified
  357%   client. Note that we leave  destruction   of  the  anonymous message
  358%   queue and mutex to the Prolog garbage collector.
  359
  360io_read_error(WebSocket, Error) :-
  361    debug(hub(gate), 'Got read error on ~w: ~p',
  362          [WebSocket, Error]),
  363    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
  364    !,
  365    catch(ws_close(WebSocket, 1011, Error), E,
  366          print_message(warning, E)),
  367    hub(HubName, Hub),
  368    thread_send_message(Hub.queues.event,
  369                        hub{left:Id,
  370                                 hub:HubName,
  371                                 reason:read,
  372                                 error:Error}).
  373io_read_error(_, _).                      % already considered gone
  374
  375eof(WebSocket) :-
  376    io_read_error(WebSocket, end_of_file).
  377
  378%!  io_write_error(+WebSocket, +Message, +Error)
  379%
  380%   Failed to write Message to WebSocket due   to  Error. Note that this
  381%   may be a pending but closed WebSocket.  We first check whether there
  382%   is a new one and if not  send   a  `left` message and pass the error
  383%   such that the client can re-send it when appropriate.
  384
  385io_write_error(WebSocket, Message, Error) :-
  386    debug(hub(gate), 'Got write error on ~w: ~p',
  387          [WebSocket, Error]),
  388    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
  389    !,
  390    catch(ws_close(WebSocket, 1011, Error), E,
  391          print_message(warning, E)),
  392    (   websocket(_, _, _, _, Id)
  393    ->  true
  394    ;   hub(HubName, Hub),
  395        thread_send_message(Hub.queues.event,
  396                            hub{left:Id,
  397                                hub:HubName,
  398                                reason:write(Message),
  399                                error:Error})
  400    ).
  401io_write_error(_, _, _).                      % already considered gone
  402
  403
  404                 /*******************************
  405                 *        SENDING MESSAGES      *
  406                 *******************************/
  407
  408/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  409My  initial  thought  about  sending  messages    was  to  add  a  tuple
  410WebSocket-Message to an output  queue  and   have  a  dynamic  number of
  411threads sending these messages to the   websockets. But, it is desirable
  412that, if multiple messages are sent to  a particular client, they arrive
  413in this order. As multiple threads are performing this task, this is not
  414easy to guarantee. Therefore, we create an  output queue and a mutex for
  415each client. An output thread will   walk  along the websockets, looking
  416for one that has pending messages.  It   then  grabs the lock associated
  417with the client and sends all waiting output messages.
  418
  419The price is that we might peek   a significant number of message queues
  420before we find one that  contains  messages.   If  this  proves  to be a
  421significant problem, we  could  maintain  a   queue  of  queues  holding
  422messages.
  423- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  424
  425%!  hub_send(+ClientId, +Message) is semidet.
  426%
  427%   Send message to the indicated ClientId.   Fails silently if ClientId
  428%   does not exist.
  429%
  430%   @arg    Message is either a single message (as accepted by
  431%           ws_send/2) or a list of such messages.
  432
  433hub_send(ClientId, Message) :-
  434    websocket(HubName, _WS, ClientQueue, _Lock, ClientId),
  435    hub(HubName, Hub),
  436    (   is_list(Message)
  437    ->  maplist(queue_output(ClientQueue), Message)
  438    ;   queue_output(ClientQueue, Message)
  439    ),
  440    create_output_thread(Hub, ClientQueue).
  441
  442create_output_thread(Hub, Queue) :-
  443    hub_thread(broadcast_from_queue(Queue, [timeout(0)]),
  444               Hub, hub_out_q_).
  445
  446%!  hub_broadcast(+Hub, +Message) is det.
  447%!  hub_broadcast(+Hub, +Message, :Condition) is det.
  448%
  449%   Send Message to all websockets  associated   with  Hub for which
  450%   call(Condition,  Id)  succeeds.  Note  that    this  process  is
  451%   _asynchronous_: this predicate returns immediately after putting
  452%   all requests in a  broadcast  queue.   If  a  message  cannot be
  453%   delivered due to a network error,   the  hub is informed through
  454%   io_error/3.
  455
  456hub_broadcast(HubName, Message) :-
  457    hub_broadcast(HubName, Message, all).
  458
  459all(_).
  460
  461hub_broadcast(HubName, Message, Condition) :-
  462    must_be(atom, HubName),
  463    hub(HubName, Hub),
  464    State = count(0),
  465    forall(( websocket(HubName, _WS, ClientQueue, _Lock, Id),
  466             call(Condition, Id)
  467           ),
  468           ( queue_output(ClientQueue, Message),
  469             inc_count(State)
  470           )),
  471    State = count(Count),
  472    create_broadcast_threads(Hub, Count).
  473
  474queue_output(Queue, Message) :-
  475    thread_send_message(Queue, Message).
  476
  477inc_count(State) :-
  478    arg(1, State, C0),
  479    C1 is C0+1,
  480    nb_setarg(1, State, C1).
  481
  482create_broadcast_threads(Hub, Count) :-
  483    Threads is ceiling(sqrt(Count)),
  484    forall(between(1, Threads, _),
  485           create_broadcast_thread(Hub)).
  486
  487create_broadcast_thread(Hub) :-
  488    hub_thread(broadcast_from_queues(Hub, [timeout(0)]),
  489                    Hub, hub_out_all_).
  490
  491
  492%!  broadcast_from_queues(+Hub, +Options) is det.
  493%
  494%   Broadcast from over all known queues.
  495
  496broadcast_from_queues(Hub, Options) :-
  497    forall(websocket(Hub.name, _WebSocket, Queue, _Lock, _Id),
  498           broadcast_from_queue(Queue, Options)).
  499
  500
  501%!  broadcast_from_queue(+Queue, +Options) is det.
  502%
  503%   Send all messages pending for Queue.   Note  that this predicate
  504%   locks the mutex associated  with  the   Queue,  such  that other
  505%   workers cannot start sending messages to this client. Concurrent
  506%   sending  would  lead  to  out-of-order    arrival  of  broadcast
  507%   messages.  If  the  mutex  is  already  held,  someone  else  is
  508%   processing this message queue, so we don't have to worry.
  509
  510broadcast_from_queue(Queue, _Options) :-
  511    message_queue_property(Queue, size(0)),
  512    !.
  513broadcast_from_queue(Queue, Options) :-
  514    websocket(_Hub, _WebSocket, Queue, Lock, _Id),
  515    !,
  516    (   setup_call_cleanup(
  517            mutex_trylock(Lock),
  518            broadcast_from_queue_sync(Queue, Options),
  519            mutex_unlock(Lock))
  520    ->  true
  521    ;   true
  522    ).
  523broadcast_from_queue(_, _).
  524
  525% Note that we re-fetch websocket/5, such that we terminate if something
  526% closed the websocket.
  527
  528broadcast_from_queue_sync(Queue, Options) :-
  529    repeat,
  530      (   websocket(_Hub, WebSocket, Queue, _Lock, _Id),
  531          thread_get_message(Queue, Message, Options)
  532      ->  debug(hub(broadcast),
  533                'To: ~p messages: ~p', [WebSocket, Message]),
  534          catch(ws_send(WebSocket, Message), E,
  535                io_write_error(WebSocket, Message, E)),
  536          fail
  537      ;   !
  538      ).
  539
  540%!  hub_thread(:Goal, +Hub, +Task) is det.
  541%
  542%   Create a (temporary) thread for the hub to perform Task. We
  543%   created named threads if debugging hub(thread) is enabled.
  544
  545hub_thread(Goal, _, Task) :-
  546    debugging(hub(thread)),
  547    !,
  548    gensym(Task, Alias),
  549    thread_create(Goal, _, [detached(true), alias(Alias)]).
  550hub_thread(Goal, _, _) :-
  551    thread_create(Goal, _, [detached(true)])