/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2012-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(udp_broadcast,
          [ udp_host_to_address/2,      % ?Host, ?Address
            udp_broadcast_initialize/2, % +IPAddress, +Subnet
            udp_broadcast_service/2     % ?Domain, ?Address
          ]).

A UDP Broadcast Bridge

SWI-Prolog's broadcast library supports publish-and-subscribe communication between anonymous members of a community of interest. The members of the community are, however, necessarily limited to a single instance of Prolog. The UDP broadcast library removes that restriction. With this library loaded, any member on your local IP subnetwork that also has this library loaded may hear and respond to your broadcasts.

This module has three public predicates. When the module is initialized, it starts two listener threads that listen for broadcasts from others, received as UDP datagrams.

Unlike TIPC broadcast, UDP broadcast has only one scope, udp_subnet. A broadcast/1 or broadcast_request/1 that is not directed to the listener above behaves as usual and is confined to the instance of Prolog that originated it. But when so directed, the broadcast will be sent to all participating systems, including itself, by way of UDP's multicast addressing facility. A UDP broadcast or broadcast request takes the typical form: broadcast(udp_subnet(+Term, +Timeout)). To prevent feedback loops, the scope qualifier is stripped from the message before transmission. The timeout is optional. It specifies the amount of time to wait for replies to arrive in response to a broadcast_request. The default period is 0.250 seconds. The timeout is ignored for broadcasts.
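
For comparison, here is a minimal sketch of the unqualified case, which stays inside one Prolog instance. It relies only on library(broadcast); the listener id demo_numbers and the predicate local_numbers/1 are hypothetical names introduced for illustration. Wrapping the same request in udp_subnet/1 or udp_subnet/2 would, assuming this module is loaded and initialized, extend it to the subnet.

```prolog
:- use_module(library(broadcast)).

% local_numbers(-Xs): hypothetical helper, not part of this library.
% Registers a listener, collects every answer to an unscoped
% broadcast_request/1, and removes the listener again.
local_numbers(Xs) :-
    setup_call_cleanup(
        listen(demo_numbers, number(X), between(1, 5, X)),
        findall(Y, broadcast_request(number(Y)), Xs),
        unlisten(demo_numbers)).
```

Because the request carries no udp_subnet qualifier, it never reaches the UDP listener and only local listeners answer.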

An example of three separate processes cooperating on the same Node:

Process A:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- findall(X, broadcast_request(udp_subnet(number(X))), Xs).
   Xs = [1, 2, 3, 4, 5, 7, 8, 9].

   ?-

It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the ip-address and port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a UDP scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.

For example, in order to discover who responded with a particular value:

Host B Process 1:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Host A Process 1:


   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Host A Process 2:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?- bagof(X, broadcast_request(udp_subnet(number(X):From,1)), Xs).
   From = ip(192, 168, 1, 103):34855,
   Xs = [7, 8, 9] ;
   From = ip(192, 168, 1, 103):56331,
   Xs = [1, 2, 3, 4, 5] ;
   From = ip(192, 168, 1, 104):3217,
   Xs = [1, 2, 3, 4, 5].
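
The two steps described above (discover a responder, then address it directly) might be combined as in the following sketch. The predicate notify_responders/2 is a hypothetical helper, not part of this library, and it assumes that udp_broadcast has been loaded and initialized on the participating hosts. Both arguments must be ground, since a ground Term:Address is what selects the directed (unicast) path.

```prolog
:- use_module(library(broadcast)).

% notify_responders(+Value, +Note): hypothetical helper.
% For every listener that answers number(Value) within one second,
% send Note to that listener only, using the address bound to From.
notify_responders(Value, Note) :-
    forall(broadcast_request(udp_subnet(number(Value):From, 1)),
           broadcast(udp_subnet(Note:From))).
```

With no responders on the subnet the forall/2 succeeds without sending anything.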

Caveats

While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:

author
- Jeffrey Rosenwald (JeffRose@acm.org)
See also
- tipc.pl
license
- BSD-2
:- use_module(library(socket)).
:- use_module(library(broadcast)).
:- use_module(library(time)).

:- require([ thread_self/1
           , forall/2
           , term_to_atom/2
           , thread_send_message/2
           , catch/3
           , setup_call_cleanup/3
           , thread_create/3
           ]).

% %     ~>(:P, :Q) is nondet.
% %     eventually_implies(P, Q) is nondet.
%    asserts temporal Liveness (something good happens, eventually) and
%    Safety (nothing bad ever happens) properties. Analogous to the
%    "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of
%    lazy implication described informally as:
%
%    * Liveness: For all possible outcomes, P -> Q, eventually.
%    * Safety: For all possible outcomes, (\+P ; Q), is invariant.
%
%  Described practically:
%
%    P ~> Q, declares that if P is true, then Q must be true, now or at
%    some point in the future.
%

:- meta_predicate ~>(0,0).
:- op(950, xfy, ~>).

~>(P, Q) :-
    setup_call_cleanup(P,
                       (true; fail),
                       (   Q -> true;
                       throw(error(goal_failed(Q), context(~>, _))))
                      ).

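The behaviour of ~> can be observed with a small sketch. The operator definition is repeated from the listing so that the block loads on its own; event/1, step/1 and demo/1 are hypothetical names used only for illustration. The sketch records the order in which goals run when the code following P ~> Q fails.

```prolog
:- op(950, xfy, ~>).
:- meta_predicate ~>(0,0).

% Definition repeated from the listing above.
~>(P, Q) :-
    setup_call_cleanup(P,
                       (true ; fail),
                       (   Q
                       ->  true
                       ;   throw(error(goal_failed(Q), context((~>)/2, _)))
                       )).

:- dynamic event/1.

% step(+Name): record that a step ran (hypothetical helper).
step(Name) :-
    assertz(event(Name)).

% demo(-Events): run acquire ~> release, do some work, then fail.
% Backtracking into ~> exhausts its choice point, so the cleanup
% goal step(release) runs after step(work).
demo(Events) :-
    retractall(event(_)),
    (   step(acquire) ~> step(release),
        step(work),
        fail
    ;   true
    ),
    findall(E, event(E), Events).
```

This is the pattern the listing relies on for sockets and threads: the resource is released once the enclosing goal fails, is cut, or raises an exception.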
:- meta_predicate safely(0).

safely(Predicate) :-
    catch(Predicate, Err,
          (Err == '$aborted' -> (!, fail);
          print_message(error, Err), fail)).

:- meta_predicate make_thread(0, +).

% You can't thread_signal a thread that isn't running.

join_thread(Id) :-
    catch(thread_signal(Id, abort),
          error(existence_error(thread, Id), _Context),
          true),

    thread_join(Id, exception('$aborted')).

make_thread(Goal, Options) :-
    thread_create(safely(Goal), Id, [ detached(false) | Options ])
      ~> join_thread(Id).

udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
    IPAddress = ip(A1, A2, A3, A4),
    Subnet = ip(S1, S2, S3, S4),
    BroadcastAddress = ip(B1, B2, B3, B4),

    B1 is A1 \/ (S1 xor 255),
    B2 is A2 \/ (S2 xor 255),
    B3 is A3 \/ (S3 xor 255),
    B4 is A4 \/ (S4 xor 255).
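
As a worked illustration of the arithmetic above, the following sketch ORs each address octet with the inverted mask octet. The predicate names broadcast_address_sketch/3 and same_subnet_sketch/3 are hypothetical; the first mirrors udp_broadcast_address/3, and the second shows how equal broadcast addresses can serve as a subnet membership test, in the spirit of the udp_subnet_member/1 clause asserted at initialization.

```prolog
% broadcast_address_sketch(+IP, +Mask, -Broadcast): hypothetical copy
% of the octet-wise computation: Broadcast = IP \/ (Mask xor 255).
broadcast_address_sketch(ip(A1, A2, A3, A4),
                         ip(S1, S2, S3, S4),
                         ip(B1, B2, B3, B4)) :-
    B1 is A1 \/ (S1 xor 255),
    B2 is A2 \/ (S2 xor 255),
    B3 is A3 \/ (S3 xor 255),
    B4 is A4 \/ (S4 xor 255).

% same_subnet_sketch(+Mask, +IP1, +IP2): true if both addresses map
% to the same broadcast address under Mask.
same_subnet_sketch(Mask, IP1, IP2) :-
    broadcast_address_sketch(IP1, Mask, Broadcast),
    broadcast_address_sketch(IP2, Mask, Broadcast).
```

For example, ip(192,168,1,103) with mask ip(255,255,255,0) yields ip(192,168,1,255), and ip(10,1,2,3) with mask ip(255,255,240,0) yields ip(10,1,15,255).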
 udp_broadcast_service(?Domain, ?Address) is nondet
provides the UDP broadcast address for a given Domain. At present, only one domain is supported, udp_subnet.
%  The following are defined at initialization:
:- dynamic
    udp_subnet_member/1,      % +IpAddress:Port
    udp_broadcast_service/2.  % ?Domain, ?BroadcastAddress:Port

:- volatile
    udp_subnet_member/1,      % +IpAddress:Port
    udp_broadcast_service/2.  % ?Domain, ?BroadcastAddress:Port
%
%  Here's a UDP bridge to Prolog's broadcast library
%
%  A sender may extend a broadcast  to  a   subnet  of  a UDP network by
%  specifying a =|udp_subnet|= scoping qualifier   in his/her broadcast.
%  The qualifier has the effect of  selecting the appropriate multi-cast
%  address for the transmission. Thus,  the   sender  of the message has
%  control over the scope of his/her traffic on a per-message basis.
%
%  All in-scope listeners receive the   broadcast and simply rebroadcast
%  the message locally. All broadcast replies, if any, are sent directly
%  to the sender via the port-id that   was received with the broadcast.
%
%  Each listener exposes two UDP ports,  a   shared  public port that is
%  bound to a well-known port number and   a  private port that uniquely
%  identifies the listener. Broadcasts are received  on the public port
%  and replies are  sent  on  the   private  port.  Directed  broadcasts
%  (unicasts) are received on the private port   and replies are sent on
%  the private port.
%
%  Interactions with TIPC Broadcast
%
%  As a security precaution, we do   not allow unsupervised transactions
%  directly between UDP and TIPC broadcast. These terms are black-listed
%  and ignored when received via UDP   listeners.  A UDP enabled service
%  that wishes to use TIPC resources on  the cluster must have a sponsor
%  on the TIPC cluster to filter   incoming  UDP broadcast traffic. This
%  can be as simple as loading  and initializing both tipc_broadcast and
%  udp_broadcast within the same TIPC broadcast service.
%
%  Because the UDP  and  TIPC  broadcast   listeners  are  operating  in
%  separate threads of execution, a  UDP   broadcast  sponsor can safely
%  perform a broadcast_request with  TIPC  scope   from  within  the UDP
%  broadcast listener context. This is one of the few scenarios where a
%  recursive broadcast_request with TIPC scope is safe.
%

black_list(tipc_node(_)).
black_list(tipc_node(_,_)).
black_list(tipc_cluster(_)).
black_list(tipc_cluster(_,_)).
black_list(tipc_zone(_)).
black_list(tipc_zone(_,_)).
%
ld_dispatch(_S, '$udp_request'(Term), _From) :-
    black_list(Term), !, fail.

ld_dispatch(_S, Term, _From) :-
    black_list(Term), !, fail.

ld_dispatch(S, '$udp_request'(wru(Name)), From) :-
    !, gethostname(Name),
    term_to_atom(wru(Name), Atom),
    udp_send(S, Atom, From, []).

ld_dispatch(S, '$udp_request'(Term), From) :-
    !, forall(broadcast_request(Term),
          (   term_to_atom(Term, Atom),
              udp_send(S, Atom, From, []))).

ld_dispatch(_S, Term, _From) :-
    safely(broadcast(Term)).

%  Thread 1 listens for directed traffic on the private port.
%
udp_listener_daemon1(S) :-
    repeat,
    safely(dispatch_traffic(S, S)).

%  Thread 2 listens for broadcast traffic on the well-known public port
%  (S). All replies are originated from the private port (S1).
%
udp_listener_daemon2(Parent) :-
    udp_socket(S) ~> tcp_close_socket(S),
    udp_socket(S1) ~> tcp_close_socket(S1),

    tcp_bind(S1, _PrivatePort),   % bind him to a private port now

    make_thread(udp_listener_daemon1(S1),
                [ alias(udp_listener_daemon1)]),

    tcp_setopt(S, reuseaddr),

    udp_broadcast_service(udp_subnet, _Address:Port),
    tcp_bind(S, Port),      % bind to our public port

    listen(udp_broadcast, Head, broadcast_listener(Head))
         ~> unlisten(udp_broadcast),

    thread_send_message(Parent, udp_listener_daemon_ready),

    repeat,
    safely(dispatch_traffic(S, S1)).

dispatch_traffic(S, S1) :-
    udp_receive(S, Data, From,
                [as(atom), max_message_size(65535)]),
    udp_subnet_member(From),  % ignore all traffic that is foreign to my subnet
    term_to_atom(Term, Data),
    with_mutex(udp_broadcast, ld_dispatch(S1, Term, From)),
    !,
    dispatch_traffic(S, S1).

start_udp_listener_daemon :-
    catch(thread_property(udp_listener_daemon2, status(running)),_, fail),

    !.

start_udp_listener_daemon :-
    thread_self(Self),
    thread_create(udp_listener_daemon2(Self), _,
           [alias(udp_listener_daemon2), detached(true)]),
    call_with_time_limit(6.0,
                         thread_get_message(udp_listener_daemon_ready)).

:- multifile udp:host_to_address/2.
%
broadcast_listener(udp_host_to_address(Host, Addr)) :-
    udp:host_to_address(Host, Addr).

broadcast_listener(udp_broadcast_service(Class, Addr)) :-
    udp_broadcast_service(Class, Addr).

broadcast_listener(udp_subnet(X)) :-
    udp_broadcast(X, udp_subnet, 0.250).

broadcast_listener(udp_subnet(X, Timeout)) :-
    udp_broadcast(X, udp_subnet, Timeout).

%
%
udp_basic_broadcast(S, Port, Term, Address) :-
    udp_socket(S)
      ~> tcp_close_socket(S),

    (   udp_broadcast_service(udp_subnet, Address)
           -> tcp_setopt(S, broadcast)
           ; true
    ),

    tcp_bind(S, Port),  % find our own ephemeral Port
    term_to_atom(Term, Atom),

    (   udp_subnet_member(Address)  % talk only to your local subnet
        -> safely(udp_send(S, Atom, Address, []))
        ;  true).

% directed broadcast to a single listener
udp_broadcast(Term:To, _Scope, _Timeout) :-
    ground(Term), ground(To),
    !,
    udp_basic_broadcast(_S, _Port, Term, To),

    !.

% broadcast to all listeners
udp_broadcast(Term, Scope, _Timeout) :-
    ground(Term),
    !,
    udp_broadcast_service(Scope, Address),
    udp_basic_broadcast(_S, _Port, Term, Address),

    !.

% directed broadcast_request to a single listener
udp_broadcast(Term:Address, _Scope, Timeout) :-
    ground(Address),
    !,
    udp_basic_broadcast(S, Port, '$udp_request'(Term), Address),
    udp_br_collect_replies(S, Port, Timeout, Term:Address).

% broadcast_request to all listeners returning responder port-id
udp_broadcast(Term:From, Scope, Timeout) :-
    !, udp_broadcast_service(Scope, Address),
    udp_basic_broadcast(S, Port, '$udp_request'(Term), Address),
    udp_br_collect_replies(S, Port, Timeout, Term:From).

% broadcast_request to all listeners ignoring responder port-id
udp_broadcast(Term, Scope, Timeout) :-
    udp_broadcast(Term:_, Scope, Timeout).

udp_br_send_timeout(Port) :-
    udp_socket(S)
      ~> tcp_close_socket(S),
    udp_send(S, '$udp_br_timeout', localhost:Port, []),

    !.

udp_br_collect_replies(S, Port, Timeout, Term:From) :-
    alarm(Timeout, udp_br_send_timeout(Port), Id, [remove(false)])
      ~> remove_alarm(Id),

    tcp_setopt(S, dispatch(false)),

    repeat,
    udp_receive(S, Atom, From1, [as(atom)]),
    (   (Atom \== '$udp_br_timeout')
        -> (From1 = From, safely(term_to_atom(Term, Atom)))
        ;  (!, fail)).
 udp_host_to_address(?Service, ?Address) is nondet
locates a UDP service by name. Service is an atom or grounded term representing the common name of the service. Address is a UDP address structure. A server may advertise its services by name by including the fact, udp:host_to_address(+Service, +Address), somewhere in its source. This predicate can also be used to perform reverse searches. That is, it will also resolve an Address to a Service name.
udp_host_to_address(Host, Address) :-
    broadcast_request(udp_subnet(udp_host_to_address(Host, Address))).
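
A server-side advertisement might look like the following sketch. The service name clock_service and the address ip(192,168,1,103):9000 are hypothetical values chosen for illustration, and the Address shape (an ip/4 term paired with a port) is an assumption based on the addresses shown elsewhere in this listing.

```prolog
:- multifile udp:host_to_address/2.

% Hypothetical service name and address, for illustration only.
udp:host_to_address(clock_service, ip(192,168,1,103):9000).
```

A client on the same subnet could then call udp_host_to_address(clock_service, Address), or supply the address and leave the name unbound for a reverse search, provided both sides have loaded and initialized this module.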
 udp_initialize is semidet
See udp_initialize/0

:- multifile udp_stack_initialize/0.

 udp_broadcast_initialize(+IPAddress, +SubnetMask) is semidet
causes any required runtime initialization to occur. At present, proper operation of UDP broadcast depends on local information that is not easily obtained mechanically. In order to determine the appropriate UDP broadcast address, you must supply the IPAddress and SubnetMask for the node that is running this module. These data are supplied in the form of ip/4 terms. This call is now required to be included in an application's initialization directive.
udp_broadcast_initialize(IPAddress, Subnet) :-
    retractall(udp_broadcast_service(_,_)),
    retractall(udp_subnet_member(_)),

    udp_broadcast_address(IPAddress, Subnet, BroadcastAddr),
    assert(udp_broadcast_service(udp_subnet, BroadcastAddr:20005)),
    assert(udp_subnet_member(Address:_Port) :- udp_broadcast_address(Address, Subnet, BroadcastAddr)),

    start_udp_listener_daemon.
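
An application's initialization directive could therefore look like the following sketch. It assumes the two-argument form listed here (other releases of the library may differ), and the address and mask are hypothetical values that must be replaced with those of the node running the application.

```prolog
:- use_module(library(udp_broadcast)).

% Hypothetical node address and subnet mask; substitute your own.
:- initialization(
       udp_broadcast_initialize(ip(192,168,1,103),
                                ip(255,255,255,0))).
```

With these values the module would register ip(192,168,1,255):20005 as the udp_subnet broadcast address and start the listener threads.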