1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2014-2016, VU University Amsterdam 7 CWI Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(hub, 37 [ hub_create/3, % +HubName, -Hub, +Options 38 hub_add/3, % +HubName, +Websocket, ?Id 39 hub_send/2, % +ClientId, +Message 40 hub_broadcast/2, % +HubName, +Message 41 hub_broadcast/3, % +HubName, +Message, +Condition 42 current_hub/2 % ?HubName, ?Hub 43 ]). 44:- use_module(library(debug)). 45:- use_module(library(error)). 46:- use_module(library(apply)). 47:- use_module(library(gensym)). 48:- use_module(library(uuid)). 49:- use_module(library(ordsets)). 50:- use_module(library(http/websocket)). 51 52:- meta_predicate 53 hub_broadcast( , , ). 54 55/** <module> Manage a hub for websockets 56 57This library manages a hub that consists of clients that are connected 58using a websocket. Messages arriving at any of the websockets are sent 59to the _event_ queue of the hub. In addition, the hub provides a 60_broadcast_ interface. A typical usage scenario for a hub is a _chat 61server_ A scenario for realizing an chat server is: 62 63 1. Create a new hub using hub_create/3. 64 2. Create one or more threads that listen to Hub.queues.event from 65 the created hub. These threads can update the shared view of the 66 world. A message is a dict as returned by ws_receive/2 or a 67 hub control message. Currently, the following control messages 68 are defined: 69 70 - hub{left:ClientId, reason:Reason, error:Error} 71 A client left us because of an I/O error. Reason is =read= 72 or =write= and Error is the Prolog I/O exception. 73 74 - hub{joined:ClientId} 75 A new client has joined the chatroom. 76 77 The thread(s) can talk to clients using two predicates: 78 79 - hub_send/2 sends a message to a specific client 80 - hub_broadcast/2 sends a message to all clients of the 81 hub. 82 83A hub consists of (currenty) four message queues and a simple dynamic 84fact. Threads that are needed for the communication tasks are created on 85demand and die if no more work needs to be done. 86 87@tbd The current design does not use threads to perform tasks for 88 multiple hubs. This implies that the design scales rather 89 poorly for hosting many hubs with few users. 90*/ 91 92:- dynamic 93 hub/2, % Hub, Queues ... 94 websocket/5. % Hub, Socket, Queue, Lock, Id 95 96:- volatile hub/2, websocket/5. 97 98%! hub_create(+Name, -Hub, +Options) is det. 99% 100% Create a new hub. Hub is a dict containing the following public 101% information: 102% 103% - Hub.name 104% The name of the hub (the Name argument) 105% - queues.event 106% Message queue to which the hub thread(s) can listen. 107% 108% After creating a hub, the application normally creates a thread 109% that listens to Hub.queues.event and exposes some mechanisms to 110% establish websockets and add them to the hub using hub_add/3. 111% 112% @see http_upgrade_to_websocket/3 establishes a websocket from 113% the SWI-Prolog webserver. 114 115hub_create(HubName, Hub, _Options) :- 116 must_be(atom, HubName), 117 message_queue_create(WaitQueue), 118 message_queue_create(ReadyQueue), 119 message_queue_create(EventQueue), 120 message_queue_create(BroadcastQueue), 121 Hub = hub{name:HubName, 122 queues:_{wait:WaitQueue, 123 ready:ReadyQueue, 124 event:EventQueue, 125 broadcast:BroadcastQueue 126 }}, 127 assertz(hub(HubName, Hub)). 128 129 130%! current_hub(?Name, ?Hub) is nondet. 131% 132% True when there exists a hub Hub with Name. 133 134current_hub(HubName, Hub) :- 135 hub(HubName, Hub). 136 137 138 /******************************* 139 * WAITERS * 140 *******************************/ 141 142/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 143The task of this layer is to wait for (a potentially large number of) 144websockets. Whenever there is data on one of these sockets, the socket 145is handed to Hub.queues.ready. This is realised using wait_for_input/3, 146which allows a single thread to wait for many sockets. But ... on 147Windows it allows to wait for at most 64 sockets. In addition, there is 148no way to add an additional input for control messages because Windows 149select() can only wait for sockets. On Unix we could use pipe/2 to add 150the control channal. On Windows we would need an additional network 151service, giving rise its own problems with allocation, firewalls and 152security. 153 154So, instead we keep a queue of websockets that need to be waited for. 155Whenever we add a websocket, we create a waiter thread that will 156typically start waiting for this socket. In addition, we schedule any 157waiting thread that has less than the maximum number of sockets to 158timeout at as good as we can the same time. All of them will hunt for 159the same set of queues, but they have to wait for each other and 160therefore most of the time one thread will walk away with all websockets 161and the others commit suicide because there is nothing to wait for. 162- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ 163 164:- meta_predicate 165 hub_thread( , , ). 166 167%! hub_add(+Hub, +WebSocket, ?Id) is det. 168% 169% Add a WebSocket to the hub. Id is used to identify this user. It may 170% be provided (as a ground term) or is generated as a UUID. 171 172hub_add(HubName, WebSocket, Id) :- 173 must_be(atom, HubName), 174 hub(HubName, Hub), 175 ( var(Id) 176 -> uuid(Id) 177 ; true 178 ), 179 message_queue_create(OutputQueue), 180 mutex_create(Lock), 181 % asserta/1 allows for reuse of Id 182 asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)), 183 thread_send_message(Hub.queues.wait, WebSocket), 184 thread_send_message(Hub.queues.event, 185 hub{joined:Id}), 186 debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]), 187 create_wait_thread(Hub). 188 189create_wait_thread(Hub) :- 190 hub_thread(wait_for_sockets(Hub), Hub, hub_wait_). 191 192wait_for_sockets(Hub) :- 193 wait_for_sockets(Hub, 64). 194 195wait_for_sockets(Hub, Max) :- 196 Queues = Hub.queues, 197 repeat, 198 get_messages(Queues.wait, Max, List), 199 ( List \== [] 200 -> create_new_waiter_if_needed(Hub), 201 sort(List, Set), 202 length(Set, Len), 203 debug(hub(wait), 'Waiting for ~d queues', [Len]), 204 wait_for_set(Set, Left, ReadySet, Max), 205 ( ReadySet \== [] 206 -> debug(hub(ready), 'Data on ~p', [ReadySet]), 207 Ready = Queues.ready, 208 maplist(thread_send_message(Ready), ReadySet), 209 create_reader_threads(Hub), 210 ord_subtract(Set, ReadySet, NotReadySet) 211 ; NotReadySet = Left % timeout 212 ), 213 ( NotReadySet \== [] 214 -> debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]), 215 Wait = Queues.wait, 216 maplist(thread_send_message(Wait), NotReadySet), 217 fail 218 ; true 219 ) 220 ; ! 221 ). 222 223create_new_waiter_if_needed(Hub) :- 224 message_queue_property(Hub.queues.wait, size(0)), 225 !. 226create_new_waiter_if_needed(Hub) :- 227 create_wait_thread(Hub). 228 229%! wait_for_set(+Set0, -Left, -Ready, +Max) is det. 230% 231% Wait for input from Set0. Note that Set0 may contain closed 232% websockets. 233 234wait_for_set([], [], [], _) :- 235 !. 236wait_for_set(Set0, Set, ReadySet, Max) :- 237 wait_timeout(Set0, Max, Timeout), 238 catch(wait_for_input(Set0, ReadySet, Timeout), 239 error(existence_error(stream, S), _), true), 240 ( var(S) 241 -> Set = Set0 242 ; delete(Set0, S, Set1), 243 wait_for_set(Set1, Set, ReadySet, Max) 244 ). 245 246 247%! wait_timeout(+WaitForList, +Max, -TimeOut) is det. 248% 249% Determine the timeout, such that multiple threads waiting for 250% less than the maximum number of sockets time out at the same 251% moment and we can combine them on a single thread. 252 253:- dynamic 254 scheduled_timeout/1. 255 256wait_timeout(List, Max, Timeout) :- 257 length(List, Max), 258 !, 259 Timeout = infinite. 260wait_timeout(_, _, Timeout) :- 261 get_time(Now), 262 ( scheduled_timeout(SchedAt) 263 -> ( SchedAt > Now 264 -> At = SchedAt 265 ; retractall(scheduled_timeout(_)), 266 At is ceiling(Now) + 1, 267 asserta(scheduled_timeout(At)) 268 ) 269 ; At is ceiling(Now) + 1, 270 asserta(scheduled_timeout(At)) 271 ), 272 Timeout is At - Now. 273 274 275%! get_messages(+Queue, +Max, -List) is det. 276% 277% Get the next Max messages from Queue or as many as there are 278% available without blocking very long. This routine is designed 279% such that if multiple threads are running for messages, one gets 280% all of them and the others nothing. 281 282get_messages(Q, N, List) :- 283 with_mutex(hub_wait, 284 get_messages_sync(Q, N, List)). 285 286get_messages_sync(Q, N, [H|T]) :- 287 succ(N2, N), 288 thread_get_message(Q, H, [timeout(0.01)]), 289 !, 290 get_messages_sync(Q, N2, T). 291get_messages_sync(_, _, []). 292 293 294 /******************************* 295 * READERS * 296 *******************************/ 297 298/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 299The next layer consists of `readers'. Whenever one or more websockets 300have data, the socket is added to Hub.queues.ready and 301create_reader_threads/1 is called. This examines the number of ready 302sockets and fires a number of threads to handle the read requests. 303Multiple threads are mainly needed for the case that a client signals to 304be ready, but only provides an incomplete message, causing the 305ws_receive/2 to block. 306 307Each of the threads reads the next message and sends this to 308Hub.queues.event. The websocket is then rescheduled to listen for new 309events. This read either fires a thread to listen for the new waiting 310socket using create_wait_thread/1 or, if there are no more websockets, 311does this job itself. This deals with the common scenario that one 312client wakes up, starts a thread to read its event and waits for new 313messages on the same websockets. 314- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ 315 316create_reader_threads(Hub) :- 317 message_queue_property(Hub.queues.ready, size(Ready)), 318 Threads is ceiling(sqrt(Ready)), 319 forall(between(1, Threads, _), 320 create_reader_thread(Hub)). 321 322create_reader_thread(Hub) :- 323 hub_thread(read_message(Hub), Hub, hub_read_ws_). 324 325read_message(Hub) :- 326 Queues = Hub.queues, 327 thread_get_message(Queues.ready, WS, [timeout(0)]), 328 !, 329 catch(ws_receive(WS, Message), Error, true), 330 ( var(Error), 331 websocket(HubName, WS, _, _, Id) 332 -> ( _{opcode:close, data:end_of_file} :< Message 333 -> eof(WS) 334 ; Event = Message.put(_{client:Id, hub:HubName}), 335 debug(hub(event), 'Event: ~p', [Event]), 336 thread_send_message(Queues.event, Event), 337 thread_send_message(Queues.wait, WS), 338 ( message_queue_property(Queues.ready, size(0)) 339 -> !, 340 wait_for_sockets(Hub) 341 ; create_wait_thread(Hub), 342 read_message(Hub) 343 ) 344 ) 345 ; websocket(_, WS, _, _, _) 346 -> io_read_error(WS, Error), 347 read_message(Hub) 348 ; read_message(Hub) % already destroyed 349 ). 350read_message(_). 351 352 353%! io_read_error(+WebSocket, +Error) 354% 355% Called on a read error from WebSocket. We close the websocket and 356% send the hub an event that we lost the connection to the specified 357% client. Note that we leave destruction of the anonymous message 358% queue and mutex to the Prolog garbage collector. 359 360io_read_error(WebSocket, Error) :- 361 debug(hub(gate), 'Got read error on ~w: ~p', 362 [WebSocket, Error]), 363 retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)), 364 !, 365 catch(ws_close(WebSocket, 1011, Error), E, 366 print_message(warning, E)), 367 hub(HubName, Hub), 368 thread_send_message(Hub.queues.event, 369 hub{left:Id, 370 hub:HubName, 371 reason:read, 372 error:Error}). 373io_read_error(_, _). % already considered gone 374 375eof(WebSocket) :- 376 io_read_error(WebSocket, end_of_file). 377 378%! io_write_error(+WebSocket, +Message, +Error) 379% 380% Failed to write Message to WebSocket due to Error. Note that this 381% may be a pending but closed WebSocket. We first check whether there 382% is a new one and if not send a `left` message and pass the error 383% such that the client can re-send it when appropriate. 384 385io_write_error(WebSocket, Message, Error) :- 386 debug(hub(gate), 'Got write error on ~w: ~p', 387 [WebSocket, Error]), 388 retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)), 389 !, 390 catch(ws_close(WebSocket, 1011, Error), E, 391 print_message(warning, E)), 392 ( websocket(_, _, _, _, Id) 393 -> true 394 ; hub(HubName, Hub), 395 thread_send_message(Hub.queues.event, 396 hub{left:Id, 397 hub:HubName, 398 reason:write(Message), 399 error:Error}) 400 ). 401io_write_error(_, _, _). % already considered gone 402 403 404 /******************************* 405 * SENDING MESSAGES * 406 *******************************/ 407 408/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 409My initial thought about sending messages was to add a tuple 410WebSocket-Message to an output queue and have a dynamic number of 411threads sending these messages to the websockets. But, it is desirable 412that, if multiple messages are sent to a particular client, they arrive 413in this order. As multiple threads are performing this task, this is not 414easy to guarantee. Therefore, we create an output queue and a mutex for 415each client. An output thread will walk along the websockets, looking 416for one that has pending messages. It then grabs the lock associated 417with the client and sends all waiting output messages. 418 419The price is that we might peek a significant number of message queues 420before we find one that contains messages. If this proves to be a 421significant problem, we could maintain a queue of queues holding 422messages. 423- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ 424 425%! hub_send(+ClientId, +Message) is semidet. 426% 427% Send message to the indicated ClientId. Fails silently if ClientId 428% does not exist. 429% 430% @arg Message is either a single message (as accepted by 431% ws_send/2) or a list of such messages. 432 433hub_send(ClientId, Message) :- 434 websocket(HubName, _WS, ClientQueue, _Lock, ClientId), 435 hub(HubName, Hub), 436 ( is_list(Message) 437 -> maplist(queue_output(ClientQueue), Message) 438 ; queue_output(ClientQueue, Message) 439 ), 440 create_output_thread(Hub, ClientQueue). 441 442create_output_thread(Hub, Queue) :- 443 hub_thread(broadcast_from_queue(Queue, [timeout(0)]), 444 Hub, hub_out_q_). 445 446%! hub_broadcast(+Hub, +Message) is det. 447%! hub_broadcast(+Hub, +Message, :Condition) is det. 448% 449% Send Message to all websockets associated with Hub for which 450% call(Condition, Id) succeeds. Note that this process is 451% _asynchronous_: this predicate returns immediately after putting 452% all requests in a broadcast queue. If a message cannot be 453% delivered due to a network error, the hub is informed through 454% io_error/3. 455 456hub_broadcast(HubName, Message) :- 457 hub_broadcast(HubName, Message, all). 458 459all(_). 460 461hub_broadcast(HubName, Message, Condition) :- 462 must_be(atom, HubName), 463 hub(HubName, Hub), 464 State = count(0), 465 forall(( websocket(HubName, _WS, ClientQueue, _Lock, Id), 466 call(Condition, Id) 467 ), 468 ( queue_output(ClientQueue, Message), 469 inc_count(State) 470 )), 471 State = count(Count), 472 create_broadcast_threads(Hub, Count). 473 474queue_output(Queue, Message) :- 475 thread_send_message(Queue, Message). 476 477inc_count(State) :- 478 arg(1, State, C0), 479 C1 is C0+1, 480 nb_setarg(1, State, C1). 481 482create_broadcast_threads(Hub, Count) :- 483 Threads is ceiling(sqrt(Count)), 484 forall(between(1, Threads, _), 485 create_broadcast_thread(Hub)). 486 487create_broadcast_thread(Hub) :- 488 hub_thread(broadcast_from_queues(Hub, [timeout(0)]), 489 Hub, hub_out_all_). 490 491 492%! broadcast_from_queues(+Hub, +Options) is det. 493% 494% Broadcast from over all known queues. 495 496broadcast_from_queues(Hub, Options) :- 497 forall(websocket(Hub.name, _WebSocket, Queue, _Lock, _Id), 498 broadcast_from_queue(Queue, Options)). 499 500 501%! broadcast_from_queue(+Queue, +Options) is det. 502% 503% Send all messages pending for Queue. Note that this predicate 504% locks the mutex associated with the Queue, such that other 505% workers cannot start sending messages to this client. Concurrent 506% sending would lead to out-of-order arrival of broadcast 507% messages. If the mutex is already held, someone else is 508% processing this message queue, so we don't have to worry. 509 510broadcast_from_queue(Queue, _Options) :- 511 message_queue_property(Queue, size(0)), 512 !. 513broadcast_from_queue(Queue, Options) :- 514 websocket(_Hub, _WebSocket, Queue, Lock, _Id), 515 !, 516 ( setup_call_cleanup( 517 mutex_trylock(Lock), 518 broadcast_from_queue_sync(Queue, Options), 519 mutex_unlock(Lock)) 520 -> true 521 ; true 522 ). 523broadcast_from_queue(_, _). 524 525% Note that we re-fetch websocket/5, such that we terminate if something 526% closed the websocket. 527 528broadcast_from_queue_sync(Queue, Options) :- 529 repeat, 530 ( websocket(_Hub, WebSocket, Queue, _Lock, _Id), 531 thread_get_message(Queue, Message, Options) 532 -> debug(hub(broadcast), 533 'To: ~p messages: ~p', [WebSocket, Message]), 534 catch(ws_send(WebSocket, Message), E, 535 io_write_error(WebSocket, Message, E)), 536 fail 537 ; ! 538 ). 539 540%! hub_thread(:Goal, +Hub, +Task) is det. 541% 542% Create a (temporary) thread for the hub to perform Task. We 543% created named threads if debugging hub(thread) is enabled. 544 545hub_thread(Goal, _, Task) :- 546 debugging(hub(thread)), 547 !, 548 gensym(Task, Alias), 549 thread_create(, _, [detached(true), alias(Alias)]). 550hub_thread(Goal, _, _) :- 551 thread_create(, _, [detached(true)])