/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2012-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(udp_broadcast,
          [ udp_host_to_address/2,       % ? Host, ? Address
            udp_broadcast_initialize/2,  % +IPAddress, +Subnet
            udp_broadcast_service/2      % ? Domain, ? Address
          ]).

/** <module> A UDP Broadcast Bridge

SWI-Prolog's broadcast library provides a means that may be used to
facilitate publish and subscribe communication regimes between anonymous
members of a community of interest.
The members of the community are,
however, necessarily limited to a single instance of Prolog. The UDP
broadcast library removes that restriction. With this library loaded,
any member on your local IP subnetwork that also has this library loaded
may hear and respond to your broadcasts.

This module has three public predicates. When the module is
initialized, it starts two listener threads that listen for broadcasts
from others, received as UDP datagrams.

Unlike TIPC broadcast, UDP broadcast has only one scope, =udp_subnet=. A
broadcast/1 or broadcast_request/1 that is not directed to the listener
above behaves as usual and is confined to the instance of Prolog that
originated it. But when so directed, the broadcast will be sent to all
participating systems, including itself, by way of UDP's multicast
addressing facility. A UDP broadcast or broadcast request takes the
typical form: =|broadcast(udp_subnet(+Term, +Timeout))|=. To prevent the
potential for feedback loops, the scope qualifier is stripped from the
message before transmission. The timeout is optional. It specifies the
amount of time to wait for replies to arrive in response to a
broadcast_request. The default period is 0.250 seconds. The timeout is
ignored for broadcasts.

An example of three separate processes cooperating on the same Node:

==
Process A:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- findall(X, broadcast_request(udp_subnet(number(X))), Xs).
   Xs = [1, 2, 3, 4, 5, 7, 8, 9].

   ?-
==

It is also possible to carry on a private dialog with a single
responder. To do this, you supply a compound of the form, Term:PortId,
to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the
ip-address and port-id of the intended listener.
If you supply an
unbound variable, PortId, to broadcast_request, it will be unified with
the address of the listener that responds to Term. You may send a
directed broadcast to a specific member by simply providing this address
in a similarly structured compound to a UDP scoped broadcast/1. The
message is sent via unicast to that member only by way of the member's
broadcast listener. It is received by the listener just as any other
broadcast would be. The listener does not know the difference.

For example, in order to discover who responded with a particular value:

==
Host B Process 1:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Host A Process 1:


   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Host A Process 2:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?- bagof(X, broadcast_request(udp_subnet(number(X):From,1)), Xs).
   From = ip(192, 168, 1, 103):34855,
   Xs = [7, 8, 9] ;
   From = ip(192, 168, 1, 103):56331,
   Xs = [1, 2, 3, 4, 5] ;
   From = ip(192, 168, 1, 104):3217,
   Xs = [1, 2, 3, 4, 5].

==

## Caveats {#udp-broadcase-caveats}

While the implementation is mostly transparent, there are some important
and subtle differences that must be taken into consideration:

  * UDP broadcast requires an initialization step in order to
    launch the broadcast listener daemon. See udp_broadcast_initialize/2.

  * Prolog's broadcast_request/1 is nondet. It sends the request,
    then evaluates the replies synchronously, backtracking as needed
    until a satisfactory reply is received. The remaining potential
    replies are not evaluated. This is not so when UDP is involved.

  * A UDP broadcast/1 is completely asynchronous.

  * A UDP broadcast_request/1 is partially synchronous.
    A broadcast_request/1 is sent, then the sender balks for a period of
    time (default: 250 ms) while the replies are collected. Any reply
    that is received after this period is silently discarded. An
    optional second argument is provided so that a sender may specify
    more (or less) time for replies.

  * Replies are presented to the user as a choice point on arrival,
    until the broadcast request timer finally expires. This
    allows traffic to propagate through the system faster and provides
    the requestor with the opportunity to terminate a broadcast request
    early if desired, by simply cutting choice points.

  * Please beware that broadcast request transactions remain active
    and resources consumed until broadcast_request finally fails on
    backtracking, an uncaught exception occurs, or until choice points
    are cut. Failure to properly manage this will likely result in
    chronic exhaustion of UDP sockets.

  * If a listener is connected to a generator that always succeeds
    (e.g. a random number generator), then the broadcast request will
    never terminate and trouble is bound to ensue.

  * broadcast_request/1 with =|udp_subnet|= scope is _not_ reentrant.
    If a listener performs a broadcast_request/1 with UDP scope
    recursively, then disaster looms certain. This caveat does not apply
    to a UDP scoped broadcast/1, which can safely be performed from a
    listener context.

  * UDP broadcast's capacity is not infinite. While it can tolerate
    substantial bursts of activity, it is designed for short bursts of
    small messages. Unlike TIPC, UDP is unreliable and has no QOS
    protections. Congestion is likely to cause trouble in the form of
    non-Byzantine failure. That is, late, lost (e.g. infinitely late),
    or duplicate datagrams. Caveat emptor.

  * A UDP broadcast_request/1 term that is grounded is considered to
    be a broadcast only.
    No replies are collected unless there is at
    least one unbound variable to unify.

  * A UDP broadcast/1 always succeeds, even if there are no
    listeners.

  * A UDP broadcast_request/1 that receives no replies will fail.

  * Replies may be coming from many different places in the network
    (or none at all). No ordering of replies is implied.

  * Prolog terms are sent to others after first converting them to
    atoms using term_to_atom/2. Passing real numbers this way may
    result in a substantial truncation of precision.

  * The broadcast model is based on anonymity and a presumption of
    trust--a perfect recipe for compromise. UDP is an Internet protocol.
    A UDP broadcast listener exposes a public port (20005), which is
    static and shared by all listeners, and a private port, which is
    semi-static and unique to the listener instance. Both can be seen
    from off-cluster nodes and networks. Usage of this module exposes
    the node and, consequently, the cluster to significant security
    risks. So have a care when designing your application. You must talk
    only to those who share and contribute to your concerns using a
    carefully prescribed protocol.

  * UDP broadcast categorically and silently ignores all message
    traffic originating from or terminating on nodes that are not
    members of the local subnet. This security measure only keeps honest
    people honest!

@author Jeffrey Rosenwald (JeffRose@acm.org)
@license BSD-2
@see tipc.pl
*/

:- use_module(library(socket)).
:- use_module(library(broadcast)).
:- use_module(library(time)).

:- require([ thread_self/1
           , forall/2
           , term_to_atom/2
           , thread_send_message/2
           , catch/3
           , setup_call_cleanup/3
           , thread_create/3
           ]).

%%  ~>(:P, :Q) is nondet.
%%  eventually_implies(P, Q) is nondet.
%   asserts temporal Liveness (something good happens, eventually) and
%   Safety (nothing bad ever happens) properties. Analogous to the
%   "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of
%   lazy implication described informally as:
%
%   * Liveness: For all possible outcomes, P -> Q, eventually.
%   * Safety: For all possible outcomes, (\+P ; Q), is invariant.
%
%   Described practically:
%
%   P ~> Q, declares that if P is true, then Q must be true, now or at
%   some point in the future.
%

:- meta_predicate ~>(0,0).
:- op(950, xfy, ~>).

~>(P, Q) :-
    setup_call_cleanup(P,
                       (true; fail),
                       (Q -> true;
                          throw(error(goal_failed(Q), context(~>, _))))
                      ).

:- meta_predicate safely(0).

safely(Predicate) :-
    catch(Predicate, Err,
          (Err == '$aborted' -> (!, fail);
          print_message(error, Err), fail)).

:- meta_predicate make_thread(0, +).

% You can't thread_signal a thread that isn't running.

join_thread(Id) :-
    catch(thread_signal(Id, abort),
          error(existence_error(thread, Id), _Context),
          true),

    thread_join(Id, exception('$aborted')).

make_thread(Goal, Options) :-
    thread_create(safely(Goal), Id, [ detached(false) | Options ])
      ~> join_thread(Id).

udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
    IPAddress = ip(A1, A2, A3, A4),
    Subnet = ip(S1, S2, S3, S4),
    BroadcastAddress = ip(B1, B2, B3, B4),

    B1 is A1 \/ (S1 xor 255),
    B2 is A2 \/ (S2 xor 255),
    B3 is A3 \/ (S3 xor 255),
    B4 is A4 \/ (S4 xor 255).

%!  udp_broadcast_service(?Domain, ?Address) is nondet.
%   provides the UDP broadcast address for a given Domain. At present,
%   only one domain is supported, =|udp_subnet|=.
%

% The following are defined at initialization:
:- dynamic
    udp_subnet_member/1,      % +IpAddress:Port
    udp_broadcast_service/2.
    % udp_broadcast_service/2: ?Domain, ?BroadcastAddress:Port

:- volatile
    udp_subnet_member/1,      % +IpAddress:Port
    udp_broadcast_service/2.  % ?Domain, ?BroadcastAddress:Port
%
% Here's a UDP bridge to Prolog's broadcast library
%
% A sender may extend a broadcast to a subnet of a UDP network by
% specifying a =|udp_subnet|= scoping qualifier in his/her broadcast.
% The qualifier has the effect of selecting the appropriate multi-cast
% address for the transmission. Thus, the sender of the message has
% control over the scope of his/her traffic on a per-message basis.
%
% All in-scope listeners receive the broadcast and simply rebroadcast
% the message locally. All broadcast replies, if any, are sent directly
% to the sender via the port-id that was received with the broadcast.
%
% Each listener exposes two UDP ports, a shared public port that is
% bound to a well-known port number and a private port that uniquely
% identifies the listener. Broadcasts are received on the public port
% and replies are sent on the private port. Directed broadcasts
% (unicasts) are received on the private port and replies are sent on
% the private port.
%
% Interactions with TIPC Broadcast
%
% As a security precaution, we do not allow unsupervised transactions
% directly between UDP and TIPC broadcast. These terms are black-listed
% and ignored when received via UDP listeners. A UDP enabled service
% that wishes to use TIPC resources on the cluster must have a sponsor
% on the TIPC cluster to filter incoming UDP broadcast traffic. This
% can be as simple as loading and initializing both tipc_broadcast and
% udp_broadcast within the same TIPC broadcast service.
%
% Because the UDP and TIPC broadcast listeners are operating in
% separate threads of execution, a UDP broadcast sponsor can safely
% perform a broadcast_request with TIPC scope from within the UDP
% broadcast listener context.
% This is one of the few scenarios where a
% recursive broadcast_request with TIPC scope is safe.
%

black_list(tipc_node(_)).
black_list(tipc_node(_,_)).
black_list(tipc_cluster(_)).
black_list(tipc_cluster(_,_)).
black_list(tipc_zone(_)).
black_list(tipc_zone(_,_)).
%
ld_dispatch(_S, '$udp_request'(Term), _From) :-
    black_list(Term), !, fail.

ld_dispatch(_S, Term, _From) :-
    black_list(Term), !, fail.

ld_dispatch(S, '$udp_request'(wru(Name)), From) :-
    !, gethostname(Name),
    term_to_atom(wru(Name), Atom),
    udp_send(S, Atom, From, []).

ld_dispatch(S, '$udp_request'(Term), From) :-
    !, forall(broadcast_request(Term),
              (   term_to_atom(Term, Atom),
                  udp_send(S, Atom, From, []))).

ld_dispatch(_S, Term, _From) :-
    safely(broadcast(Term)).

% Thread 1 listens for directed traffic on the private port.
%
udp_listener_daemon1(S) :-
    repeat,
    safely(dispatch_traffic(S, S)).

% Thread 2 listens for broadcast traffic on the well-known public port
% (S). All replies are originated from the private port (S1).
%
udp_listener_daemon2(Parent) :-
    udp_socket(S) ~> tcp_close_socket(S),
    udp_socket(S1) ~> tcp_close_socket(S1),

    tcp_bind(S1, _PrivatePort),  % bind him to a private port now

    make_thread(udp_listener_daemon1(S1),
                [ alias(udp_listener_daemon1)]),

    tcp_setopt(S, reuseaddr),

    udp_broadcast_service(udp_subnet, _Address:Port),
    tcp_bind(S, Port),           % bind to our public port

    listen(udp_broadcast, Head, broadcast_listener(Head))
      ~> unlisten(udp_broadcast),

    thread_send_message(Parent, udp_listener_daemon_ready),

    repeat,
    safely(dispatch_traffic(S, S1)).
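
% Illustration (not part of the original library): a minimal sketch of
% how the ~> idiom used in udp_listener_daemon2/1 above behaves. The
% predicate name demo_leads_to/0 is hypothetical, and the sketch
% assumes it is defined inside this module, where the ~> operator is
% visible. Because ~> leaves a choice point behind, the goal on its
% right-hand side is deferred until the clause is finally left by
% failure, by an exception, or by a cut. That is how the sockets above
% get closed when the daemon terminates.
%
% ==
% demo_leads_to :-
%     writeln(open) ~> writeln(close),
%     writeln(working),
%     fail.
%
% ?- demo_leads_to.
% open
% working
% close
% false.
% ==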

dispatch_traffic(S, S1) :-
    udp_receive(S, Data, From,
                [as(atom), max_message_size(65535)]),
    udp_subnet_member(From),  % ignore all traffic that is foreign to my subnet
    term_to_atom(Term, Data),
    with_mutex(udp_broadcast, ld_dispatch(S1, Term, From)),
    !,
    dispatch_traffic(S, S1).

start_udp_listener_daemon :-
    catch(thread_property(udp_listener_daemon2, status(running)),_, fail),

    !.

start_udp_listener_daemon :-
    thread_self(Self),
    thread_create(udp_listener_daemon2(Self), _,
                  [alias(udp_listener_daemon2), detached(true)]),
    call_with_time_limit(6.0,
                         thread_get_message(udp_listener_daemon_ready)).

:- multifile udp:host_to_address/2.
%
broadcast_listener(udp_host_to_address(Host, Addr)) :-
    udp:host_to_address(Host, Addr).

broadcast_listener(udp_broadcast_service(Class, Addr)) :-
    udp_broadcast_service(Class, Addr).

broadcast_listener(udp_subnet(X)) :-
    udp_broadcast(X, udp_subnet, 0.250).

broadcast_listener(udp_subnet(X, Timeout)) :-
    udp_broadcast(X, udp_subnet, Timeout).

%
%
udp_basic_broadcast(S, Port, Term, Address) :-
    udp_socket(S)
      ~> tcp_close_socket(S),

    (   udp_broadcast_service(udp_subnet, Address)
    ->  tcp_setopt(S, broadcast)
    ;   true
    ),

    tcp_bind(S, Port),  % find our own ephemeral Port
    term_to_atom(Term, Atom),

    (   udp_subnet_member(Address)  % talk only to your local subnet
    ->  safely(udp_send(S, Atom, Address, []))
    ;   true).

% directed broadcast to a single listener
udp_broadcast(Term:To, _Scope, _Timeout) :-
    ground(Term), ground(To),
    !,
    udp_basic_broadcast(_S, _Port, Term, To),

    !.

% broadcast to all listeners
udp_broadcast(Term, Scope, _Timeout) :-
    ground(Term),
    !,
    udp_broadcast_service(Scope, Address),
    udp_basic_broadcast(_S, _Port, Term, Address),

    !.
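
% Illustration (not part of the original library): a sketch of the two
% kinds of call, made through library(broadcast), that end up in the
% two ground clauses of udp_broadcast/3 above. The term hello(world)
% and the address ip(192,168,1,103):34855 are made-up values; a real
% address is normally obtained from a previous broadcast_request/1.
% Both calls assume that udp_broadcast_initialize/2 has already run.
%
% ==
% % Ground Term without an address: sent to every listener on the subnet.
% ?- broadcast(udp_subnet(hello(world))).
%
% % Ground Term:To: sent by unicast to that one listener only.
% ?- broadcast(udp_subnet(hello(world):ip(192,168,1,103):34855)).
% ==
%
% Since the : operator is right-associative, the second argument is
% read as Term:(IP:Port), which matches the Term:To pattern of the
% directed clause.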

% directed broadcast_request to a single listener
udp_broadcast(Term:Address, _Scope, Timeout) :-
    ground(Address),
    !,
    udp_basic_broadcast(S, Port, '$udp_request'(Term), Address),
    udp_br_collect_replies(S, Port, Timeout, Term:Address).

% broadcast_request to all listeners returning responder port-id
udp_broadcast(Term:From, Scope, Timeout) :-
    !, udp_broadcast_service(Scope, Address),
    udp_basic_broadcast(S, Port, '$udp_request'(Term), Address),
    udp_br_collect_replies(S, Port, Timeout, Term:From).

% broadcast_request to all listeners ignoring responder port-id
udp_broadcast(Term, Scope, Timeout) :-
    udp_broadcast(Term:_, Scope, Timeout).

udp_br_send_timeout(Port) :-
    udp_socket(S)
      ~> tcp_close_socket(S),
    udp_send(S, '$udp_br_timeout', localhost:Port, []),

    !.

udp_br_collect_replies(S, Port, Timeout, Term:From) :-
    alarm(Timeout, udp_br_send_timeout(Port), Id, [remove(false)])
      ~> remove_alarm(Id),

    tcp_setopt(S, dispatch(false)),

    repeat,
    udp_receive(S, Atom, From1, [as(atom)]),
    (   (Atom \== '$udp_br_timeout')
    ->  (From1 = From, safely(term_to_atom(Term, Atom)))
    ;   (!, fail)).

%!  udp_host_to_address(?Service, ?Address) is nondet.
%
%   locates a UDP service by name. Service is an atom or grounded term
%   representing the common name of the service. Address is a UDP
%   address structure. A server may advertise its services by name by
%   including the fact, udp:host_to_address(+Service, +Address),
%   somewhere in its source. This predicate can also be used to perform
%   reverse searches. That is, it will also resolve an Address to a
%   Service name.
%

udp_host_to_address(Host, Address) :-
    broadcast_request(udp_subnet(udp_host_to_address(Host, Address))).

%!  udp_initialize is semidet.
%   See udp:udp_initialize/0
%
%:- multifile udp:udp_stack_initialize/0.


%!  udp_broadcast_initialize(+IPAddress, +SubnetMask) is semidet.
%   causes any required runtime initialization to occur. At present,
%   proper operation of UDP broadcast depends on local information
%   that is not easily obtained mechanically. In order to determine
%   the appropriate UDP broadcast address, you must supply the
%   IPAddress and SubnetMask for the node that is running this module.
%   These data are supplied in the form of ip/4 terms. This call is now
%   required to be included in an application's initialization directive.
%

udp_broadcast_initialize(IPAddress, Subnet) :-
    retractall(udp_broadcast_service(_,_)),
    retractall(udp_subnet_member(_)),

    udp_broadcast_address(IPAddress, Subnet, BroadcastAddr),
    assert(udp_broadcast_service(udp_subnet, BroadcastAddr:20005)),
    % a sender is a subnet member if it maps to our broadcast address
    assert((udp_subnet_member(Address:_Port) :-
                udp_broadcast_address(Address, Subnet, BroadcastAddr))),

    start_udp_listener_daemon.
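
% Illustration (not part of the original library): a worked example of
% the arithmetic in udp_broadcast_address/3, using the made-up node
% address 192.168.1.103 and subnet mask 255.255.255.0. Each octet of
% the broadcast address is the address octet OR-ed with the inverted
% mask octet:
%
% ==
% 192 \/ (255 xor 255) =:= 192
% 168 \/ (255 xor 255) =:= 168
%   1 \/ (255 xor 255) =:=   1
% 103 \/ (  0 xor 255) =:= 255
% ==
%
% An application on that node might therefore initialize as sketched
% below, after which the broadcast service address can be queried:
%
% ==
% :- initialization(udp_broadcast_initialize(ip(192,168,1,103),
%                                            ip(255,255,255,0))).
%
% ?- udp_broadcast_service(udp_subnet, Address).
% Address = ip(192, 168, 1, 255):20005.
% ==
%
% Under the same assumptions, a sender at ip(192,168,1,104) maps to the
% same broadcast address and is accepted by udp_subnet_member/1, while
% a sender at ip(10,0,0,1) maps to ip(10,0,0,255) and its traffic is
% silently ignored.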