View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2014-2016, VU University Amsterdam
    7                              CWI Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(hub,
   37          [ hub_create/3,               % +HubName, -Hub, +Options
   38            hub_add/3,                  % +HubName, +Websocket, ?Id
   39            hub_send/2,                 % +ClientId, +Message
   40            hub_broadcast/2,            % +HubName, +Message
   41            hub_broadcast/3,            % +HubName, +Message, +Condition
   42            current_hub/2               % ?HubName, ?Hub
   43          ]).   44:- use_module(library(debug)).   45:- use_module(library(error)).   46:- use_module(library(apply)).   47:- use_module(library(gensym)).   48:- use_module(library(uuid)).   49:- use_module(library(ordsets)).   50:- use_module(library(http/websocket)).   51
   52:- meta_predicate
   53    hub_broadcast(+,+,1).

Manage a hub for websockets

This library manages a hub that consists of clients that are connected using a websocket. Messages arriving at any of the websockets are sent to the event queue of the hub. In addition, the hub provides a broadcast interface. A typical usage scenario for a hub is a chat server. A scenario for realizing a chat server is:

  1. Create a new hub using hub_create/3.
  2. Create one or more threads that listen to Hub.queues.event from the created hub. These threads can update the shared view of the world. A message is a dict as returned by ws_receive/2 or a hub control message. Currently, the following control messages are defined:
    hub{error:Error, left:ClientId, reason:Reason}
    A client left us because of an I/O error. Reason is read or write and Error is the Prolog I/O exception.
    hub{joined:ClientId}
    A new client has joined the chatroom.

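    The thread(s) can talk to clients using two predicates:
      - hub_send/2 sends a message to a specific client.
      - hub_broadcast/2 sends a message to all clients of the hub.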

A hub consists of (currently) four message queues and a simple dynamic fact. Threads that are needed for the communication tasks are created on demand and die if no more work needs to be done.

To be done
-
The current design does not use threads to perform tasks for multiple hubs. This implies that the design scales rather poorly for hosting many hubs with few users. */
   % Runtime bookkeeping of the library:
   %
   %   - hub(Name, Hub): one fact per created hub, holding its dict.
   %   - websocket(HubName, Socket, Queue, Lock, Id): one fact per
   %     client connection, with its output queue and mutex.
   %
   % Both tables refer to live streams, queues and mutexes; they are
   % declared volatile next to being dynamic.
   :- dynamic  hub/2, websocket/5.
   :- volatile hub/2, websocket/5.
 hub_create(+Name, -Hub, +Options) is det
Create a new hub. Hub is a dict containing the following public information:
Hub.name
The name of the hub (the Name argument)
Hub.queues.event
Message queue to which the hub thread(s) can listen.

After creating a hub, the application normally creates a thread that listens to Hub.queues.event and exposes some mechanisms to establish websockets and add them to the hub using hub_add/3.

See also
- http_upgrade_to_websocket/3 establishes a websocket from the SWI-Prolog webserver.
  %!  hub_create(+HubName, -Hub, +Options) is det.
  %
  %   Create the four message queues of a new hub, pack them together
  %   with the name in a `hub` dict and register that dict as hub/2.
  %   Options is currently ignored.
  %
  %   @error type_error(atom, HubName) if HubName is not an atom.

  hub_create(HubName, Hub, _Options) :-
      must_be(atom, HubName),
      message_queue_create(Wait),
      message_queue_create(Ready),
      message_queue_create(Event),
      message_queue_create(Broadcast),
      Queues = _{ wait:      Wait,
                  ready:     Ready,
                  event:     Event,
                  broadcast: Broadcast
                },
      Hub = hub{name:HubName, queues:Queues},
      assertz(hub(HubName, Hub)).
 current_hub(?Name, ?Hub) is nondet
True when there exists a hub Hub with Name.
  %!  current_hub(?Name, ?HubDict) is nondet.
  %
  %   Enumerate or look up the hubs registered by hub_create/3.  This is
  %   a thin public window on the hub/2 table.

  current_hub(Name, HubDict) :-
      hub(Name, HubDict).
  136
  137
  138                 /*******************************
  139                 *            WAITERS           *
  140                 *******************************/
  141
  142/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  143The task of this layer is to wait   for  (a potentially large number of)
  144websockets. Whenever there is data on one   of these sockets, the socket
  145is handed to Hub.queues.ready. This is realised using wait_for_input/3,
  146which allows a single thread  to  wait   for  many  sockets.  But ... on
  147Windows it allows to wait for at most  64 sockets. In addition, there is
  148no way to add an additional input   for control messages because Windows
  149select() can only wait for sockets. On Unix   we could use pipe/2 to add
  150the control channel. On Windows  we   would  need  an additional network
  151service, giving rise to its own problems with  allocation, firewalls and
  152security.
  153
  154So, instead we keep a queue of websockets   that  need to be waited for.
  155Whenever we add a  websocket,  we  create   a  waiter  thread  that will
  156typically start waiting for this socket.   In  addition, we schedule any
  157waiting thread that has less  than  the   maximum  number  of sockets to
  158timeout at as good as we can the same   time.  All of them will hunt for
  159the same set of queues,  but  they  have   to  wait  for  each other and
  160therefore most of the time one thread will walk away with all websockets
  161and the others commit suicide because there is nothing to wait for.
  162- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  163
  % The first argument of hub_thread/3 is a goal (run in a new thread),
  % so it must be module-qualified on behalf of the caller.
  :- meta_predicate hub_thread(0, +, +).
 hub_add(+HubName, +WebSocket, ?Id) is det
Add a WebSocket to the hub named HubName. Id is used to identify this user. It may be provided (as a ground term) or is generated as a UUID.
  %!  hub_add(+HubName, +WebSocket, ?Id) is det.
  %
  %   Register WebSocket as a client of the hub named HubName.  When Id
  %   is unbound a fresh UUID is generated for it.  The client gets its
  %   own output queue and mutex, is scheduled on the hub's wait queue,
  %   and a `hub{joined:Id}` event is posted on the event queue.
  %   Finally a waiter thread is started.  Fails if no hub named HubName
  %   is registered.

  hub_add(HubName, WebSocket, Id) :-
      must_be(atom, HubName),
      hub(HubName, Hub),
      (   nonvar(Id)
      ->  true
      ;   uuid(Id)
      ),
      message_queue_create(OutQueue),
      mutex_create(Mutex),
      % asserta/1 puts the newest connection first, which allows an Id
      % to be reused while an older connection is still registered.
      asserta(websocket(HubName, WebSocket, OutQueue, Mutex, Id)),
      Queues = Hub.queues,
      thread_send_message(Queues.wait, WebSocket),
      thread_send_message(Queues.event, hub{joined:Id}),
      debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
      create_wait_thread(Hub).
  188
  %!  create_wait_thread(+Hub) is det.
  %
  %   Start a temporary thread that runs wait_for_sockets/1 for Hub.
  %   `hub_wait_` is the alias prefix handed to hub_thread/3.

  create_wait_thread(Hub) :-
      hub_thread(wait_for_sockets(Hub),
                 Hub,
                 hub_wait_).
  191
  %!  wait_for_sockets(+Hub) is det.
  %!  wait_for_sockets(+Hub, +Max) is det.
  %
  %   Waiter loop.  Repeatedly grab up to Max websockets from
  %   Hub.queues.wait and wait for input on them.  Sockets that have
  %   data are sent to Hub.queues.ready and reader threads are started;
  %   the remaining sockets are put back on the wait queue and the loop
  %   continues.  The loop ends when nothing could be taken from the
  %   wait queue or when no socket is left to re-schedule.
  %
  %   Max defaults to 64.
  %
  %   Sockets that wait_for_set/4 discovered to be closed are absent
  %   from Left.  The not-ready set is therefore always derived from
  %   Left, never from the set we started with, so closed sockets are
  %   not put back on the wait queue.

  wait_for_sockets(Hub) :-
      wait_for_sockets(Hub, 64).

  wait_for_sockets(Hub, Max) :-
      Queues = Hub.queues,
      repeat,
        get_messages(Queues.wait, Max, List),
        (   List \== []
        ->  create_new_waiter_if_needed(Hub),
            sort(List, Set),
            length(Set, Len),
            debug(hub(wait), 'Waiting for ~d queues', [Len]),
            wait_for_set(Set, Left, ReadySet, Max),
            (   ReadySet \== []
            ->  debug(hub(ready), 'Data on ~p', [ReadySet]),
                Ready = Queues.ready,
                maplist(thread_send_message(Ready), ReadySet),
                create_reader_threads(Hub),
                % Left, not Set: closed sockets were removed from Left
                ord_subtract(Left, ReadySet, NotReadySet)
            ;   NotReadySet = Left             % timeout
            ),
            (   NotReadySet \== []
            ->  debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]),
                Wait = Queues.wait,
                maplist(thread_send_message(Wait), NotReadySet),
                fail                           % back to repeat/0
            ;   true
            )
        ;   !                                  % nothing to wait for
        ).
  222
  %!  create_new_waiter_if_needed(+Hub) is det.
  %
  %   Start an additional waiter thread unless the wait queue of Hub is
  %   empty, i.e., unless the calling waiter already took everything.

  create_new_waiter_if_needed(Hub) :-
      WaitQueue = Hub.queues.wait,
      (   message_queue_property(WaitQueue, size(0))
      ->  true
      ;   create_wait_thread(Hub)
      ).
 wait_for_set(+Set0, -Left, -Ready, +Max) is det
Wait for input from Set0. Note that Set0 may contain closed websockets.
  %!  wait_for_set(+Set0, -Left, -Ready, +Max) is det.
  %
  %   Wait for input on the sockets in Set0.  Ready is unified with the
  %   sockets reported by wait_for_input/3 and Left with the sockets
  %   that were actually waited for.  If wait_for_input/3 raises an
  %   existence error on a stream, that stream is deleted from the set
  %   and the wait is restarted on the remainder.

  wait_for_set([], [], [], _) :-
      !.
  wait_for_set(Sockets, Left, Ready, Max) :-
      wait_timeout(Sockets, Max, Timeout),
      catch(wait_for_input(Sockets, Ready, Timeout),
            error(existence_error(stream, Closed), _),
            true),
      (   nonvar(Closed)                      % a stream vanished
      ->  delete(Sockets, Closed, Alive),
          wait_for_set(Alive, Left, Ready, Max)
      ;   Left = Sockets
      ).
 wait_timeout(+WaitForList, +Max, -TimeOut) is det
Determine the timeout, such that multiple threads waiting for less than the maximum number of sockets time out at the same moment and we can combine them on a single thread.
  % scheduled_timeout(At): time stamp at which partially filled waiters
  % are scheduled to time out.
  :- dynamic scheduled_timeout/1.

  %!  wait_timeout(+WaitForList, +Max, -Timeout) is det.
  %
  %   Timeout is `infinite` if WaitForList holds exactly Max sockets.
  %   Otherwise it is the number of seconds until the scheduled
  %   time-out moment.  A moment that is still in the future is reused;
  %   if there is none, or it has passed, a new one is scheduled at
  %   ceiling(Now)+1 and recorded in scheduled_timeout/1.

  wait_timeout(List, Max, Timeout) :-
      length(List, Max),
      !,
      Timeout = infinite.
  wait_timeout(_, _, Timeout) :-
      get_time(Now),
      Fresh is ceiling(Now) + 1,              % candidate new moment
      (   scheduled_timeout(Pending)
      ->  (   Pending =< Now                  % stale: replace it
          ->  retractall(scheduled_timeout(_)),
              At = Fresh,
              asserta(scheduled_timeout(At))
          ;   At = Pending
          )
      ;   At = Fresh,
          asserta(scheduled_timeout(At))
      ),
      Timeout is At - Now.
 get_messages(+Queue, +Max, -List) is det
Get the next Max messages from Queue or as many as there are available without blocking very long. This routine is designed such that if multiple threads are running for messages, one gets all of them and the others nothing.
  %!  get_messages(+Queue, +Max, -List) is det.
  %
  %   List holds at most Max messages taken from Queue.  Fetching is
  %   serialised on the mutex `hub_wait`.

  get_messages(Queue, Max, List) :-
      with_mutex(hub_wait, get_messages_sync(Queue, Max, List)).

  %!  get_messages_sync(+Queue, +Max, -List) is det.
  %
  %   Worker for get_messages/3.  Stops as soon as Max messages have
  %   been collected (succ/2 fails on 0) or no message arrives within
  %   10 milliseconds.

  get_messages_sync(Queue, Max, [Message|Rest]) :-
      succ(Remaining, Max),
      thread_get_message(Queue, Message, [timeout(0.01)]),
      !,
      get_messages_sync(Queue, Remaining, Rest).
  get_messages_sync(_Queue, _Max, []).
  292
  293
  294                 /*******************************
  295                 *            READERS           *
  296                 *******************************/
  297
  298/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  299The next layer consists of `readers'.   Whenever  one or more websockets
  300have   data,   the   socket   is    added   to   Hub.queues.ready   and
  301create_reader_threads/1 is called. This  examines   the  number of ready
  302sockets and fires a number  of  threads   to  handle  the read requests.
  303Multiple threads are mainly needed for the case that a client signals to
  304be  ready,  but  only  provides  an   incomplete  message,  causing  the
  305ws_receive/2 to block.
  306
  307Each  of  the  threads  reads  the  next   message  and  sends  this  to
  308Hub.queues.event. The websocket is then rescheduled   to listen for new
  309events. This read either fires a thread   to  listen for the new waiting
  310socket using create_wait_thread/1 or, if there   are no more websockets,
  311does this job itself. This  deals  with   the  common  scenario that one
  312client wakes up, starts a thread to  read   its  event and waits for new
  313messages on the same websockets.
  314- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  315
  %!  create_reader_threads(+Hub) is det.
  %
  %   Start ceiling(sqrt(N)) reader threads, where N is the number of
  %   websockets currently waiting in Hub.queues.ready.

  create_reader_threads(Hub) :-
      ReadyQueue = Hub.queues.ready,
      message_queue_property(ReadyQueue, size(Pending)),
      Count is ceiling(sqrt(Pending)),
      % same as forall(between(1, Count, _), create_reader_thread(Hub))
      \+ ( between(1, Count, _),
           \+ create_reader_thread(Hub)
         ).

  %!  create_reader_thread(+Hub) is det.
  %
  %   Start one temporary thread running read_message/1 for Hub.

  create_reader_thread(Hub) :-
      hub_thread(read_message(Hub),
                 Hub,
                 hub_read_ws_).
  324
  % read_message(+Hub)
  %
  % Body of a reader thread.  Takes one websocket from Hub.queues.ready
  % without blocking (timeout(0)) and reads one message from it with
  % ws_receive/2, capturing any exception in Error.
  %
  %   - No error and the socket is still registered in websocket/5:
  %       - a close message with data `end_of_file` is handed to eof/1,
  %         after which this clause is done;
  %       - any other message is extended with the keys `client` and
  %         `hub` and sent to the event queue.  The socket is put back
  %         on the wait queue.  If the ready queue is now empty this
  %         thread turns itself into a waiter by calling
  %         wait_for_sockets/1; otherwise it starts a separate waiter
  %         thread and continues with the next ready socket.
  %   - An error on a socket that is still registered is reported via
  %     io_read_error/2, after which reading continues.
  %   - If the socket is no longer registered the result is ignored and
  %     reading continues.
  %
  % The second clause succeeds when nothing could be taken from the
  % ready queue.
  325read_message(Hub) :-
  326    Queues = Hub.queues,
  327    thread_get_message(Queues.ready, WS, [timeout(0)]),
  328    !,
  329    catch(ws_receive(WS, Message), Error, true),
  330    (   var(Error),
  331        websocket(HubName, WS, _, _, Id)
  332    ->  (   _{opcode:close, data:end_of_file} :< Message
  333        ->  eof(WS)
  334        ;   Event = Message.put(_{client:Id, hub:HubName}),
  335            debug(hub(event), 'Event: ~p', [Event]),
  336            thread_send_message(Queues.event, Event),
  337            thread_send_message(Queues.wait, WS),
  338            (   message_queue_property(Queues.ready, size(0))
  339            ->  !,
  340                wait_for_sockets(Hub)
  341            ;   create_wait_thread(Hub),
  342                read_message(Hub)
  343            )
  344        )
  345    ;   websocket(_, WS, _, _, _)
  346    ->  io_read_error(WS, Error),
  347        read_message(Hub)
  348    ;   read_message(Hub)                   % already destroyed
  349    ).
  % Nothing (more) to read: the ready queue was empty.
  350read_message(_).
 io_read_error(+WebSocket, +Error)
Called on a read error from WebSocket. We close the websocket and send the hub an event that we lost the connection to the specified client. Note that we leave destruction of the anonymous message queue and mutex to the Prolog garbage collector.
  %!  io_read_error(+WebSocket, +Error) is semidet.
  %
  %   Handle a read failure on WebSocket.  If the socket is still
  %   registered, its websocket/5 fact is removed, the socket is closed
  %   with status 1011 (a failing close is only printed as a warning),
  %   and the hub gets a `left` event with `reason:read`.  If the socket
  %   was no longer registered nothing else happens.

  io_read_error(WebSocket, Error) :-
      debug(hub(gate), 'Got read error on ~w: ~p',
            [WebSocket, Error]),
      (   retract(websocket(HubName, WebSocket, _Queue, _Lock, Id))
      ->  catch(ws_close(WebSocket, 1011, Error),
                CloseError,
                print_message(warning, CloseError)),
          hub(HubName, Hub),
          LeftEvent = hub{left:Id,
                          hub:HubName,
                          reason:read,
                          error:Error},
          thread_send_message(Hub.queues.event, LeftEvent)
      ;   true                                % already considered gone
      ).

  %!  eof(+WebSocket) is semidet.
  %
  %   The peer closed WebSocket: treat it as a read error whose error
  %   term is `end_of_file`.

  eof(WebSocket) :-
      io_read_error(WebSocket, end_of_file).
 io_write_error(+WebSocket, +Message, +Error)
Failed to write Message to WebSocket due to Error. Note that this may be a pending but closed WebSocket. We first check whether there is a new one and if not send a left message and pass the error such that the client can re-send it when appropriate.
  %!  io_write_error(+WebSocket, +Message, +Error) is semidet.
  %
  %   Handle a failure to write Message to WebSocket.  If the socket is
  %   still registered, its websocket/5 fact is removed and the socket
  %   is closed with status 1011 (a failing close is only printed as a
  %   warning).  A `left` event with `reason:write(Message)` is sent to
  %   the hub, unless another websocket is registered under the same
  %   client Id.

  io_write_error(WebSocket, Message, Error) :-
      debug(hub(gate), 'Got write error on ~w: ~p',
            [WebSocket, Error]),
      (   retract(websocket(HubName, WebSocket, _Queue, _Lock, Id))
      ->  catch(ws_close(WebSocket, 1011, Error),
                CloseError,
                print_message(warning, CloseError)),
          (   \+ websocket(_, _, _, _, Id)    % no replacement connection
          ->  hub(HubName, Hub),
              LeftEvent = hub{left:Id,
                              hub:HubName,
                              reason:write(Message),
                              error:Error},
              thread_send_message(Hub.queues.event, LeftEvent)
          ;   true
          )
      ;   true                                % already considered gone
      ).
  402
  403
  404                 /*******************************
  405                 *        SENDING MESSAGES      *
  406                 *******************************/
  407
  408/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  409My  initial  thought  about  sending  messages    was  to  add  a  tuple
  410WebSocket-Message to an output  queue  and   have  a  dynamic  number of
  411threads sending these messages to the   websockets. But, it is desirable
  412that, if multiple messages are sent to  a particular client, they arrive
  413in this order. As multiple threads are performing this task, this is not
  414easy to guarantee. Therefore, we create an  output queue and a mutex for
  415each client. An output thread will   walk  along the websockets, looking
  416for one that has pending messages.  It   then  grabs the lock associated
  417with the client and sends all waiting output messages.
  418
  419The price is that we might peek   a significant number of message queues
  420before we find one that  contains  messages.   If  this  proves  to be a
  421significant problem, we  could  maintain  a   queue  of  queues  holding
  422messages.
  423- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
 hub_send(+ClientId, +Message) is semidet
Send message to the indicated ClientId. Fails silently if ClientId does not exist.
Arguments:
Message- is either a single message (as accepted by ws_send/2) or a list of such messages.
  %!  hub_send(+ClientId, +Message) is semidet.
  %
  %   Queue Message for the client ClientId and start an output thread
  %   for the client's queue.  Message is either a single message or a
  %   list of messages, which are queued in order.  Fails silently if
  %   ClientId is not registered.
  %
  %   A client Id may be registered more than once (hub_add/3 uses
  %   asserta/1 to allow reuse of an Id).  We commit to the first, i.e.,
  %   most recently added, registration.  This keeps the predicate
  %   semidet and prevents backtracking from queueing the same message
  %   for a stale connection.

  hub_send(ClientId, Message) :-
      websocket(HubName, _WS, ClientQueue, _Lock, ClientId),
      hub(HubName, Hub),
      !,                                      % newest registration only
      (   is_list(Message)
      ->  maplist(queue_output(ClientQueue), Message)
      ;   queue_output(ClientQueue, Message)
      ),
      create_output_thread(Hub, ClientQueue).
  441
  %!  create_output_thread(+Hub, +Queue) is det.
  %
  %   Start a temporary thread that sends the messages pending in Queue
  %   using broadcast_from_queue/2 with a non-blocking timeout(0).

  create_output_thread(Hub, Queue) :-
      Options = [timeout(0)],
      hub_thread(broadcast_from_queue(Queue, Options),
                 Hub,
                 hub_out_q_).
 hub_broadcast(+HubName, +Message) is det
 hub_broadcast(+HubName, +Message, :Condition) is det
Send Message to all websockets associated with the hub HubName for which call(Condition, Id) succeeds. Note that this process is asynchronous: this predicate returns immediately after putting all requests in the output queues of the selected clients. If a message cannot be delivered due to a network error, the hub is informed through the `left` event sent by io_write_error/3.
  %!  hub_broadcast(+HubName, +Message) is semidet.
  %
  %   Queue Message for every client of the hub named HubName.

  hub_broadcast(HubName, Message) :-
      hub_broadcast(HubName, Message, all).

  %!  all(+Id) is det.
  %
  %   Condition that accepts every client.

  all(_).

  %!  hub_broadcast(+HubName, +Message, :Condition) is semidet.
  %
  %   Queue Message in the output queue of every client of HubName for
  %   which call(Condition, Id) succeeds, counting the selected clients
  %   in a non-backtrackable counter.  Then start broadcast threads,
  %   their number depending on that count.  Fails if no hub named
  %   HubName is registered.
  %
  %   @error type_error(atom, HubName) if HubName is not an atom.

  hub_broadcast(HubName, Message, Condition) :-
      must_be(atom, HubName),
      hub(HubName, Hub),
      Counter = count(0),
      % forall/2 written out: no selected client for which queueing
      % (and counting) fails.
      \+ ( websocket(HubName, _WS, ClientQueue, _Lock, Id),
           call(Condition, Id),
           \+ ( queue_output(ClientQueue, Message),
                inc_count(Counter)
              )
         ),
      arg(1, Counter, Count),
      create_broadcast_threads(Hub, Count).
  473
  %!  queue_output(+Queue, +Message) is det.
  %
  %   Append Message to the client output queue Queue.

  queue_output(ClientQueue, Msg) :-
      thread_send_message(ClientQueue, Msg).
  476
  %!  inc_count(+State) is det.
  %
  %   Destructively add one to the first argument of State.  The update
  %   uses nb_setarg/3, so it survives backtracking.

  inc_count(Counter) :-
      arg(1, Counter, Old),
      New is Old+1,
      nb_setarg(1, Counter, New).
  481
  %!  create_broadcast_threads(+Hub, +Count) is det.
  %
  %   Start ceiling(sqrt(Count)) broadcast threads for Hub, where Count
  %   is the number of clients that have a message queued.

  create_broadcast_threads(Hub, Count) :-
      Workers is ceiling(sqrt(Count)),
      % same as forall(between(1, Workers, _), create_broadcast_thread(Hub))
      \+ ( between(1, Workers, _),
           \+ create_broadcast_thread(Hub)
         ).

  %!  create_broadcast_thread(+Hub) is det.
  %
  %   Start one temporary thread that runs broadcast_from_queues/2 over
  %   the clients of Hub with a non-blocking timeout(0).

  create_broadcast_thread(Hub) :-
      Options = [timeout(0)],
      hub_thread(broadcast_from_queues(Hub, Options),
                 Hub,
                 hub_out_all_).
 broadcast_from_queues(+Hub, +Options) is det
Send the pending messages from all known client queues of Hub.
  %!  broadcast_from_queues(+Hub, +Options) is det.
  %
  %   Visit the output queue of every websocket registered for Hub and
  %   send whatever is pending there using broadcast_from_queue/2.

  broadcast_from_queues(Hub, Options) :-
      HubName = Hub.name,
      forall(websocket(HubName, _WebSocket, ClientQueue, _Lock, _Id),
             broadcast_from_queue(ClientQueue, Options)).
 broadcast_from_queue(+Queue, +Options) is det
Send all messages pending for Queue. Note that this predicate locks the mutex associated with the Queue, such that other workers cannot start sending messages to this client. Concurrent sending would lead to out-of-order arrival of broadcast messages. If the mutex is already held, someone else is processing this message queue, so we don't have to worry.
  %!  broadcast_from_queue(+Queue, +Options) is det.
  %
  %   Send the messages pending in Queue to its websocket.  Nothing is
  %   done if Queue is empty or is not registered in websocket/5.
  %   Sending happens while holding the client's mutex, which is
  %   acquired with mutex_trylock/1.  If the lock cannot be obtained, or
  %   sending fails, this predicate still succeeds.

  broadcast_from_queue(Queue, Options) :-
      (   message_queue_property(Queue, size(0))
      ->  true                                % nothing pending
      ;   websocket(_Hub, _WebSocket, Queue, Lock, _Id)
      ->  ignore(setup_call_cleanup(
                     mutex_trylock(Lock),
                     broadcast_from_queue_sync(Queue, Options),
                     mutex_unlock(Lock)))
      ;   true                                % client is gone
      ).
  524
  525% Note that we re-fetch websocket/5, such that we terminate if something
  526% closed the websocket.
  527
  %!  broadcast_from_queue_sync(+Queue, +Options) is det.
  %
  %   Send messages from Queue to its websocket one by one until Queue
  %   is no longer registered in websocket/5 or thread_get_message/3
  %   yields no message under Options.  websocket/5 is consulted again
  %   for every message, so sending stops once the socket has been
  %   unregistered.  Exceptions from ws_send/2 are passed to
  %   io_write_error/3; sending then moves on to the next message.

  broadcast_from_queue_sync(Queue, Options) :-
      (   websocket(_Hub, WebSocket, Queue, _Lock, _Id),
          thread_get_message(Queue, Message, Options)
      ->  debug(hub(broadcast),
                'To: ~p messages: ~p', [WebSocket, Message]),
          % the outcome of one send never stops the loop
          ignore(catch(ws_send(WebSocket, Message),
                       SendError,
                       io_write_error(WebSocket, Message, SendError))),
          broadcast_from_queue_sync(Queue, Options)
      ;   true
      ).
 hub_thread(:Goal, +Hub, +Task) is det
Create a (temporary) thread for the hub to perform Task. We create named threads if debugging hub(thread) is enabled.
  %!  hub_thread(:Goal, +Hub, +Task) is det.
  %
  %   Run Goal in a new detached thread.  If debugging for the topic
  %   hub(thread) is enabled, the thread gets an alias generated by
  %   gensym/2 from Task, so it can be recognised in thread listings.
  %   Otherwise the thread is anonymous.  Hub is currently unused.

  hub_thread(Goal, _, Task) :-
      debugging(hub(thread)),
      !,
      gensym(Task, Alias),
      thread_create(Goal, _, [detached(true), alias(Alias)]).
  hub_thread(Goal, _, _) :-
      thread_create(Goal, _, [detached(true)]).