View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2006-2015, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(rdf_persistency,
   37          [ rdf_attach_db/2,            % +Directory, +Options
   38            rdf_detach_db/0,            % +Detach current Graph
   39            rdf_current_db/1,           % -Directory
   40            rdf_persistency/2,          % +Graph, +Bool
   41            rdf_flush_journals/1,       % +Options
   42            rdf_persistency_property/1, % ?Property
   43            rdf_journal_file/2,         % ?Graph, ?JournalFile
   44            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
   45            rdf_db_to_file/2            % ?Graph, ?FileBase
   46          ]).   47:- use_module(library(semweb/rdf_db)).   48:- use_module(library(filesex)).   49:- use_module(library(lists)).   50:- use_module(library(uri)).   51:- use_module(library(debug)).   52:- use_module(library(option)).   53:- use_module(library(error)).   54:- use_module(library(thread)).   55:- use_module(library(apply)).   56
   57
   58/** <module> RDF persistency plugin
   59
   60This  module  provides  persistency   for    rdf_db.pl   based   on  the
   61rdf_monitor/2 predicate to  track  changes   to  the  repository.  Where
    62previous  versions  used  autosave  of  the  whole  database  using  the
   63quick-load format of rdf_db, this version is  based on a quick-load file
   64per source (4th argument of rdf/4), and journalling for edit operations.
   65
   66The result is safe, avoids frequent small   changes to large files which
   67makes synchronisation and backup expensive and avoids long disruption of
    68the server doing the autosave. Only loading large files disrupts service
   69for some time.
   70
   71The persistent backup of the database is  realised in a directory, using
   72a lock file to avoid corruption due to concurrent access. Each source is
   73represented by two files, the latest snapshot   and a journal. The state
   74is restored by loading  the  snapshot   and  replaying  the journal. The
   75predicate rdf_flush_journals/1 can be used to create fresh snapshots and
   76delete the journals.
   77
   78@tbd If there is a complete `.new'   snapshot  and no journal, we should
   79     move the .new to the plain snapshot name as a means of recovery.
   80
   81@tbd Backup of each graph using one or two files is very costly if there
   82     are many graphs.  Although the currently used subdirectories avoid
   83     hitting OS limits early, this is still not ideal. Probably we
   84     should collect (small, older?) files and combine them into a single
   85     quick load file.  We could call this (similar to GIT) a `pack'.
   86
   87@see    rdf_edit.pl
   88*/
   89
   90:- volatile
   91    rdf_directory/1,
   92    rdf_lock/2,
   93    rdf_option/1,
   94    source_journal_fd/2,
   95    file_base_db/2.   96:- dynamic
   97    rdf_directory/1,                % Absolute path
   98    rdf_lock/2,                     % Dir, Lock
   99    rdf_option/1,                   % Defined options
  100    source_journal_fd/2,            % DB, JournalFD
  101    file_base_db/2.                 % FileBase, DB
  102
  103:- meta_predicate
  104    no_agc(0).  105
  106:- predicate_options(rdf_attach_db/2, 2,
  107                     [ access(oneof([read_write,read_only])),
  108                       concurrency(positive_integer),
  109                       max_open_journals(positive_integer),
  110                       silent(oneof([true,false,brief])),
  111                       log_nested_transactions(boolean)
  112                     ]).  113
  114%!  rdf_attach_db(+Directory, +Options) is det.
  115%
  116%   Start persistent operations using Directory   as  place to store
  117%   files.   There are several cases:
  118%
  119%           * Empty DB, existing directory
  120%           Load the DB from the existing directory
  121%
  122%           * Full DB, empty directory
  123%           Create snapshots for all sources in directory
  124%
  125%   Options:
  126%
  127%           * access(+AccessMode)
  128%           One of =auto= (default), =read_write= or
  129%           =read_only=. Read-only access implies that the RDF
  130%           store is not locked. It is read at startup and all
  131%           modifications to the data are temporary. The default
  132%           =auto= mode is =read_write= if the directory is
  133%           writeable and the lock can be acquired.  Otherwise
  134%           it reverts to =read_only=.
  135%
  136%           * concurrency(+Jobs)
  137%           Number of threads to use for loading the initial
  138%           database.  If not provided it is the number of CPUs
  139%           as optained from the flag =cpu_count=.
  140%
  141%           * max_open_journals(+Count)
  142%           Maximum number of journals kept open.  If not provided,
  143%           the default is 10.  See limit_fd_pool/0.
  144%
  145%           * directory_levels(+Count)
  146%           Number of levels of intermediate directories for storing
  147%           the graph files.  Default is 2.
  148%
  149%           * silent(+BoolOrBrief)
  150%           If =true= (default =false=), do not print informational
  151%           messages.  Finally, if =brief= it will show minimal
  152%           feedback.
  153%
  154%           * log_nested_transactions(+Boolean)
  155%           If =true=, nested _log_ transactions are added to the
  156%           journal information.  By default (=false=), no log-term
  157%           is added for nested transactions.\\
  158%
  159%   @error existence_error(source_sink, Directory)
  160%   @error permission_error(write, directory, Directory)
  161
  162rdf_attach_db(DirSpec, Options) :-
  163    option(access(read_only), Options),
  164    !,
  165    absolute_file_name(DirSpec,
  166                       Directory,
  167                       [ access(read),
  168                         file_type(directory)
  169                       ]),
  170    rdf_attach_db_ro(Directory, Options).
  171rdf_attach_db(DirSpec, Options) :-
  172    option(access(read_write), Options),
  173    !,
  174    rdf_attach_db_rw(DirSpec, Options).
  175rdf_attach_db(DirSpec, Options) :-
  176    absolute_file_name(DirSpec,
  177                       Directory,
  178                       [ access(exist),
  179                         file_type(directory),
  180                         file_errors(fail)
  181                       ]),
  182    !,
  183    (   access_file(Directory, write)
  184    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
  185        (   var(E)
  186        ->  true
  187        ;   E = error(permission_error(lock, rdf_db, _), _)
  188        ->  print_message(warning, E),
  189            print_message(warning, rdf(read_only)),
  190            rdf_attach_db(DirSpec, [access(read_only)|Options])
  191        ;   throw(E)
  192        )
  193    ;   print_message(warning,
  194                      error(permission_error(write, directory, Directory))),
  195        print_message(warning, rdf(read_only)),
  196        rdf_attach_db_ro(Directory, Options)
  197    ).
  198rdf_attach_db(DirSpec, Options) :-
  199    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
  200    (   var(E)
  201    ->  true
  202    ;   print_message(warning, E),
  203        print_message(warning, rdf(read_only)),
  204        rdf_attach_db(DirSpec, [access(read_only)|Options])
  205    ).
  206
  207
  208rdf_attach_db_rw(DirSpec, Options) :-
  209    absolute_file_name(DirSpec,
  210                       Directory,
  211                       [ access(write),
  212                         file_type(directory),
  213                         file_errors(fail)
  214                       ]),
  215    !,
  216    (   rdf_directory(Directory)
  217    ->  true                        % update settings?
  218    ;   rdf_detach_db,
  219        mkdir(Directory),
  220        lock_db(Directory),
  221        assert(rdf_directory(Directory)),
  222        assert_options(Options),
  223        stop_monitor,               % make sure not to register load
  224        no_agc(load_db),
  225        at_halt(rdf_detach_db),
  226        start_monitor
  227    ).
  228rdf_attach_db_rw(DirSpec, Options) :-
  229    absolute_file_name(DirSpec,
  230                       Directory,
  231                       [ solutions(all)
  232                       ]),
  233    (   exists_directory(Directory)
  234    ->  access_file(Directory, write)
  235    ;   catch(make_directory(Directory), _, fail)
  236    ),
  237    !,
  238    rdf_attach_db(Directory, Options).
  239rdf_attach_db_rw(DirSpec, _) :-         % Generate an existence or
  240    absolute_file_name(DirSpec,     % permission error
  241                       Directory,
  242                       [ access(exist),
  243                         file_type(directory)
  244                       ]),
  245    permission_error(write, directory, Directory).
  246
  247%!  rdf_attach_db_ro(+Directory, +Options)
  248%
  249%   Open an RDF database in read-only mode.
  250
  251rdf_attach_db_ro(Directory, Options) :-
  252    rdf_detach_db,
  253    assert(rdf_directory(Directory)),
  254    assert_options(Options),
  255    stop_monitor,           % make sure not to register load
  256    no_agc(load_db).
  257
  258
  259assert_options([]).
  260assert_options([H|T]) :-
  261    (   option_type(H, Check)
  262    ->  Check,
  263        assert(rdf_option(H))
  264    ;   true                        % ignore options we do not understand
  265    ),
  266    assert_options(T).
  267
  268option_type(concurrency(X),             must_be(positive_integer, X)).
  269option_type(max_open_journals(X),       must_be(positive_integer, X)).
  270option_type(directory_levels(X),        must_be(positive_integer, X)).
  271option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
  272option_type(log_nested_transactions(X), must_be(boolean, X)).
  273option_type(access(X),                  must_be(oneof([read_write,
  274                                                       read_only]), X)).
  275
  276
  277%!  rdf_persistency_property(?Property) is nondet.
  278%
  279%   True if Property  is  a  property   of  the  current  persistent
  280%   database. Currently makes to options   passed to rdf_attach_db/2
  281%   available.  Notable  rdf_persistency_property(access(read_only))
  282%   is true if the database  is   mounted  in  read-only mode. Other
  283%   properties:
  284%
  285%     - directory(Dir)
  286%     Directory in which the database resides.
  287
  288rdf_persistency_property(Property) :-
  289    var(Property),
  290    !,
  291    rdf_persistency_property_(Property).
  292rdf_persistency_property(Property) :-
  293    rdf_persistency_property_(Property),
  294    !.
  295
  296rdf_persistency_property_(Property) :-
  297    rdf_option(Property).
  298rdf_persistency_property_(directory(Dir)) :-
  299    rdf_directory(Dir).
  300
  301%!  no_agc(:Goal)
  302%
  303%   Run Goal with atom garbage collection   disabled. Loading an RDF
  304%   database creates large amounts  of  atoms   we  *know*  are  not
  305%   garbage.
  306
  307no_agc(Goal) :-
  308    current_prolog_flag(agc_margin, Old),
  309    setup_call_cleanup(
  310        set_prolog_flag(agc_margin, 0),
  311        Goal,
  312        set_prolog_flag(agc_margin, Old)).
  313
  314
  315%!  rdf_detach_db is det.
  316%
  317%   Detach from the  current  database.   Succeeds  silently  if  no
  318%   database is attached. Normally called at  the end of the program
  319%   through at_halt/1.
  320
  321rdf_detach_db :-
  322    debug(halt, 'Detaching RDF database', []),
  323    stop_monitor,
  324    close_journals,
  325    (   retract(rdf_directory(Dir))
  326    ->  debug(halt, 'DB Directory: ~w', [Dir]),
  327        save_prefixes(Dir),
  328        retractall(rdf_option(_)),
  329        retractall(source_journal_fd(_,_)),
  330        unlock_db(Dir)
  331    ;   true
  332    ).
  333
  334
  335%!  rdf_current_db(?Dir)
  336%
  337%   True if Dir is the current RDF persistent database.
  338
  339rdf_current_db(Directory) :-
  340    rdf_directory(Dir),
  341    !,
  342    Dir = Directory.
  343
  344
  345%!  rdf_flush_journals(+Options)
  346%
  347%   Flush dirty journals.  Options:
  348%
  349%           * min_size(+KB)
  350%           Only flush if journal is over KB in size.
  351%           * graph(+Graph)
  352%           Only flush the journal of Graph
  353%
  354%   @tbd Provide a default for min_size?
  355
  356rdf_flush_journals(Options) :-
  357    option(graph(Graph), Options, _),
  358    forall(rdf_graph(Graph),
  359           rdf_flush_journal(Graph, Options)).
  360
  361rdf_flush_journal(Graph, Options) :-
  362    db_files(Graph, _SnapshotFile, JournalFile),
  363    db_file(JournalFile, File),
  364    (   \+ exists_file(File)
  365    ->  true
  366    ;   memberchk(min_size(KB), Options),
  367        size_file(JournalFile, Size),
  368        Size / 1024 < KB
  369    ->  true
  370    ;   create_db(Graph)
  371    ).
  372
  373                 /*******************************
  374                 *             LOAD             *
  375                 *******************************/
  376
  377%!  load_db is det.
  378%
  379%   Reload database from the directory specified by rdf_directory/1.
  380%   First we find all names graphs using find_dbs/1 and then we load
  381%   them.
  382
  383load_db :-
  384    rdf_directory(Dir),
  385    concurrency(Jobs),
  386    cpu_stat_key(Jobs, StatKey),
  387    get_time(Wall0),
  388    statistics(StatKey, T0),
  389    load_prefixes(Dir),
  390    verbosity(Silent),
  391    find_dbs(Dir, Graphs, SnapShots, Journals),
  392    length(Graphs, GraphCount),
  393    maplist(rdf_unload_graph, Graphs),
  394    rdf_statistics(triples(Triples0)),
  395    load_sources(snapshots, SnapShots, Silent, Jobs),
  396    load_sources(journals, Journals, Silent, Jobs),
  397    rdf_statistics(triples(Triples1)),
  398    statistics(StatKey, T1),
  399    get_time(Wall1),
  400    T is T1 - T0,
  401    Wall is Wall1 - Wall0,
  402    Triples = Triples1 - Triples0,
  403    message_level(Silent, Level),
  404    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).
  405
  406load_sources(_, [], _, _) :- !.
  407load_sources(Type, Sources, Silent, Jobs) :-
  408    length(Sources, Count),
  409    RunJobs is min(Count, Jobs),
  410    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
  411    make_goals(Sources, Silent, 1, Count, Goals),
  412    concurrent(RunJobs, Goals, []).
  413
  414
  415%!  make_goals(+DBs, +Silent, +Index, +Total, -Goals)
  416
  417make_goals([], _, _, _, []).
  418make_goals([DB|T0], Silent, I,  Total,
  419           [load_source(DB, Silent, I, Total)|T]) :-
  420    I2 is I + 1,
  421    make_goals(T0, Silent, I2, Total, T).
  422
  423verbosity(Silent) :-
  424    rdf_option(silent(Silent)),
  425    !.
  426verbosity(Silent) :-
  427    current_prolog_flag(verbose, silent),
  428    !,
  429    Silent = true.
  430verbosity(brief).
  431
  432
  433%!  concurrency(-Jobs)
  434%
  435%   Number of jobs to run concurrently.
  436
  437concurrency(Jobs) :-
  438    rdf_option(concurrency(Jobs)),
  439    !.
  440concurrency(Jobs) :-
  441    current_prolog_flag(cpu_count, Jobs),
  442    Jobs > 0,
  443    !.
  444concurrency(1).
  445
  446cpu_stat_key(1, cputime) :- !.
  447cpu_stat_key(_, process_cputime).
  448
  449
  450%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det.
  451%
  452%   Scan the persistent database and return a list of snapshots and
  453%   journals, both sorted by file-size.  Each term is of the form
  454%
  455%     ==
  456%     db(Size, Ext, DB, DBFile, Depth)
  457%     ==
  458
  459find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
  460    directory_files(Dir, Files),
  461    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
  462    maplist(db_graph, Scanned, UnsortedGraphs),
  463    sort(UnsortedGraphs, Graphs),
  464    (   consider_reindex_db(Dir, Graphs, Scanned)
  465    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
  466    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
  467        sort(Snapshots, SnapBySize),
  468        sort(Journals, JournalBySize)
  469    ).
  470
  471consider_reindex_db(Dir, Graphs, Scanned) :-
  472    length(Graphs, Count),
  473    Count > 0,
  474    DepthNeeded is floor(log(Count)/log(256)),
  475    (   maplist(depth_db(DepthNow), Scanned)
  476    ->  (   DepthNeeded > DepthNow
  477        ->  true
  478        ;   retractall(rdf_option(directory_levels(_))),
  479            assertz(rdf_option(directory_levels(DepthNow))),
  480            fail
  481        )
  482    ;   true
  483    ),
  484    reindex_db(Dir, DepthNeeded).
  485
  486db_is_snapshot(Term) :-
  487    arg(2, Term, trp).
  488
  489db_graph(Term, DB) :-
  490    arg(3, Term, DB).
  491
  492db_file_name(Term, File) :-
  493    arg(4, Term, File).
  494
  495depth_db(Depth, DB) :-
  496    arg(5, DB, Depth).
  497
  498%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
  499%
  500%   Produces a list of db(DB,  Size,   File)  for all recognised RDF
  501%   database files.  File is relative to the database directory Dir.
  502
  503scan_db_files([], _, _, _) -->
  504    [].
  505scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
  506    { nofollow(Nofollow) },
  507    !,
  508    scan_db_files(T, Dir, Prefix, Depth).
  509scan_db_files([File|T], Dir, Prefix, Depth) -->
  510    { file_name_extension(Base, Ext, File),
  511      db_extension(Ext),
  512      !,
  513      rdf_db_to_file(DB, Base),
  514      directory_file_path(Prefix, File, DBFile),
  515      directory_file_path(Dir, DBFile, AbsFile),
  516      size_file(AbsFile, Size)
  517    },
  518    [ db(Size, Ext, DB, AbsFile, Depth) ],
  519    scan_db_files(T, Dir, Prefix, Depth).
  520scan_db_files([D|T], Dir, Prefix, Depth) -->
  521    { directory_file_path(Prefix, D, SubD),
  522      directory_file_path(Dir, SubD, AbsD),
  523      exists_directory(AbsD),
  524      \+ read_link(AbsD, _, _),    % Do not follow links
  525      !,
  526      directory_files(AbsD, SubFiles),
  527      SubDepth is Depth + 1
  528    },
  529    scan_db_files(SubFiles, Dir, SubD, SubDepth),
  530    scan_db_files(T, Dir, Prefix, Depth).
  531scan_db_files([_|T], Dir, Prefix, Depth) -->
  532    scan_db_files(T, Dir, Prefix, Depth).
  533
  534nofollow(.).
  535nofollow(..).
  536
  537db_extension(trp).
  538db_extension(jrn).
  539
  540:- public load_source/4.                % called through make_goals/5
  541
  542load_source(DB, Silent, Nth, Total) :-
  543    db_file_name(DB, File),
  544    db_graph(DB, Graph),
  545    message_level(Silent, Level),
  546    graph_triple_count(Graph, Count0),
  547    statistics(cputime, T0),
  548    (   db_is_snapshot(DB)
  549    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
  550        rdf_load_db(File)
  551    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
  552        load_journal(File, Graph)
  553    ),
  554    statistics(cputime, T1),
  555    T is T1 - T0,
  556    graph_triple_count(Graph, Count1),
  557    Count is Count1 - Count0,
  558    print_message(Level, rdf(restore(Silent,
  559                                     done(Graph, T, Count, Nth, Total)))).
  560
  561
  562graph_triple_count(Graph, Count) :-
  563    rdf_statistics(triples_by_graph(Graph, Count)),
  564    !.
  565graph_triple_count(_, 0).
  566
  567
  568%!  attach_graph(+Graph, +Options) is det.
  569%
  570%   Load triples and reload  journal   from  the  indicated snapshot
  571%   file.
  572
  573attach_graph(Graph, Options) :-
  574    (   option(silent(true), Options)
  575    ->  Level = silent
  576    ;   Level = informational
  577    ),
  578    db_files(Graph, SnapshotFile, JournalFile),
  579    rdf_retractall(_,_,_,Graph),
  580    statistics(cputime, T0),
  581    print_message(Level, rdf(restore(Silent, Graph))),
  582    db_file(SnapshotFile, AbsSnapShot),
  583    (   exists_file(AbsSnapShot)
  584    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
  585        rdf_load_db(AbsSnapShot)
  586    ;   true
  587    ),
  588    (   exists_db(JournalFile)
  589    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
  590        load_journal(JournalFile, Graph)
  591    ;   true
  592    ),
  593    statistics(cputime, T1),
  594    T is T1 - T0,
  595    (   rdf_statistics(triples_by_graph(Graph, Count))
  596    ->  true
  597    ;   Count = 0
  598    ),
  599    print_message(Level, rdf(restore(Silent,
  600                                     done(Graph, T, Count)))).
  601
  602message_level(true, silent) :- !.
  603message_level(_, informational).
  604
  605
  606                 /*******************************
  607                 *         LOAD JOURNAL         *
  608                 *******************************/
  609
  610%!  load_journal(+File:atom, +DB:atom) is det.
  611%
  612%   Process transactions from the RDF journal File, adding the given
  613%   named graph.
  614
  615load_journal(File, DB) :-
  616    rdf_create_graph(DB),
  617    setup_call_cleanup(
  618        open(File, read, In, [encoding(utf8)]),
  619        ( read(In, T0),
  620          process_journal(T0, In, DB)
  621        ),
  622        close(In)).
  623
  624process_journal(end_of_file, _, _) :- !.
  625process_journal(Term, In, DB) :-
  626    (   process_journal_term(Term, DB)
  627    ->  true
  628    ;   throw(error(type_error(journal_term, Term), _))
  629    ),
  630    read(In, T2),
  631    process_journal(T2, In, DB).
  632
  633process_journal_term(assert(S,P,O), DB) :-
  634    rdf_assert(S,P,O,DB).
  635process_journal_term(assert(S,P,O,Line), DB) :-
  636    rdf_assert(S,P,O,DB:Line).
  637process_journal_term(retract(S,P,O), DB) :-
  638    rdf_retractall(S,P,O,DB).
  639process_journal_term(retract(S,P,O,Line), DB) :-
  640    rdf_retractall(S,P,O,DB:Line).
  641process_journal_term(update(S,P,O,Action), DB) :-
  642    (   rdf_update(S,P,O,DB, Action)
  643    ->  true
  644    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
  645    ).
  646process_journal_term(start(_), _).      % journal open/close
  647process_journal_term(end(_), _).
  648process_journal_term(begin(_), _).      % logged transaction (compatibility)
  649process_journal_term(end, _).
  650process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
  651process_journal_term(end(_,_,_), _).
  652
  653
  654                 /*******************************
  655                 *         CREATE JOURNAL       *
  656                 *******************************/
  657
  658:- dynamic
  659    blocked_db/2,                   % DB, Reason
  660    transaction_message/3,          % Nesting, Time, Message
  661    transaction_db/3.               % Nesting, DB, Id
  662
  663%!  rdf_persistency(+DB, Bool)
  664%
  665%   Specify whether a database is persistent.  Switching to =false=
  666%   kills the persistent state.  Switching to =true= creates it.
  667
  668rdf_persistency(DB, Bool) :-
  669    must_be(atom, DB),
  670    must_be(boolean, Bool),
  671    fail.
  672rdf_persistency(DB, false) :-
  673    !,
  674    (   blocked_db(DB, persistency)
  675    ->  true
  676    ;   assert(blocked_db(DB, persistency)),
  677        delete_db(DB)
  678    ).
  679rdf_persistency(DB, true) :-
  680    (   retract(blocked_db(DB, persistency))
  681    ->  create_db(DB)
  682    ;   true
  683    ).
  684
  685%!  rdf_db:property_of_graph(?Property, +Graph) is nondet.
  686%
  687%   Extend rdf_graph_property/2 with new properties.
  688
  689:- multifile
  690    rdf_db:property_of_graph/2.  691
  692rdf_db:property_of_graph(persistent(State), Graph) :-
  693    (   blocked_db(Graph, persistency)
  694    ->  State = false
  695    ;   State = true
  696    ).
  697
  698
  699%!  start_monitor is det.
  700%!  stop_monitor is det.
  701%
  702%   Start/stop monitoring the RDF database   for  changes and update
  703%   the journal.
  704
  705start_monitor :-
  706    rdf_monitor(monitor,
  707                [ -assert(load)
  708                ]).
  709stop_monitor :-
  710    rdf_monitor(monitor,
  711                [ -all
  712                ]).
  713
  714%!  monitor(+Term) is semidet.
  715%
  716%   Handle an rdf_monitor/2 callback to  deal with persistency. Note
  717%   that the monitor calls that come   from rdf_db.pl that deal with
  718%   database changes are serialized.  They   do  come from different
  719%   threads though.
  720
  721monitor(Msg) :-
  722    debug(monitor, 'Monitor: ~p~n', [Msg]),
  723    fail.
  724monitor(assert(S,P,O,DB:Line)) :-
  725    !,
  726    \+ blocked_db(DB, _),
  727    journal_fd(DB, Fd),
  728    open_transaction(DB, Fd),
  729    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
  730    sync_journal(DB, Fd).
  731monitor(assert(S,P,O,DB)) :-
  732    \+ blocked_db(DB, _),
  733    journal_fd(DB, Fd),
  734    open_transaction(DB, Fd),
  735    format(Fd, '~q.~n', [assert(S,P,O)]),
  736    sync_journal(DB, Fd).
  737monitor(retract(S,P,O,DB:Line)) :-
  738    !,
  739    \+ blocked_db(DB, _),
  740    journal_fd(DB, Fd),
  741    open_transaction(DB, Fd),
  742    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
  743    sync_journal(DB, Fd).
  744monitor(retract(S,P,O,DB)) :-
  745    \+ blocked_db(DB, _),
  746    journal_fd(DB, Fd),
  747    open_transaction(DB, Fd),
  748    format(Fd, '~q.~n', [retract(S,P,O)]),
  749    sync_journal(DB, Fd).
  750monitor(update(S,P,O,DB:Line,Action)) :-
  751    !,
  752    \+ blocked_db(DB, _),
  753    (   Action = graph(NewDB)
  754    ->  monitor(assert(S,P,O,NewDB)),
  755        monitor(retract(S,P,O,DB:Line))
  756    ;   journal_fd(DB, Fd),
  757        format(Fd, '~q.~n', [update(S,P,O,Action)]),
  758        sync_journal(DB, Fd)
  759    ).
  760monitor(update(S,P,O,DB,Action)) :-
  761    \+ blocked_db(DB, _),
  762    (   Action = graph(NewDB)
  763    ->  monitor(assert(S,P,O,NewDB)),
  764        monitor(retract(S,P,O,DB))
  765    ;   journal_fd(DB, Fd),
  766        open_transaction(DB, Fd),
  767        format(Fd, '~q.~n', [update(S,P,O,Action)]),
  768        sync_journal(DB, Fd)
  769    ).
  770monitor(load(BE, _DumpFileURI)) :-
  771    (   BE = end(Graphs)
  772    ->  sync_loaded_graphs(Graphs)
  773    ;   true
  774    ).
  775monitor(create_graph(Graph)) :-
  776    \+ blocked_db(Graph, _),
  777    journal_fd(Graph, Fd),
  778    open_transaction(Graph, Fd),
  779    sync_journal(Graph, Fd).
  780monitor(reset) :-
  781    forall(rdf_graph(Graph), delete_db(Graph)).
  782                                        % TBD: Remove empty directories?
  783
  784monitor(transaction(BE, Id)) :-
  785    monitor_transaction(Id, BE).
  786
  787monitor_transaction(load_journal(DB), begin(_)) :-
  788    !,
  789    assert(blocked_db(DB, journal)).
  790monitor_transaction(load_journal(DB), end(_)) :-
  791    !,
  792    retractall(blocked_db(DB, journal)).
  793
  794monitor_transaction(parse(URI), begin(_)) :-
  795    !,
  796    (   blocked_db(URI, persistency)
  797    ->  true
  798    ;   assert(blocked_db(URI, parse))
  799    ).
  800monitor_transaction(parse(URI), end(_)) :-
  801    !,
  802    (   retract(blocked_db(URI, parse))
  803    ->  create_db(URI)
  804    ;   true
  805    ).
  806monitor_transaction(unload(DB), begin(_)) :-
  807    !,
  808    (   blocked_db(DB, persistency)
  809    ->  true
  810    ;   assert(blocked_db(DB, unload))
  811    ).
  812monitor_transaction(unload(DB), end(_)) :-
  813    !,
  814    (   retract(blocked_db(DB, unload))
  815    ->  delete_db(DB)
  816    ;   true
  817    ).
  818monitor_transaction(log(Msg), begin(N)) :-
  819    !,
  820    check_nested(N),
  821    get_time(Time),
  822    asserta(transaction_message(N, Time, Msg)).
  823monitor_transaction(log(_), end(N)) :-
  824    check_nested(N),
  825    retract(transaction_message(N, _, _)),
  826    !,
  827    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
  828    end_transactions(DBs, N).
  829monitor_transaction(log(Msg, DB), begin(N)) :-
  830    !,
  831    check_nested(N),
  832    get_time(Time),
  833    asserta(transaction_message(N, Time, Msg)),
  834    journal_fd(DB, Fd),
  835    open_transaction(DB, Fd).
  836monitor_transaction(log(Msg, _DB), end(N)) :-
  837    monitor_transaction(log(Msg), end(N)).
  838
  839
  840%!  check_nested(+Level) is semidet.
  841%
  842%   True if we must log this transaction.   This  is always the case
  843%   for toplevel transactions. Nested transactions   are only logged
  844%   if log_nested_transactions(true) is defined.
  845
  846check_nested(0) :- !.
  847check_nested(_) :-
  848    rdf_option(log_nested_transactions(true)).
  849
  850
  851%!  open_transaction(+DB, +Fd) is det.
  852%
  853%   Add a begin(Id, Level, Time,  Message)   term  if  a transaction
  854%   involves DB. Id is an incremental   integer, where each database
  855%   has its own counter. Level is the nesting level, Time a floating
  856%   point timestamp and Message te message   provided as argument to
  857%   the log message.
  858
  859open_transaction(DB, Fd) :-
  860    transaction_message(N, Time, Msg),
  861    !,
  862    (   transaction_db(N, DB, _)
  863    ->  true
  864    ;   next_transaction_id(DB, Id),
  865        assert(transaction_db(N, DB, Id)),
  866        RoundedTime is round(Time*100)/100,
  867        format(Fd, '~q.~n', [begin(Id, N, RoundedTime, Msg)])
  868    ).
  869open_transaction(_,_).
  870
  871
  872%!  next_transaction_id(+DB, -Id) is det.
  873%
  874%   Id is the number to user for  the next logged transaction on DB.
  875%   Transactions in each  named  graph   are  numbered  in sequence.
  876%   Searching the Id of the last transaction is performed by the 2nd
  877%   clause starting 1Kb from the end   and doubling this offset each
  878%   failure.
  879
  880:- dynamic
  881    current_transaction_id/2.  882
  883next_transaction_id(DB, Id) :-
  884    retract(current_transaction_id(DB, Last)),
  885    !,
  886    Id is Last + 1,
  887    assert(current_transaction_id(DB, Id)).
  888next_transaction_id(DB, Id) :-
  889    db_files(DB, _, Journal),
  890    exists_file(Journal),
  891    !,
  892    size_file(Journal, Size),
  893    open_db(Journal, read, In, []),
  894    call_cleanup(iterative_expand(In, Size, Last), close(In)),
  895    Id is Last + 1,
  896    assert(current_transaction_id(DB, Id)).
  897next_transaction_id(DB, 1) :-
  898    assert(current_transaction_id(DB, 1)).
  899
  900iterative_expand(_, 0, 0) :- !.
  901iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
  902    Max is floor(log(Size)/log(2)),
  903    between(10, Max, Step),
  904    Offset is -(1<<Step),
  905    seek(In, Offset, eof, _),
  906    skip(In, 10),                   % records are line-based
  907    read(In, T0),
  908    last_transaction_id(T0, In, 0, Last),
  909    Last > 0,
  910    !.
  911iterative_expand(In, _, Last) :-        % Scan the whole file
  912    seek(In, 0, bof, _),
  913    read(In, T0),
  914    last_transaction_id(T0, In, 0, Last).
  915
  916last_transaction_id(end_of_file, _, Last, Last) :- !.
  917last_transaction_id(end(Id, _, _), In, _, Last) :-
  918    read(In, T1),
  919    last_transaction_id(T1, In, Id, Last).
  920last_transaction_id(_, In, Id, Last) :-
  921    read(In, T1),
  922    last_transaction_id(T1, In, Id, Last).
  923
  924
  925%!  end_transactions(+DBs:list(atom:id)) is det.
  926%
  927%   End a transaction that affected the  given list of databases. We
  928%   write the list of other affected databases as an argument to the
  929%   end-term to facilitate fast finding of the related transactions.
  930%
  931%   In each database, the transaction is   ended with a term end(Id,
  932%   Nesting, Others), where  Id  and   Nesting  are  the transaction
  933%   identifier and nesting (see open_transaction/2)  and Others is a
  934%   list of DB:Id,  indicating  other   databases  affected  by  the
  935%   transaction.
  936
  937end_transactions(DBs, N) :-
  938    end_transactions(DBs, DBs, N).
  939
  940end_transactions([], _, _).
  941end_transactions([DB:Id|T], DBs, N) :-
  942    journal_fd(DB, Fd),
  943    once(select(DB:Id, DBs, Others)),
  944    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
  945    sync_journal(DB, Fd),
  946    end_transactions(T, DBs, N).
  947
  948
  949%!  sync_loaded_graphs(+Graphs)
  950%
  951%   Called after a binary triple has been loaded that added triples
  952%   to the given graphs.
  953
  954sync_loaded_graphs(Graphs) :-
  955    maplist(create_db, Graphs).
  956
  957
  958                 /*******************************
  959                 *         JOURNAL FILES        *
  960                 *******************************/
  961
  962%!  journal_fd(+DB, -Stream) is det.
  963%
  964%   Get an open stream to a journal. If the journal is not open, old
  965%   journals are closed to satisfy   the =max_open_journals= option.
  966%   Then the journal is opened in   =append= mode. Journal files are
  967%   always encoded as UTF-8 for  portability   as  well as to ensure
  968%   full coverage of Unicode.
  969
  970journal_fd(DB, Fd) :-
  971    source_journal_fd(DB, Fd),
  972    !.
  973journal_fd(DB, Fd) :-
  974    with_mutex(rdf_journal_file,
  975               journal_fd_(DB, Out)),
  976    Fd = Out.
  977
  978journal_fd_(DB, Fd) :-
  979    source_journal_fd(DB, Fd),
  980    !.
  981journal_fd_(DB, Fd) :-
  982    limit_fd_pool,
  983    db_files(DB, _Snapshot, Journal),
  984    open_db(Journal, append, Fd,
  985            [ close_on_abort(false)
  986            ]),
  987    time_stamp(Now),
  988    format(Fd, '~q.~n', [start([time(Now)])]),
  989    assert(source_journal_fd(DB, Fd)).              % new one at the end
  990
  991%!  limit_fd_pool is det.
  992%
  993%   Limit the number of  open   journals  to max_open_journals (10).
  994%   Note that calls  from  rdf_monitor/2   are  issued  in different
  995%   threads, but as they are part of write operations they are fully
  996%   synchronised.
  997
  998limit_fd_pool :-
  999    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
 1000    !,
 1001    (   rdf_option(max_open_journals(Max))
 1002    ->  true
 1003    ;   Max = 10
 1004    ),
 1005    Close is N - Max,
 1006    forall(between(1, Close, _),
 1007           close_oldest_journal).
 1008limit_fd_pool.
 1009
 1010close_oldest_journal :-
 1011    source_journal_fd(DB, _Fd),
 1012    !,
 1013    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
 1014    close_journal(DB).
 1015close_oldest_journal.
 1016
 1017
 1018%!  sync_journal(+DB, +Fd)
 1019%
 1020%   Sync journal represented by database and   stream.  If the DB is
 1021%   involved in a transaction there is   no point flushing until the
 1022%   end of the transaction.
 1023
 1024sync_journal(DB, _) :-
 1025    transaction_db(_, DB, _),
 1026    !.
 1027sync_journal(_, Fd) :-
 1028    flush_output(Fd).
 1029
 1030%!  close_journal(+DB) is det.
 1031%
 1032%   Close the journal associated with DB if it is open.
 1033
 1034close_journal(DB) :-
 1035    with_mutex(rdf_journal_file,
 1036               close_journal_(DB)).
 1037
 1038close_journal_(DB) :-
 1039    (   retract(source_journal_fd(DB, Fd))
 1040    ->  time_stamp(Now),
 1041        format(Fd, '~q.~n', [end([time(Now)])]),
 1042        close(Fd, [force(true)])
 1043    ;   true
 1044    ).
 1045
 1046%!  close_journals
 1047%
 1048%   Close all open journals.
 1049
 1050close_journals :-
 1051    forall(source_journal_fd(DB, _),
 1052           catch(close_journal(DB), E,
 1053                 print_message(error, E))).
 1054
 1055%!  create_db(+Graph)
 1056%
 1057%   Create a saved version of Graph in corresponding file, close and
 1058%   delete journals.
 1059
 1060create_db(Graph) :-
 1061    \+ rdf(_,_,_,Graph),
 1062    !,
 1063    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
 1064    delete_db(Graph).
 1065create_db(Graph) :-
 1066    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
 1067    close_journal(Graph),
 1068    db_abs_files(Graph, Snapshot, Journal),
 1069    atom_concat(Snapshot, '.new', NewSnapshot),
 1070    (   catch(( create_directory_levels(Snapshot),
 1071                rdf_save_db(NewSnapshot, Graph)
 1072              ), Error,
 1073              ( print_message(warning, Error),
 1074                fail
 1075              ))
 1076    ->  (   exists_file(Journal)
 1077        ->  delete_file(Journal)
 1078        ;   true
 1079        ),
 1080        rename_file(NewSnapshot, Snapshot),
 1081        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
 1082    ;   catch(delete_file(NewSnapshot), _, true)
 1083    ).
 1084
 1085
 1086%!  delete_db(+DB)
 1087%
 1088%   Remove snapshot and journal file for DB.
 1089
 1090delete_db(DB) :-
 1091    with_mutex(rdf_journal_file,
 1092               delete_db_(DB)).
 1093
 1094delete_db_(DB) :-
 1095    close_journal_(DB),
 1096    db_abs_files(DB, Snapshot, Journal),
 1097    !,
 1098    (   exists_file(Journal)
 1099    ->  delete_file(Journal)
 1100    ;   true
 1101    ),
 1102    (   exists_file(Snapshot)
 1103    ->  delete_file(Snapshot)
 1104    ;   true
 1105    ).
 1106delete_db_(_).
 1107
 1108                 /*******************************
 1109                 *             LOCKING          *
 1110                 *******************************/
 1111
 1112%!  lock_db(+Dir)
 1113%
 1114%   Lock the database directory Dir.
 1115
 1116lock_db(Dir) :-
 1117    lockfile(Dir, File),
 1118    catch(open(File, update, Out, [lock(write), wait(false)]),
 1119          error(permission_error(Access, _, _), _),
 1120          locked_error(Access, Dir)),
 1121    (   current_prolog_flag(pid, PID)
 1122    ->  true
 1123    ;   PID = 0                     % TBD: Fix in Prolog
 1124    ),
 1125    time_stamp(Now),
 1126    gethostname(Host),
 1127    format(Out, '/* RDF Database is in use */~n~n', []),
 1128    format(Out, '~q.~n', [ locked([ time(Now),
 1129                                    pid(PID),
 1130                                    host(Host)
 1131                                  ])
 1132                         ]),
 1133    flush_output(Out),
 1134    set_end_of_stream(Out),
 1135    assert(rdf_lock(Dir, lock(Out, File))),
 1136    at_halt(unlock_db(Dir)).
 1137
 1138locked_error(lock, Dir) :-
 1139    lockfile(Dir, File),
 1140    (   catch(read_file_to_terms(File, Terms, []), _, fail),
 1141        Terms = [locked(Args)]
 1142    ->  Context = rdf_locked(Args)
 1143    ;   Context = context(_, 'Database is in use')
 1144    ),
 1145    throw(error(permission_error(lock, rdf_db, Dir), Context)).
 1146locked_error(open, Dir) :-
 1147    throw(error(permission_error(lock, rdf_db, Dir),
 1148                context(_, 'Lock file cannot be opened'))).
 1149
 1150%!  unlock_db(+Dir) is det.
 1151%!  unlock_db(+Stream, +File) is det.
 1152
 1153unlock_db(Dir) :-
 1154    retract(rdf_lock(Dir, lock(Out, File))),
 1155    !,
 1156    unlock_db(Out, File).
 1157unlock_db(_).
 1158
 1159unlock_db(Out, File) :-
 1160    close(Out),
 1161    delete_file(File).
 1162
 1163                 /*******************************
 1164                 *           FILENAMES          *
 1165                 *******************************/
 1166
 1167lockfile(Dir, LockFile) :-
 1168    atomic_list_concat([Dir, /, lock], LockFile).
 1169
 1170directory_levels(Levels) :-
 1171    rdf_option(directory_levels(Levels)),
 1172    !.
 1173directory_levels(2).
 1174
 1175db_file(Base, File) :-
 1176    rdf_directory(Dir),
 1177    directory_levels(Levels),
 1178    db_file(Dir, Base, Levels, File).
 1179
 1180db_file(Dir, Base, Levels, File) :-
 1181    dir_levels(Base, Levels, Segments, [Base]),
 1182    atomic_list_concat([Dir|Segments], /, File).
 1183
 1184open_db(Base, Mode, Stream, Options) :-
 1185    db_file(Base, File),
 1186    create_directory_levels(File),
 1187    open(File, Mode, Stream, [encoding(utf8)|Options]).
 1188
 1189create_directory_levels(_File) :-
 1190    rdf_option(directory_levels(0)),
 1191    !.
 1192create_directory_levels(File) :-
 1193    file_directory_name(File, Dir),
 1194    make_directory_path(Dir).
 1195
 1196exists_db(Base) :-
 1197    db_file(Base, File),
 1198    exists_file(File).
 1199
 1200%!  dir_levels(+File, +Levels, ?Segments, ?Tail) is det.
 1201%
 1202%   Create a list of intermediate directory names for File.  Each
 1203%   directory consists of two hexadecimal digits.
 1204
 1205dir_levels(_, 0, Segments, Segments) :- !.
 1206dir_levels(File, Levels, Segments, Tail) :-
 1207    rdf_atom_md5(File, 1, Hash),
 1208    create_dir_levels(Levels, 0, Hash, Segments, Tail).
 1209
 1210create_dir_levels(0, _, _, Segments, Segments) :- !.
 1211create_dir_levels(N, S, Hash, [S1|Segments0], Tail) :-
 1212    sub_atom(Hash, S, 2, _, S1),
 1213    S2 is S+2,
 1214    N2 is N-1,
 1215    create_dir_levels(N2, S2, Hash, Segments0, Tail).
 1216
 1217
 1218%!  db_files(+DB, -Snapshot, -Journal).
 1219%!  db_files(-DB, +Snapshot, -Journal).
 1220%!  db_files(-DB, -Snapshot, +Journal).
 1221%
 1222%   True if named graph DB is represented  by the files Snapshot and
 1223%   Journal. The filenames are local   to the directory representing
 1224%   the store.
 1225
 1226db_files(DB, Snapshot, Journal) :-
 1227    nonvar(DB),
 1228    !,
 1229    rdf_db_to_file(DB, Base),
 1230    atom_concat(Base, '.trp', Snapshot),
 1231    atom_concat(Base, '.jrn', Journal).
 1232db_files(DB, Snapshot, Journal) :-
 1233    nonvar(Snapshot),
 1234    !,
 1235    atom_concat(Base, '.trp', Snapshot),
 1236    atom_concat(Base, '.jrn', Journal),
 1237    rdf_db_to_file(DB, Base).
 1238db_files(DB, Snapshot, Journal) :-
 1239    nonvar(Journal),
 1240    !,
 1241    atom_concat(Base, '.jrn', Journal),
 1242    atom_concat(Base, '.trp', Snapshot),
 1243    rdf_db_to_file(DB, Base).
 1244
 1245db_abs_files(DB, Snapshot, Journal) :-
 1246    db_files(DB, Snapshot0, Journal0),
 1247    db_file(Snapshot0, Snapshot),
 1248    db_file(Journal0, Journal).
 1249
 1250
 1251%!  rdf_journal_file(+Graph, -File) is semidet.
 1252%!  rdf_journal_file(-Graph, -File) is nondet.
 1253%
 1254%   True if File the name of the existing journal file for Graph.
 1255
 1256rdf_journal_file(Graph, Journal) :-
 1257    (   var(Graph)
 1258    ->  rdf_graph(Graph)
 1259    ;   true
 1260    ),
 1261    db_abs_files(Graph, _Snapshot, Journal),
 1262    exists_file(Journal).
 1263
 1264
 1265%!  rdf_snapshot_file(+Graph, -File) is semidet.
 1266%!  rdf_snapshot_file(-Graph, -File) is nondet.
 1267%
 1268%   True if File the name of the existing snapshot file for Graph.
 1269
 1270rdf_snapshot_file(Graph, Snapshot) :-
 1271    (   var(Graph)
 1272    ->  rdf_graph(Graph)    % also pick the empty graphs
 1273    ;   true
 1274    ),
 1275    db_abs_files(Graph, Snapshot, _Journal),
 1276    exists_file(Snapshot).
 1277
 1278
 1279%!  rdf_db_to_file(+DB, -File) is det.
 1280%!  rdf_db_to_file(-DB, +File) is det.
 1281%
 1282%   Translate between database encoding (often an   file or URL) and
 1283%   the name we store in the  directory.   We  keep  a cache for two
 1284%   reasons. Speed, but much more important   is that the mapping of
 1285%   raw --> encoded provided by  www_form_encode/2 is not guaranteed
 1286%   to be unique by the W3C standards.
 1287
 1288rdf_db_to_file(DB, File) :-
 1289    file_base_db(File, DB),
 1290    !.
 1291rdf_db_to_file(DB, File) :-
 1292    url_to_filename(DB, File),
 1293    assert(file_base_db(File, DB)).
 1294
 1295%!  url_to_filename(+URL, -FileName) is det.
 1296%!  url_to_filename(-URL, +FileName) is det.
 1297%
 1298%   Turn  a  valid  URL  into  a  filename.  Earlier  versions  used
 1299%   www_form_encode/2, but this can produce  characters that are not
 1300%   valid  in  filenames.  We  will  use    the   same  encoding  as
 1301%   www_form_encode/2,  but  using  our  own    rules   for  allowed
 1302%   characters. The only requirement is that   we avoid any filename
 1303%   special character in use.  The   current  encoding  use US-ASCII
 1304%   alnum characters, _ and %
 1305
 1306url_to_filename(URL, FileName) :-
 1307    atomic(URL),
 1308    !,
 1309    atom_codes(URL, Codes),
 1310    phrase(url_encode(EncCodes), Codes),
 1311    atom_codes(FileName, EncCodes).
 1312url_to_filename(URL, FileName) :-
 1313    uri_encoded(path, URL, FileName).
 1314
 1315url_encode([0'+|T]) -->
 1316    " ",
 1317    !,
 1318    url_encode(T).
 1319url_encode([C|T]) -->
 1320    alphanum(C),
 1321    !,
 1322    url_encode(T).
 1323url_encode([C|T]) -->
 1324    no_enc_extra(C),
 1325    !,
 1326    url_encode(T).
 1327url_encode(Enc) -->
 1328    (   "\r\n"
 1329    ;   "\n"
 1330    ),
 1331    !,
 1332    { string_codes("%0D%0A", Codes),
 1333      append(Codes, T, Enc)
 1334    },
 1335    url_encode(T).
 1336url_encode([]) -->
 1337    eos,
 1338    !.
 1339url_encode([0'%,D1,D2|T]) -->
 1340    [C],
 1341    { Dv1 is (C>>4 /\ 0xf),
 1342      Dv2 is (C /\ 0xf),
 1343      code_type(D1, xdigit(Dv1)),
 1344      code_type(D2, xdigit(Dv2))
 1345    },
 1346    url_encode(T).
 1347
 1348eos([], []).
 1349
 1350alphanum(C) -->
 1351    [C],
 1352    { C < 128,                      % US-ASCII
 1353      code_type(C, alnum)
 1354    }.
 1355
 1356no_enc_extra(0'_) --> "_".
 1357
 1358
 1359                 /*******************************
 1360                 *             REINDEX          *
 1361                 *******************************/
 1362
 1363%!  reindex_db(+Dir, +Levels)
 1364%
 1365%   Reindex the database by creating intermediate directories.
 1366
 1367reindex_db(Dir, Levels) :-
 1368    directory_files(Dir, Files),
 1369    reindex_files(Files, Dir, '.', 0, Levels),
 1370    remove_empty_directories(Files, Dir).
 1371
 1372reindex_files([], _, _, _, _).
 1373reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
 1374    nofollow(Nofollow),
 1375    !,
 1376    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1377reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
 1378    CLevel \== Levels,
 1379    file_name_extension(_Base, Ext, File),
 1380    db_extension(Ext),
 1381    !,
 1382    directory_file_path(Prefix, File, DBFile),
 1383    directory_file_path(Dir, DBFile, OldPath),
 1384    db_file(Dir, File, Levels, NewPath),
 1385    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
 1386    file_directory_name(NewPath, NewDir),
 1387    make_directory_path(NewDir),
 1388    rename_file(OldPath, NewPath),
 1389    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1390reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
 1391    directory_file_path(Prefix, D, SubD),
 1392    directory_file_path(Dir, SubD, AbsD),
 1393    exists_directory(AbsD),
 1394    \+ read_link(AbsD, _, _),      % Do not follow links
 1395    !,
 1396    directory_files(AbsD, SubFiles),
 1397    CLevel2 is CLevel + 1,
 1398    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
 1399    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1400reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
 1401    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1402
 1403
 1404remove_empty_directories([], _).
 1405remove_empty_directories([File|Files], Dir) :-
 1406    \+ nofollow(File),
 1407    directory_file_path(Dir, File, Path),
 1408    exists_directory(Path),
 1409    \+ read_link(Path, _, _),
 1410    !,
 1411    directory_files(Path, Content),
 1412    exclude(nofollow, Content, RealContent),
 1413    (   RealContent == []
 1414    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
 1415        delete_directory(Path)
 1416    ;   remove_empty_directories(RealContent, Path)
 1417    ),
 1418    remove_empty_directories(Files, Dir).
 1419remove_empty_directories([_|Files], Dir) :-
 1420    remove_empty_directories(Files, Dir).
 1421
 1422
 1423                 /*******************************
 1424                 *            PREFIXES          *
 1425                 *******************************/
 1426
 1427save_prefixes(Dir) :-
 1428    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
 1429    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
 1430                       write_prefixes(Out),
 1431                       close(Out)).
 1432
 1433write_prefixes(Out) :-
 1434    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
 1435    forall(rdf_current_ns(Alias, URI),
 1436           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
 1437
 1438%!  load_prefixes(+RDFDBDir) is det.
 1439%
 1440%   If the file RDFDBDir/prefixes.db exists,  load the prefixes. The
 1441%   prefixes are registered using rdf_register_ns/3. Possible errors
 1442%   because the prefix  definitions  have   changed  are  printed as
 1443%   warnings, retaining the  old  definition.   Note  that  changing
 1444%   prefixes generally requires reloading all RDF from the source.
 1445
 1446load_prefixes(Dir) :-
 1447    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
 1448    (   exists_file(PrefixFile)
 1449    ->  setup_call_cleanup(open(PrefixFile, read, In, [encoding(utf8)]),
 1450                           read_prefixes(In),
 1451                           close(In))
 1452    ;   true
 1453    ).
 1454
 1455read_prefixes(Stream) :-
 1456    read_term(Stream, T0, []),
 1457    read_prefixes(T0, Stream).
 1458
 1459read_prefixes(end_of_file, _) :- !.
 1460read_prefixes(prefix(Alias, URI), Stream) :-
 1461    !,
 1462    must_be(atom, Alias),
 1463    must_be(atom, URI),
 1464    catch(rdf_register_ns(Alias, URI, []), E,
 1465          print_message(warning, E)),
 1466    read_term(Stream, T, []),
 1467    read_prefixes(T, Stream).
 1468read_prefixes(Term, _) :-
 1469    domain_error(prefix_term, Term).
 1470
 1471
 1472                 /*******************************
 1473                 *              UTIL            *
 1474                 *******************************/
 1475
 1476%!  mkdir(+Directory)
 1477%
 1478%   Create a directory if it does not already exist.
 1479
 1480mkdir(Directory) :-
 1481    exists_directory(Directory),
 1482    !.
 1483mkdir(Directory) :-
 1484    make_directory(Directory).
 1485
 1486%!  time_stamp(-Integer)
 1487%
 1488%   Return time-stamp rounded to integer.
 1489
 1490time_stamp(Int) :-
 1491    get_time(Now),
 1492    Int is round(Now).
 1493
 1494
 1495                 /*******************************
 1496                 *            MESSAGES          *
 1497                 *******************************/
 1498
 1499:- multifile
 1500    prolog:message/3,
 1501    prolog:message_context/3. 1502
 1503prolog:message(rdf(Term)) -->
 1504    message(Term).
 1505
 1506message(restoring(Type, Count, Jobs)) -->
 1507    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
 1508message(restore(attached(Graphs, Triples, Time/Wall))) -->
 1509    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
 1510    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
 1511      [Graphs, Triples, Wall, Percent, Time] ].
 1512% attach_graph/2
 1513message(restore(true, Action)) -->
 1514    !,
 1515    silent_message(Action).
 1516message(restore(brief, Action)) -->
 1517    !,
 1518    brief_message(Action).
 1519message(restore(_, Graph)) -->
 1520    [ 'Restoring ~p ... '-[Graph], flush ].
 1521message(restore(_, snapshot(_))) -->
 1522    [ at_same_line, '(snapshot) '-[], flush ].
 1523message(restore(_, journal(_))) -->
 1524    [ at_same_line, '(journal) '-[], flush ].
 1525message(restore(_, done(_, Time, Count))) -->
 1526    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 1527% load_source/4
 1528message(restore(_, snapshot(G, _))) -->
 1529    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
 1530message(restore(_, journal(G, _))) -->
 1531    [ 'Restoring ~p\t(journal)'-[G], flush ].
 1532message(restore(_, done(_, Time, Count))) -->
 1533    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 1534% journal handling
 1535message(update_failed(S,P,O,Action)) -->
 1536    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
 1537% directory reindexing
 1538message(reindex(Count, Depth)) -->
 1539    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
 1540message(reindex(Depth)) -->
 1541    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
 1542message(read_only) -->
 1543    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
 1544      'All changes to the RDF store will be lost if this process terminates.'
 1545    ].
 1546
 1547silent_message(_Action) --> [].
 1548
 1549brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
 1550    { file_base_name(Graph, Base) },
 1551    [ at_same_line,
 1552      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
 1553      flush
 1554    ].
 1555brief_message(_) --> [].
 1556
 1557
 1558prolog:message_context(rdf_locked(Args)) -->
 1559    { memberchk(time(Time), Args),
 1560      memberchk(pid(Pid), Args),
 1561      format_time(string(S), '%+', Time)
 1562    },
 1563    [ nl,
 1564      'locked at ~s by process id ~w'-[S,Pid]
 1565    ]