1/* Part of SWI-Prolog 2 3 Author: Jeffrey Rosenwald 4 E-mail: jeffrose@acm.org 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2012-2013, Jeffrey Rosenwald 7 All rights reserved. 8 9 Redistribution and use in source and binary forms, with or without 10 modification, are permitted provided that the following conditions 11 are met: 12 13 1. Redistributions of source code must retain the above copyright 14 notice, this list of conditions and the following disclaimer. 15 16 2. Redistributions in binary form must reproduce the above copyright 17 notice, this list of conditions and the following disclaimer in 18 the documentation and/or other materials provided with the 19 distribution. 20 21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 29 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 31 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 32 POSSIBILITY OF SUCH DAMAGE. 33*/ 34 35:- module(udp_broadcast, 36 [ 37 udp_host_to_address/2, % ? Host, ? Address 38 udp_broadcast_initialize/2, % +IPAddress, +Subnet 39 udp_broadcast_service/2 % ? Domain, ? Address 40 ]).
227:- use_module(library(socket)). 228:- use_module(library(broadcast)). 229:- use_module(library(time)). 230 231:- require([ thread_self/1 232 , forall/2 233 , term_to_atom/2 234 , thread_send_message/2 235 , catch/3 236 , setup_call_cleanup/3 237 , thread_create/3 238 ]). 239 240% % ~>(:P, :Q) is nondet. 241% % eventually_implies(P, Q) is nondet. 242% asserts temporal Liveness (something good happens, eventually) and 243% Safety (nothing bad ever happens) properties. Analogous to the 244% "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of 245% lazy implication described informally as: 246% 247% * Liveness: For all possible outcomes, P -> Q, eventually. 248% * Safety: For all possible outcomes, (\+P ; Q), is invariant. 249% 250% Described practically: 251% 252% P ~> Q, declares that if P is true, then Q must be true, now or at 253% some point in the future. 254% 255 256:- meta_predicate ~>( , ). 257:- op(950, xfy, ~>). 258 259~>(P, Q) :- 260 setup_call_cleanup(, 261 (true; fail), 262 ( -> true; 263 throw(error(goal_failed(Q), context(~>, _)))) 264 ). 265 266:- meta_predicate safely( ). 267 268safely(Predicate) :- 269 catch(, Err, 270 (Err == '$aborted' -> (!, fail); 271 print_message(error, Err), fail)). 272 273:- meta_predicate make_thread( , ). 274 275% You can't thread_signal a thread that isn't running. 276 277join_thread(Id) :- 278 catch(thread_signal(Id, abort), 279 error(existence_error(thread, Id), _Context), 280 true), 281 282 thread_join(Id, exception('$aborted')). 283 284make_thread(Goal, Options) :- 285 thread_create(safely(), Id, [ detached(false) | Options ]) 286 ~> join_thread(Id). 287 288udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :- 289 IPAddress = ip(A1, A2, A3, A4), 290 Subnet = ip(S1, S2, S3, S4), 291 BroadcastAddress = ip(B1, B2, B3, B4), 292 293 B1 is A1 \/ (S1 xor 255), 294 B2 is A2 \/ (S2 xor 255), 295 B3 is A3 \/ (S3 xor 255), 296 B4 is A4 \/ (S4 xor 255).
udp_subnet
.303% The following are defined at initialization: 304:- dynamic 305 udp_subnet_member/1, % +IpAddress:Port 306 udp_broadcast_service/2. % ?Domain, ?BroadcastAddress:Port 307 308:- volatile 309 udp_subnet_member/1, % +IpAddress:Port 310 udp_broadcast_service/2. % ?Domain, ?BroadcastAddress:Port 311% 312% Here's a UDP bridge to Prolog's broadcast library 313% 314% A sender may extend a broadcast to a subnet of a UDP network by 315% specifying a =|udp_subnet|= scoping qualifier in his/her broadcast. 316% The qualifier has the effect of selecting the appropriate multi-cast 317% address for the transmission. Thus, the sender of the message has 318% control over the scope of his/her traffic on a per-message basis. 319% 320% All in-scope listeners receive the broadcast and simply rebroadcast 321% the message locally. All broadcast replies, if any, are sent directly 322% to the sender via the port-id that was received with the broadcast. 323% 324% Each listener exposes two UDP ports, a shared public port that is 325% bound to a well-known port number and a private port that uniquely 326% indentifies the listener. Broadcasts are received on the public port 327% and replies are sent on the private port. Directed broadcasts 328% (unicasts) are received on the private port and replies are sent on 329% the private port. 330% 331% Interactions with TIPC Broadcast 332% 333% As a security precaution, we do not allow unsupervised transactions 334% directly between UDP and TIPC broadcast. These terms are black-listed 335% and ignored when received via UDP listeners. A UDP enabled service 336% that wishes to use TIPC resources on the cluster must have a sponsor 337% on the TIPC cluster to filter incoming UDP broadcast traffic. This 338% can be as simple as loading and initializing both tipc_broadcast and 339% udp_broadcast within the same TIPC broadcast service. 340% 341% Because the UDP and TIPC broadcast listeners are operating in 342% separate threads of execution, a UDP broadcast sponsor can safely 343% perform a broadcast_request with TIPC scope from within the UDP 344% broadcast listener context. This is one of the few scenarios where a 345% recursive broadcast_request with TIPC scope is safe. 346% 347 348black_list(tipc_node(_)). 349black_list(tipc_node(_,_)). 350black_list(tipc_cluster(_)). 351black_list(tipc_cluster(_,_)). 352black_list(tipc_zone(_)). 353black_list(tipc_zone(_,_)). 354% 355ld_dispatch(_S, '$udp_request'(Term), _From) :- 356 black_list(Term), !, fail. 357 358ld_dispatch(_S, Term, _From) :- 359 black_list(Term), !, fail. 360 361ld_dispatch(S, '$udp_request'(wru(Name)), From) :- 362 !, gethostname(Name), 363 term_to_atom(wru(Name), Atom), 364 udp_send(S, Atom, From, []). 365 366ld_dispatch(S, '$udp_request'(Term), From) :- 367 !, forall(broadcast_request(Term), 368 ( term_to_atom(Term, Atom), 369 udp_send(S, Atom, From, []))). 370 371ld_dispatch(_S, Term, _From) :- 372 safely(broadcast(Term)). 373 374% Thread 1 listens for directed traffic on the private port. 375% 376udp_listener_daemon1(S) :- 377 repeat, 378 safely(dispatch_traffic(S, S)). 379 380% Thread 2 listens for broadcast traffic on the well-known public port 381% (S). All replies are originated from the private port (S1). 382% 383udp_listener_daemon2(Parent) :- 384 udp_socket(S) ~> tcp_close_socket(S), 385 udp_socket(S1) ~> tcp_close_socket(S1), 386 387 tcp_bind(S1, _PrivatePort), % bind him to a private port now 388 389 make_thread(udp_listener_daemon1(S1), 390 [ alias(udp_listener_daemon1)]), 391 392 tcp_setopt(S, reuseaddr), 393 394 udp_broadcast_service(udp_subnet, _Address:Port), 395 tcp_bind(S, Port), % bind to our public port 396 397 listen(udp_broadcast, Head, broadcast_listener(Head)) 398 ~> unlisten(udp_broadcast), 399 400 thread_send_message(Parent, udp_listener_daemon_ready), 401 402 repeat, 403 safely(dispatch_traffic(S, S1)). 404 405dispatch_traffic(S, S1) :- 406 udp_receive(S, Data, From, 407 [as(atom), max_message_size(65535)]), 408 udp_subnet_member(From), % ignore all traffic that is foreign to my subnet 409 term_to_atom(Term, Data), 410 with_mutex(udp_broadcast, ld_dispatch(S1, Term, From)), 411 !, 412 dispatch_traffic(S, S1). 413 414start_udp_listener_daemon :- 415 catch(thread_property(udp_listener_daemon2, status(running)),_, fail), 416 417 !. 418 419start_udp_listener_daemon :- 420 thread_self(Self), 421 thread_create(udp_listener_daemon2(Self), _, 422 [alias(udp_listener_daemon2), detached(true)]), 423 call_with_time_limit(6.0, 424 thread_get_message(udp_listener_daemon_ready)). 425 426:- multifile udp:host_to_address/2. 427% 428broadcast_listener(udp_host_to_address(Host, Addr)) :- 429 udp:host_to_address(Host, Addr). 430 431broadcast_listener(udp_broadcast_service(Class, Addr)) :- 432 udp_broadcast_service(Class, Addr). 433 434broadcast_listener(udp_subnet(X)) :- 435 udp_broadcast(X, udp_subnet, 0.250). 436 437broadcast_listener(udp_subnet(X, Timeout)) :- 438 udp_broadcast(X, udp_subnet, Timeout). 439 440% 441% 442udp_basic_broadcast(S, Port, Term, Address) :- 443 udp_socket(S) 444 ~> tcp_close_socket(S), 445 446 ( udp_broadcast_service(udp_subnet, Address) 447 -> tcp_setopt(S, broadcast) 448 ; true 449 ), 450 451 tcp_bind(S, Port), % find our own ephemeral Port 452 term_to_atom(Term, Atom), 453 454 ( udp_subnet_member(Address) % talk only to your local subnet 455 -> safely(udp_send(S, Atom, Address, [])) 456 ; true). 457 458% directed broadcast to a single listener 459udp_broadcast(Term:To, _Scope, _Timeout) :- 460 ground(Term), ground(To), 461 !, 462 udp_basic_broadcast(_S, _Port, Term, To), 463 464 !. 465 466% broadcast to all listeners 467udp_broadcast(Term, Scope, _Timeout) :- 468 ground(Term), 469 !, 470 udp_broadcast_service(Scope, Address), 471 udp_basic_broadcast(_S, _Port, Term, Address), 472 473 !. 474 475% directed broadcast_request to a single listener 476udp_broadcast(Term:Address, _Scope, Timeout) :- 477 ground(Address), 478 !, 479 udp_basic_broadcast(S, Port, '$udp_request'(Term), Address), 480 udp_br_collect_replies(S, Port, Timeout, Term:Address). 481 482% broadcast_request to all listeners returning responder port-id 483udp_broadcast(Term:From, Scope, Timeout) :- 484 !, udp_broadcast_service(Scope, Address), 485 udp_basic_broadcast(S, Port, '$udp_request'(Term), Address), 486 udp_br_collect_replies(S, Port, Timeout, Term:From). 487 488% broadcast_request to all listeners ignoring responder port-id 489udp_broadcast(Term, Scope, Timeout) :- 490 udp_broadcast(Term:_, Scope, Timeout). 491 492udp_br_send_timeout(Port) :- 493 udp_socket(S) 494 ~> tcp_close_socket(S), 495 udp_send(S, '$udp_br_timeout', localhost:Port, []), 496 497 !. 498 499udp_br_collect_replies(S, Port, Timeout, Term:From) :- 500 alarm(Timeout, udp_br_send_timeout(Port), Id, [remove(false)]) 501 ~> remove_alarm(Id), 502 503 tcp_setopt(S, dispatch(false)), 504 505 repeat, 506 udp_receive(S, Atom, From1, [as(atom)]), 507 ( (Atom \== '$udp_br_timeout') 508 -> (From1 = From, safely(term_to_atom(Term, Atom))) 509 ; (!, fail)).
host_to_address(+Service, +Address)
,
somewhere in its source. This predicate can also be used to perform
reverse searches. That is it will also resolve an Address to a
Service name.
522udp_host_to_address(Host, Address) :-
523 broadcast_request(udp_subnet(udp_host_to_address(Host, Address))).
:- multifile udp_stack_initialize/0.
541udp_broadcast_initialize(IPAddress, Subnet) :-
542 retractall(udp_broadcast_service(_,_)),
543 retractall(udp_subnet_member(_)),
544
545 udp_broadcast_address(IPAddress, Subnet, BroadcastAddr),
546 assert(udp_broadcast_service(udp_subnet, BroadcastAddr:20005)),
547 assert(udp_subnet_member(Address:_Port) :- udp_broadcast_address(Address, Subnet, BroadcastAddr)),
548
549 start_udp_listener_daemon
A UDP Broadcast Bridge
SWI-Prolog's broadcast library provides a means that may be used to facilitate publish and subscribe communication regimes between anonymous members of a community of interest. The members of the community are however, necessarily limited to a single instance of Prolog. The UDP broadcast library removes that restriction. With this library loaded, any member on your local IP subnetwork that also has this library loaded may hear and respond to your broadcasts.
This module has only two public predicates. When the module is initialized, it starts a two listener threads that listen for broadcasts from others, received as UDP datagrams.
Unlike TIPC broadcast, UDP broadcast has only one scope,
udp_subnet
. A broadcast/1 or broadcast_request/1 that is not directed to the listener above, behaves as usual and is confined to the instance of Prolog that originated it. But when so directed, the broadcast will be sent to all participating systems, including itself, by way of UDP's multicast addressing facility. A UDP broadcast or broadcast request takes the typical form:broadcast(udp_subnet(+Term, +Timeout))
. To prevent the potential for feedback loops, the scope qualifier is stripped from the message before transmission. The timeout is optional. It specifies the amount to time to wait for replies to arrive in response to a broadcast_request. The default period is 0.250 seconds. The timeout is ignored for broadcasts.An example of three separate processes cooperating on the same Node:
It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the ip-address and port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a UDP scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.
For example, in order to discover who responded with a particular value:
Caveats
While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:
udp_subnet
scope is not reentrant. If a listener performs a broadcast_request/1 with UDP scope recursively, then disaster looms certain. This caveat does not apply to a UDP scoped broadcast/1, which can safely be performed from a listener context.tipc.pl
*/