hub.pl -- Manage a hub for websockets

This library manages a hub that consists of clients that are connected using a websocket. Messages arriving at any of the websockets are sent to the event queue of the hub. In addition, the hub provides a broadcast interface. A typical usage scenario for a hub is a chat server. A chat server can be realized as follows:

  1. Create a new hub using hub_create/3.
  2. Create one or more threads that listen to Hub.queues.event from the created hub. These threads can update the shared view of the world. A message is a dict as returned by ws_receive/2 or a hub control message. Currently, the following control messages are defined:
    hub{error:Error, left:ClientId, reason:Reason}
    A client left us because of an I/O error. Reason is read or write and Error is the Prolog I/O exception.
    hub{joined:ClientId}
    A new client has joined the chatroom.

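    The thread(s) can talk to clients using two predicates: hub_send/2, which sends a message to a single client, and hub_broadcast/2 (or hub_broadcast/3), which sends a message to all clients of the hub, optionally restricted by a condition.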

A hub consists of (currently) four message queues and a simple dynamic fact. Threads that are needed for the communication tasks are created on demand and die if no more work needs to be done.
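The event-handling side of step 2 can be sketched as follows. This is a minimal sketch rather than the library's own code: hub_event_loop/1 and handle_event/2 are hypothetical names, the debug topic chat is made up, and relaying only text messages back to all clients is an illustrative policy. It also assumes that hub_broadcast/2 identifies the hub by its name.

```prolog
:- use_module(library(http/hub)).
:- use_module(library(debug)).

%!  hub_event_loop(+Hub)
%
%   Hypothetical listener: block on the hub's event queue and
%   dispatch each message. Errors in a handler are reported and
%   do not stop the loop.
hub_event_loop(Hub) :-
    thread_get_message(Hub.queues.event, Message),
    catch(ignore(handle_event(Message, Hub)), Error,
          print_message(warning, Error)),
    hub_event_loop(Hub).

%!  handle_event(+Message, +Hub)
%
%   Control messages carry the tag `hub`; anything else is assumed
%   to be a dict as returned by ws_receive/2.
handle_event(Message, _Hub) :-
    hub{joined:ClientId} :< Message,
    !,
    debug(chat, "joined: ~p", [ClientId]).
handle_event(Message, _Hub) :-
    hub{error:Error, left:ClientId, reason:Reason} :< Message,
    !,
    debug(chat, "left: ~p (~p error: ~p)", [ClientId, Reason, Error]).
handle_event(Message, Hub) :-
    websocket{opcode:text} :< Message,
    !,
    hub_broadcast(Hub.name, Message).    % assumption: hub addressed by name
handle_event(_Message, _Hub).            % ignore everything else
```

Running `debug(chat)` before starting the loop makes the join and leave events visible on the console.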

To be done
- The current design does not use threads to perform tasks for multiple hubs. This implies that the design scales rather poorly for hosting many hubs with few users.
hub_create(+Name, -Hub, +Options) is det
Create a new hub. Hub is a dict containing the following public information:
Hub.name
The name of the hub (the Name argument)
Hub.queues.event
Message queue to which the hub thread(s) can listen.

After creating a hub, the application normally creates a thread that listens to Hub.queues.event and exposes some mechanisms to establish websockets and add them to the hub using hub_add/3.

See also
- http_upgrade_to_websocket/3 establishes a websocket from the SWI-Prolog webserver.
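The setup described above might look like the sketch below. The names start_chat/1, accept_chat/1, listen/1, the hub name chat and the /chat location are hypothetical. The sketch assumes that hub_add/3 identifies the hub by its name and that an empty dict is acceptable as Options. The guarded(false) option of http_upgrade_to_websocket/3 leaves the websocket open after accept_chat/1 returns, so that the hub can take ownership of it.

```prolog
:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).
:- use_module(library(http/websocket)).
:- use_module(library(http/hub)).

% Upgrade requests on /chat to a websocket and pass it to accept_chat/1.
:- http_handler(root(chat),
                http_upgrade_to_websocket(accept_chat, [guarded(false)]),
                []).

%!  accept_chat(+WebSocket)
%
%   Hand the new websocket to the hub; the hub generates the Id.
accept_chat(WebSocket) :-
    hub_add(chat, WebSocket, _Id).       % assumption: hub addressed by name

%!  start_chat(+Port)
%
%   Create the hub, start one listener thread and start the server.
start_chat(Port) :-
    hub_create(chat, Hub, _{}),
    thread_create(listen(Hub), _, [alias(chat_listener)]),
    http_server(http_dispatch, [port(Port)]).

%!  listen(+Hub)
%
%   Placeholder listener that only reports each event.
listen(Hub) :-
    thread_get_message(Hub.queues.event, Message),
    print_message(informational, format("hub event: ~p", [Message])),
    listen(Hub).
```

A real application would replace listen/1 with a loop that acts on the events instead of printing them.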
current_hub(?Name, ?Hub) is nondet
True when there exists a hub Hub with Name.
hub_add(+Hub, +WebSocket, ?Id) is det
Add a WebSocket to the hub. Id is used to identify this user. It may be provided (as a ground term) or is generated as a UUID.
wait_for_set(+Set0, -Left, -Ready, +Max) is det [private]
Wait for input from Set0. Note that Set0 may contain closed websockets.
wait_timeout(+WaitForList, +Max, -TimeOut) is det [private]
Determine the timeout, such that multiple threads waiting for less than the maximum number of sockets time out at the same moment and we can combine them on a single thread.
get_messages(+Queue, +Max, -List) is det [private]
Get the next Max messages from Queue, or as many as are available without blocking very long. This routine is designed such that if multiple threads are competing for messages, one gets all of them and the others get nothing.
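The general idea of taking a bounded batch from a message queue can be sketched as below. This is not the library's implementation: drain_queue/3 and drain_more/3 are hypothetical names, and the sketch simply blocks for the first message and then polls with a zero timeout, which may differ from the timing the private predicate actually uses.

```prolog
%!  drain_queue(+Queue, +Max, -List)
%
%   Hypothetical stand-in for get_messages/3: wait for the first
%   message, then take up to Max-1 more without waiting.
drain_queue(Queue, Max, [First|Rest]) :-
    Max >= 1,
    thread_get_message(Queue, First),
    Max1 is Max - 1,
    drain_more(Queue, Max1, Rest).

% Collect messages that are already queued; stop at the first miss.
drain_more(Queue, N, [Msg|Rest]) :-
    N > 0,
    thread_get_message(Queue, Msg, [timeout(0)]),
    !,
    N1 is N - 1,
    drain_more(Queue, N1, Rest).
drain_more(_, _, []).
```

Because the first thread to wake up keeps polling until the queue is empty or Max is reached, other threads waiting on the same queue tend to find nothing left, which matches the behaviour described above.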
io_read_error(+WebSocket, +Error) [private]
Called on a read error from WebSocket. We close the websocket and send the hub an event that we lost the connection to the specified client. Note that we leave destruction of the anonymous message queue and mutex to the Prolog garbage collector.
io_write_error(+WebSocket, +Message, +Error) [private]
Failed to write Message to WebSocket due to Error. Note that this may be a pending but closed WebSocket. We first check whether there is a new one and if not send a left message and pass the error such that the client can re-send it when appropriate.
hub_send(+ClientId, +Message) is semidet
Send Message to the indicated ClientId. Fails silently if ClientId does not exist.
Arguments:
Message - is either a single message (as accepted by ws_send/2) or a list of such messages.
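A possible use of hub_send/2 is sketched below. The predicate greet/1 and the message texts are hypothetical; text(Text) is one of the message forms accepted by ws_send/2. Because hub_send/2 fails when the client is unknown, the caller decides how to react.

```prolog
:- use_module(library(http/hub)).

%!  greet(+ClientId)
%
%   Hypothetical helper: send two text messages to one client and
%   warn if the client is not (or no longer) known to any hub.
greet(ClientId) :-
    (   hub_send(ClientId,
                 [ text("Welcome"),
                   text("Type a message to start")
                 ])
    ->  true
    ;   print_message(warning,
                      format("client ~p is not connected", [ClientId]))
    ).
```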
hub_broadcast(+Hub, +Message) is det
hub_broadcast(+Hub, +Message, :Condition) is det
Send Message to all websockets associated with Hub for which call(Condition, Id) succeeds. Note that this process is asynchronous: this predicate returns immediately after putting all requests in a broadcast queue. If a message cannot be delivered due to a network error, the hub is informed through io_write_error/3.
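As an illustration of Condition, the sketch below relays a message to every client except the sender. The names relay_to_others/3 and not_client/2 are hypothetical, and the sketch assumes the hub is addressed by its name. The closure not_client(Sender) is called with the client Id as extra argument.

```prolog
:- use_module(library(http/hub)).

%!  relay_to_others(+HubName, +Sender, +Message)
%
%   Hypothetical helper: broadcast Message to all clients of the
%   hub except Sender.
relay_to_others(HubName, Sender, Message) :-
    hub_broadcast(HubName, Message, not_client(Sender)).

%!  not_client(+Sender, +Id)
%
%   True if Id denotes a client other than Sender.
not_client(Sender, Id) :-
    Id \== Sender.
```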
broadcast_from_queues(+Hub, +Options) is det [private]
Broadcast from all known queues.
broadcast_from_queue(+Queue, +Options) is det [private]
Send all messages pending for Queue. Note that this predicate locks the mutex associated with the Queue, such that other workers cannot start sending messages to this client. Concurrent sending would lead to out-of-order arrival of broadcast messages. If the mutex is already held, someone else is processing this message queue, so we don't have to worry.
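The "skip if someone else holds the lock" pattern described here can be sketched with mutex_trylock/1. This is an illustration of the technique, not the library's code; with_queue_lock/2 is a hypothetical name.

```prolog
:- meta_predicate with_queue_lock(+, 0).

%!  with_queue_lock(+Mutex, :Goal)
%
%   Hypothetical helper: run Goal while holding Mutex, but only if
%   the mutex is not held by another thread. If it is, another
%   worker is already processing the queue and we simply succeed.
with_queue_lock(Mutex, Goal) :-
    (   mutex_trylock(Mutex)
    ->  call_cleanup(ignore(Goal), mutex_unlock(Mutex))
    ;   true
    ).
```

Using a try-lock rather than a blocking lock means a worker never waits behind another worker for the same client, and the mutex holder alone determines the order in which that client's messages are written.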
hub_thread(:Goal, +Hub, +Task) is det [private]
Create a (temporary) thread for the hub to perform Task. We create named threads if debugging hub(thread) is enabled.