View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2012-2013, Jeffrey Rosenwald
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(udp_broadcast,
   36             [
   37             udp_host_to_address/2,    % ? Host, ? Address
   38             udp_broadcast_initialize/2,   % +IPAddress, +Subnet
   39             udp_broadcast_service/2   % ? Domain, ? Address
   40             ]).   41
   42/** <module> A UDP Broadcast Bridge
   43
   44SWI-Prolog's broadcast library provides a  means   that  may  be used to
   45facilitate publish and subscribe communication regimes between anonymous
   46members of a community of interest.  The   members  of the community are
   47however, necessarily limited to a  single   instance  of Prolog. The UDP
   48broadcast library removes that restriction.   With  this library loaded,
   49any member on your local IP subnetwork that also has this library loaded
   50may hear and respond to your broadcasts.
   51
   52This  module  has  only  two  public  predicates.  When  the  module  is
   53initialized, it starts a two listener threads that listen for broadcasts
   54from others, received as UDP datagrams.
   55
   56Unlike TIPC broadcast, UDP broadcast has only one scope, =udp_subnet=. A
   57broadcast/1 or broadcast_request/1 that is not  directed to the listener
   58above, behaves as usual and is confined   to the instance of Prolog that
   59originated it. But when so directed, the   broadcast will be sent to all
   60participating systems, including  itself,  by   way  of  UDP's multicast
   61addressing facility. A UDP broadcast  or   broadcast  request  takes the
   62typical form: =|broadcast(udp_subnet(+Term, +Timeout))|=. To prevent the
   63potential for feedback loops, the scope   qualifier is stripped from the
   64message before transmission. The timeout is   optional. It specifies the
   65amount to time  to  wait  for  replies   to  arrive  in  response  to  a
   66broadcast_request. The default period is 0.250   seconds. The timeout is
   67ignored for broadcasts.
   68
   69An example of three separate processes cooperating on the same Node:
   70
   71==
   72Process A:
   73
   74   ?- listen(number(X), between(1, 5, X)).
   75   true.
   76
   77   ?-
   78
   79Process B:
   80
   81   ?- listen(number(X), between(7, 9, X)).
   82   true.
   83
   84   ?-
   85
   86Process C:
   87
   88   ?- findall(X, broadcast_request(udp_subnet(number(X))), Xs).
   89   Xs = [1, 2, 3, 4, 5, 7, 8, 9].
   90
   91   ?-
   92==
   93
   94It is also  possible  to  carry  on   a  private  dialog  with  a single
   95responder. To do this, you supply a   compound of the form, Term:PortId,
   96to a UDP scoped broadcast/1 or  broadcast_request/1, where PortId is the
   97ip-address and port-id of  the  intended   listener.  If  you  supply an
   98unbound variable, PortId, to broadcast_request, it  will be unified with
   99the address of the listener  that  responds   to  Term.  You  may send a
  100directed broadcast to a specific member by simply providing this address
  101in a similarly structured compound  to   a  UDP  scoped broadcast/1. The
  102message is sent via unicast to that member   only by way of the member's
  103broadcast listener. It is received by  the   listener  just as any other
  104broadcast would be. The listener does not know the difference.
  105
  106For example, in order to discover who responded with a particular value:
  107
  108==
  109Host B Process 1:
  110
  111   ?- listen(number(X), between(1, 5, X)).
  112   true.
  113
  114   ?-
  115
  116Host A Process 1:
  117
  118
  119   ?- listen(number(X), between(7, 9, X)).
  120   true.
  121
  122   ?-
  123
  124Host A Process 2:
  125
  126   ?- listen(number(X), between(1, 5, X)).
  127   true.
  128
  129   ?- bagof(X, broadcast_request(udp_subnet(number(X):From,1)), Xs).
  130   From = ip(192, 168, 1, 103):34855,
  131   Xs = [7, 8, 9] ;
  132   From = ip(192, 168, 1, 103):56331,
  133   Xs = [1, 2, 3, 4, 5] ;
  134   From = ip(192, 168, 1, 104):3217,
  135   Xs = [1, 2, 3, 4, 5].
  136
  137==
  138
  139## Caveats {#udp-broadcase-caveats}
  140
  141While the implementation is mostly transparent, there are some important
  142and subtle differences that must be taken into consideration:
  143
  144    * UDP broadcast requires an initialization step in order to
  145    launch the broadcast listener daemon. See udp_broadcast_initialize/2.
  146
  147    * Prolog's broadcast_request/1 is nondet. It sends the request,
  148    then evaluates the replies synchronously, backtracking as needed
  149    until a satisfactory reply is received. The remaining potential
  150    replies are not evaluated. This is not so when UDP is involved.
  151
  152    * A UDP broadcast/1 is completely asynchronous.
  153
  154    * A  UDP broadcast_request/1 is partially synchronous. A
  155    broadcast_request/1 is sent, then the sender balks for a period of
  156    time (default: 250 ms) while the replies are collected. Any reply
  157    that is received after this period is silently discarded. A
  158    optional second argument is provided so that a sender may specify
  159    more (or less) time for replies.
  160
  161    * Replies are presented to the user as a choice point on arrival,
  162    until the broadcast request timer finally expires. This
  163    allows traffic to propagate through the system faster and provides
  164    the requestor with the opportunity to terminate a broadcast request
  165    early if desired, by simply cutting choice points.
  166
  167    * Please beware that broadcast request transactions remain active
  168    and resources consumed until broadcast_request finally fails on
  169    backtracking, an uncaught exception occurs, or until choice points
  170    are cut. Failure to properly manage this will likely result in
  171    chronic exhaustion of UDP sockets.
  172
  173    * If a listener is connected to a generator that always succeeds
  174    (e.g. a random number generator), then the broadcast request will
  175    never terminate and trouble is bound to ensue.
  176
  177    * broadcast_request/1 with =|udp_subnet|= scope is _not_ reentrant.
  178    If a listener performs a broadcast_request/1 with UDP scope
  179    recursively, then disaster looms certain. This caveat does not apply
  180    to a UDP scoped broadcast/1, which can safely be performed from a
  181    listener context.
  182
  183    * UDP broadcast's capacity is not infinite. While it can tolerate
  184    substantial bursts of activity, it is designed for short bursts of
  185    small messages. Unlike TIPC, UDP is unreliable and has no QOS
  186    protections. Congestion is likely to cause trouble in the form of
  187    non-Byzantine failure. That is, late, lost (e.g. infinitely late),
  188    or duplicate datagrams. Caveat emptor.
  189
  190    * A UDP broadcast_request/1 term that is grounded is considered to
  191    be a broadcast only. No replies are collected unless the there is at
  192    least one unbound variable to unify.
  193
  194    * A UDP broadcast/1 always succeeds, even if there are no
  195    listeners.
  196
  197    * A UDP broadcast_request/1 that receives no replies will fail.
  198
  199    * Replies may be coming from many different places in the network
  200    (or none at all). No ordering of replies is implied.
  201
  202    * Prolog terms are sent to others after first converting them to
  203    atoms using term_to_atom/2. Passing real numbers this way may
  204    result in a substantial truncation of precision.
  205
  206    * The broadcast model is based on anonymity and a presumption of
  207    trust--a perfect recipe for compromise. UDP is an Internet protocol.
  208    A UDP broadcast listener exposes a public port (20005), which is
  209    static and shared by all listeners, and a private port, which is
  210    semi-static and unique to the listener instance. Both can be seen
  211    from off-cluster nodes and networks. Usage of this module exposes
  212    the node and consequently, the cluster to significant security
  213    risks. So have a care when designing your application. You must talk
  214    only to those who share and contribute to your concerns using a
  215    carefully prescribed protocol.
  216
  217    * UDP broadcast categorically and silently ignores all message
  218    traffic originating from or terminating on nodes that are not
  219    members of the local subnet. This security measure only keeps honest
  220    people honest!
  221
  222@author    Jeffrey Rosenwald (JeffRose@acm.org)
  223@license   BSD-2
  224@see       tipc.pl
  225*/
  226
  227:- use_module(library(socket)).  228:- use_module(library(broadcast)).  229:- use_module(library(time)).  230
  231:- require([ thread_self/1
  232           , forall/2
  233           , term_to_atom/2
  234           , thread_send_message/2
  235           , catch/3
  236           , setup_call_cleanup/3
  237           , thread_create/3
  238           ]).  239
  240% %     ~>(:P, :Q) is nondet.
  241% %     eventually_implies(P, Q) is nondet.
  242%    asserts temporal Liveness (something good happens, eventually) and
  243%    Safety (nothing bad ever happens) properties. Analogous to the
  244%    "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of
  245%    lazy implication described informally as:
  246%
  247%    * Liveness: For all possible outcomes, P -> Q, eventually.
  248%    * Safety: For all possible outcomes, (\+P ; Q), is invariant.
  249%
  250%  Described practically:
  251%
  252%    P ~> Q, declares that if P is true, then Q must be true, now or at
  253%    some point in the future.
  254%
  255
  256:- meta_predicate ~>(0,0).  257:- op(950, xfy, ~>).  258
  259~>(P, Q) :-
  260    setup_call_cleanup(P,
  261                       (true; fail),
  262                       (   Q -> true;
  263                       throw(error(goal_failed(Q), context(~>, _))))
  264                      ).
  265
  266:- meta_predicate safely(0).  267
  268safely(Predicate) :-
  269    catch(Predicate, Err,
  270          (Err == '$aborted' -> (!, fail);
  271          print_message(error, Err), fail)).
  272
  273:- meta_predicate make_thread(0, +).  274
  275% You can't thread_signal a thread that isn't running.
  276
  277join_thread(Id) :-
  278    catch(thread_signal(Id, abort),
  279          error(existence_error(thread, Id), _Context),
  280          true),
  281
  282    thread_join(Id, exception('$aborted')).
  283
  284make_thread(Goal, Options) :-
  285    thread_create(safely(Goal), Id, [ detached(false) | Options ])
  286      ~> join_thread(Id).
  287
  288udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
  289    IPAddress = ip(A1, A2, A3, A4),
  290    Subnet = ip(S1, S2, S3, S4),
  291    BroadcastAddress = ip(B1, B2, B3, B4),
  292
  293    B1 is A1 \/ (S1 xor 255),
  294    B2 is A2 \/ (S2 xor 255),
  295    B3 is A3 \/ (S3 xor 255),
  296    B4 is A4 \/ (S4 xor 255).
  297
  298%!  udp_broadcast_service(?Domain, ?Address) is nondet.
  299%   provides the UDP broadcast address for a given Domain. At present,
  300%   only one domain is supported, =|udp_subnet|=.
  301%
  302
  303%  The following are defined at initialization:
  304:- dynamic
  305    udp_subnet_member/1,      % +IpAddress:Port
  306    udp_broadcast_service/2.  % ?Domain, ?BroadcastAddress:Port
  307
  308:- volatile
  309    udp_subnet_member/1,      % +IpAddress:Port
  310    udp_broadcast_service/2.  % ?Domain, ?BroadcastAddress:Port
  311%
  312%  Here's a UDP bridge to Prolog's broadcast library
  313%
  314%  A sender may extend a broadcast  to  a   subnet  of  a UDP network by
  315%  specifying a =|udp_subnet|= scoping qualifier   in his/her broadcast.
  316%  The qualifier has the effect of  selecting the appropriate multi-cast
  317%  address for the transmission. Thus,  the   sender  of the message has
  318%  control over the scope of his/her traffic on a per-message basis.
  319%
  320%  All in-scope listeners receive the   broadcast and simply rebroadcast
  321%  the message locally. All broadcast replies, if any, are sent directly
  322%  to the sender via the port-id that   was received with the broadcast.
  323%
  324%  Each listener exposes two UDP ports,  a   shared  public port that is
  325%  bound to a well-known port number and   a  private port that uniquely
  326%  indentifies the listener. Broadcasts are received  on the public port
  327%  and replies are  sent  on  the   private  port.  Directed  broadcasts
  328%  (unicasts) are received on the private port   and replies are sent on
  329%  the private port.
  330%
  331%  Interactions with TIPC Broadcast
  332%
  333%  As a security precaution, we do   not allow unsupervised transactions
  334%  directly between UDP and TIPC broadcast. These terms are black-listed
  335%  and ignored when received via UDP   listeners.  A UDP enabled service
  336%  that wishes to use TIPC resources on  the cluster must have a sponsor
  337%  on the TIPC cluster to filter   incoming  UDP broadcast traffic. This
  338%  can be as simple as loading  and initializing both tipc_broadcast and
  339%  udp_broadcast within the same TIPC broadcast service.
  340%
  341%  Because the UDP  and  TIPC  broadcast   listeners  are  operating  in
  342%  separate threads of execution, a  UDP   broadcast  sponsor can safely
  343%  perform a broadcast_request with  TIPC  scope   from  within  the UDP
  344%  broadcast listener context. This is one of the few scenarios where a
  345%  recursive broadcast_request with TIPC scope is safe.
  346%
  347
  348black_list(tipc_node(_)).
  349black_list(tipc_node(_,_)).
  350black_list(tipc_cluster(_)).
  351black_list(tipc_cluster(_,_)).
  352black_list(tipc_zone(_)).
  353black_list(tipc_zone(_,_)).
  354%
  355ld_dispatch(_S, '$udp_request'(Term), _From) :-
  356    black_list(Term), !, fail.
  357
  358ld_dispatch(_S, Term, _From) :-
  359    black_list(Term), !, fail.
  360
  361ld_dispatch(S, '$udp_request'(wru(Name)), From) :-
  362    !, gethostname(Name),
  363    term_to_atom(wru(Name), Atom),
  364    udp_send(S, Atom, From, []).
  365
  366ld_dispatch(S, '$udp_request'(Term), From) :-
  367    !, forall(broadcast_request(Term),
  368          (   term_to_atom(Term, Atom),
  369              udp_send(S, Atom, From, []))).
  370
  371ld_dispatch(_S, Term, _From) :-
  372    safely(broadcast(Term)).
  373
  374%  Thread 1 listens for directed traffic on the private port.
  375%
  376udp_listener_daemon1(S) :-
  377    repeat,
  378    safely(dispatch_traffic(S, S)).
  379
  380%  Thread 2 listens for broadcast traffic on the well-known public port
  381%  (S). All replies are originated from the private port (S1).
  382%
  383udp_listener_daemon2(Parent) :-
  384    udp_socket(S) ~> tcp_close_socket(S),
  385    udp_socket(S1) ~> tcp_close_socket(S1),
  386
  387    tcp_bind(S1, _PrivatePort),   % bind him to a private port now
  388
  389    make_thread(udp_listener_daemon1(S1),
  390                [ alias(udp_listener_daemon1)]),
  391
  392    tcp_setopt(S, reuseaddr),
  393
  394    udp_broadcast_service(udp_subnet, _Address:Port),
  395    tcp_bind(S, Port),      % bind to our public port
  396
  397    listen(udp_broadcast, Head, broadcast_listener(Head))
  398         ~> unlisten(udp_broadcast),
  399
  400    thread_send_message(Parent, udp_listener_daemon_ready),
  401
  402    repeat,
  403    safely(dispatch_traffic(S, S1)).
  404
  405dispatch_traffic(S, S1) :-
  406    udp_receive(S, Data, From,
  407                [as(atom), max_message_size(65535)]),
  408    udp_subnet_member(From),  % ignore all traffic that is foreign to my subnet
  409    term_to_atom(Term, Data),
  410    with_mutex(udp_broadcast, ld_dispatch(S1, Term, From)),
  411    !,
  412    dispatch_traffic(S, S1).
  413
  414start_udp_listener_daemon :-
  415    catch(thread_property(udp_listener_daemon2, status(running)),_, fail),
  416
  417    !.
  418
  419start_udp_listener_daemon :-
  420    thread_self(Self),
  421    thread_create(udp_listener_daemon2(Self), _,
  422           [alias(udp_listener_daemon2), detached(true)]),
  423    call_with_time_limit(6.0,
  424                         thread_get_message(udp_listener_daemon_ready)).
  425
  426:- multifile udp:host_to_address/2.  427%
  428broadcast_listener(udp_host_to_address(Host, Addr)) :-
  429    udp:host_to_address(Host, Addr).
  430
  431broadcast_listener(udp_broadcast_service(Class, Addr)) :-
  432    udp_broadcast_service(Class, Addr).
  433
  434broadcast_listener(udp_subnet(X)) :-
  435    udp_broadcast(X, udp_subnet, 0.250).
  436
  437broadcast_listener(udp_subnet(X, Timeout)) :-
  438    udp_broadcast(X, udp_subnet, Timeout).
  439
  440%
  441%
  442udp_basic_broadcast(S, Port, Term, Address) :-
  443    udp_socket(S)
  444      ~> tcp_close_socket(S),
  445
  446    (   udp_broadcast_service(udp_subnet, Address)
  447           -> tcp_setopt(S, broadcast)
  448           ; true
  449    ),
  450
  451    tcp_bind(S, Port),  % find our own ephemeral Port
  452    term_to_atom(Term, Atom),
  453
  454    (   udp_subnet_member(Address)  % talk only to your local subnet
  455        -> safely(udp_send(S, Atom, Address, []))
  456        ;  true).
  457
  458% directed broadcast to a single listener
  459udp_broadcast(Term:To, _Scope, _Timeout) :-
  460    ground(Term), ground(To),
  461    !,
  462    udp_basic_broadcast(_S, _Port, Term, To),
  463
  464    !.
  465
  466% broadcast to all listeners
  467udp_broadcast(Term, Scope, _Timeout) :-
  468    ground(Term),
  469    !,
  470    udp_broadcast_service(Scope, Address),
  471    udp_basic_broadcast(_S, _Port, Term, Address),
  472
  473    !.
  474
  475% directed broadcast_request to a single listener
  476udp_broadcast(Term:Address, _Scope, Timeout) :-
  477    ground(Address),
  478    !,
  479    udp_basic_broadcast(S, Port, '$udp_request'(Term), Address),
  480    udp_br_collect_replies(S, Port, Timeout, Term:Address).
  481
  482% broadcast_request to all listeners returning responder port-id
  483udp_broadcast(Term:From, Scope, Timeout) :-
  484    !, udp_broadcast_service(Scope, Address),
  485    udp_basic_broadcast(S, Port, '$udp_request'(Term), Address),
  486    udp_br_collect_replies(S, Port, Timeout, Term:From).
  487
  488% broadcast_request to all listeners ignoring responder port-id
  489udp_broadcast(Term, Scope, Timeout) :-
  490    udp_broadcast(Term:_, Scope, Timeout).
  491
  492udp_br_send_timeout(Port) :-
  493    udp_socket(S)
  494      ~> tcp_close_socket(S),
  495    udp_send(S, '$udp_br_timeout', localhost:Port, []),
  496
  497    !.
  498
  499udp_br_collect_replies(S, Port, Timeout, Term:From) :-
  500    alarm(Timeout, udp_br_send_timeout(Port), Id, [remove(false)])
  501      ~> remove_alarm(Id),
  502
  503    tcp_setopt(S, dispatch(false)),
  504
  505    repeat,
  506    udp_receive(S, Atom, From1, [as(atom)]),
  507    (   (Atom \== '$udp_br_timeout')
  508        -> (From1 = From, safely(term_to_atom(Term, Atom)))
  509        ;  (!, fail)).
  510
  511%!  udp_host_to_address(?Service, ?Address) is nondet.
  512%
  513%   locates a UDP service by name. Service  is an atom or grounded term
  514%   representing the common name  of  the   service.  Address  is a UDP
  515%   address structure. A server may advertise   its  services by name by
  516%   including  the  fact,    udp:host_to_address(+Service,   +Address),
  517%   somewhere in its source. This predicate can  also be used to perform
  518%   reverse searches. That is it  will  also   resolve  an  Address to a
  519%   Service name.
  520%
  521
  522udp_host_to_address(Host, Address) :-
  523    broadcast_request(udp_subnet(udp_host_to_address(Host, Address))).
  524
  525%!  udp_initialize is semidet.
  526%   See udp:udp_initialize/0
  527%
  528%:- multifile udp:udp_stack_initialize/0.
  529
  530
  531%!  udp_broadcast_initialize(+IPAddress, +SubnetMask) is semidet.
  532%   causes any required runtime initialization to occur. At present,
  533%   proper operation of UDP broadcast depends on local information
  534%   that is not easily obtained mechanically. In order to determine
  535%   the appropriate UDP broadcast address, you must supply the
  536%   IPAddress and SubnetMask for the node that is running this module.
  537%   These data are supplied in the form of ip/4 terms. This is now
  538%   required to be included in an applications intialization directive.
  539%
  540
  541udp_broadcast_initialize(IPAddress, Subnet) :-
  542    retractall(udp_broadcast_service(_,_)),
  543    retractall(udp_subnet_member(_)),
  544
  545    udp_broadcast_address(IPAddress, Subnet, BroadcastAddr),
  546    assert(udp_broadcast_service(udp_subnet, BroadcastAddr:20005)),
  547    assert(udp_subnet_member(Address:_Port) :- udp_broadcast_address(Address, Subnet, BroadcastAddr)),
  548
  549    start_udp_listener_daemon