hub.pl -- Manage a hub for websockets
This library manages a hub that consists of clients that are connected using a websocket. Messages arriving at any of the websockets are sent to the event queue of the hub. In addition, the hub provides a broadcast interface. A typical usage scenario for a hub is a chat server A scenario for realizing an chat server is:
- Create a new hub using hub_create/3.
- Create one or more threads that listen to Hub.queues.event from
the created hub. These threads can update the shared view of the
world. A message is a dict as returned by ws_receive/2 or a
hub control message. Currently, the following control messages
are defined:
- hub{error:Error, left:ClientId, reason:Reason}
- A client left us because of an I/O error. Reason is
read
orwrite
and Error is the Prolog I/O exception. - hub{joined:ClientId}
- A new client has joined the chatroom.
The
thread(s)
can talk to clients using two predicates:- hub_send/2 sends a message to a specific client
- hub_broadcast/2 sends a message to all clients of the hub.
A hub consists of (currenty) four message queues and a simple dynamic fact. Threads that are needed for the communication tasks are created on demand and die if no more work needs to be done.
- hub_create(+Name, -Hub, +Options) is det
- Create a new hub. Hub is a dict containing the following public
information:
- Hub.name
- The name of the hub (the Name argument)
- queues.event
- Message queue to which the hub
thread(s)
can listen.
After creating a hub, the application normally creates a thread that listens to Hub.queues.event and exposes some mechanisms to establish websockets and add them to the hub using hub_add/3.
- current_hub(?Name, ?Hub) is nondet
- True when there exists a hub Hub with Name.
- hub_add(+Hub, +WebSocket, ?Id) is det
- Add a WebSocket to the hub. Id is used to identify this user. It may be provided (as a ground term) or is generated as a UUID.
- wait_for_set(+Set0, -Left, -Ready, +Max) is det[private]
- Wait for input from Set0. Note that Set0 may contain closed websockets.
- wait_timeout(+WaitForList, +Max, -TimeOut) is det[private]
- Determine the timeout, such that multiple threads waiting for less than the maximum number of sockets time out at the same moment and we can combine them on a single thread.
- get_messages(+Queue, +Max, -List) is det[private]
- Get the next Max messages from Queue or as many as there are available without blocking very long. This routine is designed such that if multiple threads are running for messages, one gets all of them and the others nothing.
- io_read_error(+WebSocket, +Error)[private]
- Called on a read error from WebSocket. We close the websocket and send the hub an event that we lost the connection to the specified client. Note that we leave destruction of the anonymous message queue and mutex to the Prolog garbage collector.
- io_write_error(+WebSocket, +Message, +Error)[private]
- Failed to write Message to WebSocket due to Error. Note that this
may be a pending but closed WebSocket. We first check whether there
is a new one and if not send a
left
message and pass the error such that the client can re-send it when appropriate. - hub_send(+ClientId, +Message) is semidet
- Send message to the indicated ClientId. Fails silently if ClientId does not exist.
- hub_broadcast(+Hub, +Message) is det
- hub_broadcast(+Hub, +Message, :Condition) is det
- Send Message to all websockets associated with Hub for which
call(Condition, Id)
succeeds. Note that this process is asynchronous: this predicate returns immediately after putting all requests in a broadcast queue. If a message cannot be delivered due to a network error, the hub is informed through io_error/3. - broadcast_from_queues(+Hub, +Options) is det[private]
- Broadcast from over all known queues.
- broadcast_from_queue(+Queue, +Options) is det[private]
- Send all messages pending for Queue. Note that this predicate locks the mutex associated with the Queue, such that other workers cannot start sending messages to this client. Concurrent sending would lead to out-of-order arrival of broadcast messages. If the mutex is already held, someone else is processing this message queue, so we don't have to worry.
- hub_thread(:Goal, +Hub, +Task) is det[private]
- Create a (temporary) thread for the hub to perform Task. We
created named threads if debugging
hub(thread)
is enabled. - hub_broadcast(+Hub, +Message) is det
- hub_broadcast(+Hub, +Message, :Condition) is det
- Send Message to all websockets associated with Hub for which
call(Condition, Id)
succeeds. Note that this process is asynchronous: this predicate returns immediately after putting all requests in a broadcast queue. If a message cannot be delivered due to a network error, the hub is informed through io_error/3.