/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2006-2015, University of Amsterdam
                              VU University Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(rdf_persistency,
          [ rdf_attach_db/2,            % +Directory, +Options
            rdf_detach_db/0,            % +Detach current Graph
            rdf_current_db/1,           % -Directory
            rdf_persistency/2,          % +Graph, +Bool
            rdf_flush_journals/1,       % +Options
            rdf_persistency_property/1, % ?Property
            rdf_journal_file/2,         % ?Graph, ?JournalFile
            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
            rdf_db_to_file/2            % ?Graph, ?FileBase
          ]).
:- use_module(library(semweb/rdf_db)).
:- use_module(library(filesex)).
:- use_module(library(lists)).
:- use_module(library(uri)).
:- use_module(library(debug)).
:- use_module(library(option)).
:- use_module(library(error)).
:- use_module(library(thread)).
:- use_module(library(apply)).

RDF persistency plugin

This module provides persistency for rdf_db.pl based on the rdf_monitor/2 predicate to track changes to the repository. Where previous versions used autosave of the whole database using the quick-load format of rdf_db, this version is based on a quick-load file per source (4th argument of rdf/4), and journalling for edit operations.

The result is safe, avoids frequent small changes to large files (which make synchronisation and backup expensive), and avoids long disruption of the server while it performs the autosave. Only loading large files disrupts service for some time.

The persistent backup of the database is realised in a directory, using a lock file to avoid corruption due to concurrent access. Each source is represented by two files, the latest snapshot and a journal. The state is restored by loading the snapshot and replaying the journal. The predicate rdf_flush_journals/1 can be used to create fresh snapshots and delete the journals.

See also
- rdf_edit.pl
To be done
- If there is a complete `.new' snapshot and no journal, we should move the .new to the plain snapshot name as a means of recovery.
- Backup of each graph using one or two files is very costly if there are many graphs. Although the currently used subdirectories avoid hitting OS limits early, this is still not ideal. Probably we should collect (small, older?) files and combine them into a single quick load file. We could call this (similar to GIT) a `pack'.
:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

:- meta_predicate
    no_agc(0).

:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).
 rdf_attach_db(+Directory, +Options) is det
Start persistent operations using Directory as place to store files. There are several cases:

Options:

access(+AccessMode)
One of auto (default), read_write or read_only. Read-only access implies that the RDF store is not locked. It is read at startup and all modifications to the data are temporary. The default auto mode is read_write if the directory is writeable and the lock can be acquired. Otherwise it reverts to read_only.
concurrency(+Jobs)
Number of threads to use for loading the initial database. If not provided, it is the number of CPUs as obtained from the flag cpu_count.
max_open_journals(+Count)
Maximum number of journals kept open. If not provided, the default is 10. See limit_fd_pool/0.
directory_levels(+Count)
Number of levels of intermediate directories for storing the graph files. Default is 2.
silent(+BoolOrBrief)
If true (default false), do not print informational messages. If brief, show only minimal feedback.
log_nested_transactions(+Boolean)
If true, nested log transactions are added to the journal information. By default (false), no log-term is added for nested transactions.
Errors
- existence_error(source_sink, Directory)
- permission_error(write, directory, Directory)
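The options above can be combined in a single call. The following is a minimal usage sketch, assuming the module is installed as library(semweb/rdf_persistency); the helper attach_demo/0 and the directory name rdf-store are hypothetical and only serve the illustration.

```prolog
:- use_module(library(semweb/rdf_db)).
:- use_module(library(semweb/rdf_persistency)).

% attach_demo/0 is a hypothetical helper; 'rdf-store' is an example directory.
attach_demo :-
    rdf_attach_db('rdf-store',
                  [ access(read_write),     % errors propagate, no read-only fallback
                    max_open_journals(20),  % keep up to 20 journals open
                    silent(brief)           % minimal feedback while loading
                  ]),
    rdf_current_db(Dir),
    format("Attached RDF store in ~w~n", [Dir]).
```

Passing access(read_write) explicitly selects the clause that calls the read-write attach directly, so a locking or permission problem surfaces as an exception rather than as a silent switch to read-only mode.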
rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   print_message(warning,          % error/2 term so the message is translated
                      error(permission_error(write, directory, Directory), _)),
        print_message(warning, rdf(read_only)),
        rdf_attach_db_ro(Directory, Options)
    ).
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).


rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   rdf_directory(Directory)
    ->  true                        % update settings?
    ;   rdf_detach_db,
        mkdir(Directory),
        lock_db(Directory),
        assert(rdf_directory(Directory)),
        assert_options(Options),
        stop_monitor,               % make sure not to register load
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ solutions(all)
                       ]),
    (   exists_directory(Directory)
    ->  access_file(Directory, write)
    ;   catch(make_directory(Directory), _, fail)
    ),
    !,
    rdf_attach_db(Directory, Options).
rdf_attach_db_rw(DirSpec, _) :-         % Generate an existence or
    absolute_file_name(DirSpec,     % permission error
                       Directory,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Directory).
 rdf_attach_db_ro(+Directory, +Options)
Open an RDF database in read-only mode.
rdf_attach_db_ro(Directory, Options) :-
    rdf_detach_db,
    assert(rdf_directory(Directory)),
    assert_options(Options),
    stop_monitor,           % make sure not to register load
    no_agc(load_db).


assert_options([]).
assert_options([H|T]) :-
    (   option_type(H, Check)
    ->  Check,
        assert(rdf_option(H))
    ;   true                        % ignore options we do not understand
    ),
    assert_options(T).

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(directory_levels(X),        must_be(positive_integer, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).
option_type(access(X),                  must_be(oneof([read_write,
                                                       read_only]), X)).
 rdf_persistency_property(?Property) is nondet
True if Property is a property of the current persistent database. Currently makes the options passed to rdf_attach_db/2 available. Notably, rdf_persistency_property(access(read_only)) is true if the database is mounted in read-only mode. Other properties:
directory(Dir)
Directory in which the database resides.
rdf_persistency_property(Property) :-
    var(Property),
    !,
    rdf_persistency_property_(Property).
rdf_persistency_property(Property) :-
    rdf_persistency_property_(Property),
    !.

rdf_persistency_property_(Property) :-
    rdf_option(Property).
rdf_persistency_property_(directory(Dir)) :-
    rdf_directory(Dir).
 no_agc(:Goal)
Run Goal with atom garbage collection disabled. Loading an RDF database creates large amounts of atoms we know are not garbage.
no_agc(Goal) :-
    current_prolog_flag(agc_margin, Old),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        Goal,
        set_prolog_flag(agc_margin, Old)).
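The pattern used by no_agc/1 (save a flag, change it, run a goal, restore the flag even on failure or exception) is not specific to agc_margin. As a sketch only, it could be generalised as below; with_flag/3 is a hypothetical name and is not part of this module.

```prolog
% with_flag(+Flag, +Value, :Goal): hypothetical generalisation of no_agc/1.
% Runs Goal with Flag temporarily set to Value and restores the old value
% afterwards, whether Goal succeeds, fails or raises an exception.
:- meta_predicate with_flag(+, +, 0).

with_flag(Flag, Value, Goal) :-
    current_prolog_flag(Flag, Old),
    setup_call_cleanup(
        set_prolog_flag(Flag, Value),
        Goal,
        set_prolog_flag(Flag, Old)).
```

Under this sketch, no_agc(Goal) would correspond to with_flag(agc_margin, 0, Goal).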
 rdf_detach_db is det
Detach from the current database. Succeeds silently if no database is attached. Normally called at the end of the program through at_halt/1.
rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Dir))
    ->  debug(halt, 'DB Directory: ~w', [Dir]),
        save_prefixes(Dir),
        retractall(rdf_option(_)),
        retractall(source_journal_fd(_,_)),
        unlock_db(Dir)
    ;   true
    ).
 rdf_current_db(?Dir)
True if Dir is the current RDF persistent database.
rdf_current_db(Directory) :-
    rdf_directory(Dir),
    !,
    Dir = Directory.
 rdf_flush_journals(+Options)
Flush dirty journals. Options:
min_size(+KB)
Only flush if journal is over KB in size.
graph(+Graph)
Only flush the journal of Graph
To be done
- Provide a default for min_size?
rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        size_file(File, Size),      % use the same path as the existence test
        Size / 1024 < KB
    ->  true
    ;   create_db(Graph)
    ).

                 /*******************************
                 *             LOAD             *
                 *******************************/
 load_db is det
Reload database from the directory specified by rdf_directory/1. First we find all named graphs using find_dbs/4 and then we load them.
load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals, Journals, Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    Triples is Triples1 - Triples0, % evaluate, so the message gets an integer
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).

load_sources(_, [], _, _) :- !.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Count),
    RunJobs is min(Count, Jobs),
    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
    make_goals(Sources, Silent, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).
 make_goals(+DBs, +Silent, +Index, +Total, -Goals)
make_goals([], _, _, _, []).
make_goals([DB|T0], Silent, I,  Total,
           [load_source(DB, Silent, I, Total)|T]) :-
    I2 is I + 1,
    make_goals(T0, Silent, I2, Total, T).

verbosity(Silent) :-
    rdf_option(silent(Silent)),
    !.
verbosity(Silent) :-
    current_prolog_flag(verbose, silent),
    !,
    Silent = true.
verbosity(brief).
 concurrency(-Jobs)
Number of jobs to run concurrently.
concurrency(Jobs) :-
    rdf_option(concurrency(Jobs)),
    !.
concurrency(Jobs) :-
    current_prolog_flag(cpu_count, Jobs),
    Jobs > 0,
    !.
concurrency(1).

cpu_stat_key(1, cputime) :- !.
cpu_stat_key(_, process_cputime).
 find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det
Scan the persistent database and return a list of snapshots and journals, both sorted by file-size. Each term is of the form
db(Size, Ext, DB, DBFile, Depth)
find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Files),
    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
    maplist(db_graph, Scanned, UnsortedGraphs),
    sort(UnsortedGraphs, Graphs),
    (   consider_reindex_db(Dir, Graphs, Scanned)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
        sort(Snapshots, SnapBySize),
        sort(Journals, JournalBySize)
    ).

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, Count),
    Count > 0,
    DepthNeeded is floor(log(Count)/log(256)),
    (   maplist(depth_db(DepthNow), Scanned)
    ->  (   DepthNeeded > DepthNow
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(DepthNow))),
            fail
        )
    ;   true
    ),
    reindex_db(Dir, DepthNeeded).
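The depth computation in consider_reindex_db/3 is a base-256 logarithm of the number of graphs, rounded down. The sketch below isolates that arithmetic in a hypothetical helper, depth_needed/2, so the effect of the graph count can be checked in isolation. That each directory level fans out over up to 256 entries is an assumption suggested by the constant, not something this listing states.

```prolog
% depth_needed(+Count, -Depth): hypothetical helper that repeats the
% arithmetic of consider_reindex_db/3 for a given number of graphs.
depth_needed(Count, Depth) :-
    Count > 0,
    Depth is floor(log(Count)/log(256)).
```

For example, 100 graphs need depth 0 (all files in the top directory), 1,000 graphs need depth 1, and 100,000 graphs need depth 2.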

db_is_snapshot(Term) :-
    arg(2, Term, trp).

db_graph(Term, DB) :-
    arg(3, Term, DB).

db_file_name(Term, File) :-
    arg(4, Term, File).

depth_db(Depth, DB) :-
    arg(5, DB, Depth).
 scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det
Produces a list of db(Size, Ext, DB, File, Depth) terms for all recognised RDF database files. File is the path of the file, including the database directory Dir.
scan_db_files([], _, _, _) -->
    [].
scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
    { nofollow(Nofollow) },
    !,
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([File|T], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, File),
      db_extension(Ext),
      !,
      rdf_db_to_file(DB, Base),
      directory_file_path(Prefix, File, DBFile),
      directory_file_path(Dir, DBFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, DB, AbsFile, Depth) ],
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([D|T], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, D, SubD),
      directory_file_path(Dir, SubD, AbsD),
      exists_directory(AbsD),
      \+ read_link(AbsD, _, _),    % Do not follow links
      !,
      directory_files(AbsD, SubFiles),
      SubDepth is Depth + 1
    },
    scan_db_files(SubFiles, Dir, SubD, SubDepth),
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([_|T], Dir, Prefix, Depth) -->
    scan_db_files(T, Dir, Prefix, Depth).

nofollow('.').
nofollow('..').

db_extension(trp).
db_extension(jrn).

:- public load_source/4.                % called through make_goals/5

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Count0),
    statistics(cputime, T0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    graph_triple_count(Graph, Count1),
    Count is Count1 - Count0,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count, Nth, Total)))).


graph_triple_count(Graph, Count) :-
    rdf_statistics(triples_by_graph(Graph, Count)),
    !.
graph_triple_count(_, 0).
 attach_graph(+Graph, +Options) is det
Load triples and reload journal from the indicated snapshot file.
attach_graph(Graph, Options) :-
    option(silent(Silent), Options, false),  % bind Silent for the messages below
    message_level(Silent, Level),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    (   exists_db(JournalFile)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(JournalFile, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).

message_level(true, silent) :- !.
message_level(_, informational).


                 /*******************************
                 *         LOAD JOURNAL         *
                 *******************************/
 load_journal(+File:atom, +DB:atom) is det
Process transactions from the RDF journal File, applying them to the given named graph.
load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, In, [encoding(utf8)]),
        ( read(In, T0),
          process_journal(T0, In, DB)
        ),
        close(In)).

process_journal(end_of_file, _, _) :- !.
process_journal(Term, In, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(In, T2),
    process_journal(T2, In, DB).

process_journal_term(assert(S,P,O), DB) :-
    rdf_assert(S,P,O,DB).
process_journal_term(assert(S,P,O,Line), DB) :-
    rdf_assert(S,P,O,DB:Line).
process_journal_term(retract(S,P,O), DB) :-
    rdf_retractall(S,P,O,DB).
process_journal_term(retract(S,P,O,Line), DB) :-
    rdf_retractall(S,P,O,DB:Line).
process_journal_term(update(S,P,O,Action), DB) :-
    (   rdf_update(S,P,O,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
    ).
process_journal_term(start(_), _).      % journal open/close
process_journal_term(end(_), _).
process_journal_term(begin(_), _).      % logged transaction (compatibility)
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
process_journal_term(end(_,_,_), _).
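Because a journal is a sequence of plain Prolog terms, it can be inspected with read/1 without touching the RDF store. The following sketch counts the assert and retract records in journal text; journal_summary/3 and its helpers are hypothetical names, and the sample records used to try it are illustrative rather than copied from a real journal.

```prolog
% journal_summary(+Text, -Asserts, -Retracts): hypothetical helper that
% counts assert and retract records in journal text held in a string.
journal_summary(Text, Asserts, Retracts) :-
    setup_call_cleanup(
        open_string(Text, In),
        ( read(In, T0),
          count_terms(T0, In, 0, Asserts, 0, Retracts)
        ),
        close(In)).

count_terms(end_of_file, _, A, A, R, R) :- !.
count_terms(Term, In, A0, A, R0, R) :-
    classify(Term, DA, DR),
    A1 is A0 + DA,
    R1 is R0 + DR,
    read(In, Next),
    count_terms(Next, In, A1, A, R1, R).

% Same record shapes as process_journal_term/2; everything that is not an
% assert or retract (start, begin, end, update) counts as zero here.
classify(assert(_,_,_),    1, 0) :- !.
classify(assert(_,_,_,_),  1, 0) :- !.
classify(retract(_,_,_),   0, 1) :- !.
classify(retract(_,_,_,_), 0, 1) :- !.
classify(_,                0, 0).
```

A call such as `journal_summary("begin(1,0,1.0,demo).\nassert(s,p,o).\nretract(s,p,o,12).\nend(1,0,[]).\n", A, R)` walks the four records in the same read loop that load_journal/2 uses.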


                 /*******************************
                 *         CREATE JOURNAL       *
                 *******************************/

:- dynamic
    blocked_db/2,                   % DB, Reason
    transaction_message/3,          % Nesting, Time, Message
    transaction_db/3.               % Nesting, DB, Id
 rdf_persistency(+DB, Bool)
Specify whether a database is persistent. Switching to false kills the persistent state. Switching to true creates it.
rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    fail.
rdf_persistency(DB, false) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, persistency)),
        delete_db(DB)
    ).
rdf_persistency(DB, true) :-
    (   retract(blocked_db(DB, persistency))
    ->  create_db(DB)
    ;   true
    ).
 rdf_db:property_of_graph(?Property, +Graph) is nondet
Extend rdf_graph_property/2 with new properties.
:- multifile
    rdf_db:property_of_graph/2.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).
 start_monitor is det
 stop_monitor is det
Start/stop monitoring the RDF database for changes and update the journal.
start_monitor :-
    rdf_monitor(monitor,
                [ -assert(load)
                ]).
stop_monitor :-
    rdf_monitor(monitor,
                [ -all
                ]).
 monitor(+Term) is semidet
Handle an rdf_monitor/2 callback to deal with persistency. Note that the monitor calls that come from rdf_db.pl that deal with database changes are serialized. They do come from different threads though.
monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).
                                        % TBD: Remove empty directories?

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)).
monitor_transaction(log(_), end(N)) :-
    check_nested(N),
    retract(transaction_message(N, _, _)),
    !,
    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
    end_transactions(DBs, N).
monitor_transaction(log(Msg, DB), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Msg, _DB), end(N)) :-
    monitor_transaction(log(Msg), end(N)).
 check_nested(+Level) is semidet
True if we must log this transaction. This is always the case for toplevel transactions. Nested transactions are only logged if log_nested_transactions(true) is defined.
check_nested(0) :- !.
check_nested(_) :-
    rdf_option(log_nested_transactions(true)).
 open_transaction(+DB, +Fd) is det
Add a begin(Id, Level, Time, Message) term if a transaction involves DB. Id is an incremental integer, where each database has its own counter. Level is the nesting level, Time a floating point timestamp and Message the message provided as argument to the log message.
open_transaction(DB, Fd) :-
    transaction_message(N, Time, Msg),
    !,
    (   transaction_db(N, DB, _)
    ->  true
    ;   next_transaction_id(DB, Id),
        assert(transaction_db(N, DB, Id)),
        RoundedTime is round(Time*100)/100,
        format(Fd, '~q.~n', [begin(Id, N, RoundedTime, Msg)])
    ).
open_transaction(_,_).
 next_transaction_id(+DB, -Id) is det
Id is the number to use for the next logged transaction on DB. Transactions in each named graph are numbered in sequence. Searching for the Id of the last transaction is performed by the 2nd clause, starting 1Kb from the end of the journal and doubling this offset after each failure.
:- dynamic
    current_transaction_id/2.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    exists_file(Journal),
    !,
    size_file(Journal, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).

iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
    Max is floor(log(Size)/log(2)),
    between(10, Max, Step),
    Offset is -(1<<Step),
    seek(In, Offset, eof, _),
    skip(In, 10),                   % records are line-based
    read(In, T0),
    last_transaction_id(T0, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-        % Scan the whole file
    seek(In, 0, bof, _),
    read(In, T0),
    last_transaction_id(T0, In, 0, Last).

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
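To make the doubling search concrete, the sketch below lists the seek offsets, relative to the end of the file, that the second clause of iterative_expand/3 tries on backtracking for a journal of a given size. tail_offsets/2 is a hypothetical helper written for this illustration; it repeats the arithmetic without opening a file.

```prolog
% tail_offsets(+Size, -Offsets): hypothetical helper listing, in order, the
% offsets from end-of-file that iterative_expand/3 would seek to for a
% journal of Size bytes.
tail_offsets(Size, Offsets) :-
    Size > 0,
    Max is floor(log(Size)/log(2)),
    findall(Offset,
            ( between(10, Max, Step),
              Offset is -(1<<Step)
            ),
            Offsets).
```

For a 5000-byte journal this yields the offsets -1024, -2048 and -4096. For a journal smaller than 1024 bytes the list is empty, which corresponds to the case where the second clause fails immediately and the third clause scans the whole file.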
 end_transactions(+DBs:list(atom:id), +Nesting) is det
End a transaction that affected the given list of databases. We write the list of other affected databases as an argument to the end-term to facilitate fast finding of the related transactions.

In each database, the transaction is ended with a term end(Id, Nesting, Others), where Id and Nesting are the transaction identifier and nesting (see open_transaction/2) and Others is a list of DB:Id, indicating other databases affected by the transaction.

  937end_transactions(DBs, N) :-
  938    end_transactions(DBs, DBs, N).
  939
  940end_transactions([], _, _).
  941end_transactions([DB:Id|T], DBs, N) :-
  942    journal_fd(DB, Fd),
  943    once(select(DB:Id, DBs, Others)),
  944    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
  945    sync_journal(DB, Fd),
  946    end_transactions(T, DBs, N).
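To make the shape of the end-term concrete, the sketch below computes the terms that would be written for each database instead of writing them to a journal. The predicate end_terms/3 is a hypothetical helper introduced for this illustration. It uses the same select/3 call as the code above to compute Others.

```prolog
:- use_module(library(lists)).

%!  end_terms(+DBs, +Nesting, -Terms) is det.
%
%   Hypothetical helper: Terms is a list of DB-EndTerm pairs, one for
%   each DB:Id in DBs. Others holds the remaining DB:Id pairs.

end_terms(DBs, Nesting, Terms) :-
    findall(DB-end(Id, Nesting, Others),
            ( member(DB:Id, DBs),
              once(select(DB:Id, DBs, Others))
            ),
            Terms).
```

For a transaction that touched two graphs, `end_terms([g1:3, g2:7], 0, T)` yields `T = [g1-end(3,0,[g2:7]), g2-end(7,0,[g1:3])]`, so each journal records which other journal holds the matching part of the transaction.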
 sync_loaded_graphs(+Graphs)
 Called after a binary triple file has been loaded that added triples to the given graphs.
  954sync_loaded_graphs(Graphs) :-
  955    maplist(create_db, Graphs).
  956
  957
  958                 /*******************************
  959                 *         JOURNAL FILES        *
  960                 *******************************/
 journal_fd(+DB, -Stream) is det
Get an open stream to a journal. If the journal is not open, old journals are closed to satisfy the max_open_journals option. Then the journal is opened in append mode. Journal files are always encoded as UTF-8 for portability as well as to ensure full coverage of Unicode.
  970journal_fd(DB, Fd) :-
  971    source_journal_fd(DB, Fd),
  972    !.
  973journal_fd(DB, Fd) :-
  974    with_mutex(rdf_journal_file,
  975               journal_fd_(DB, Out)),
  976    Fd = Out.
  977
  978journal_fd_(DB, Fd) :-
  979    source_journal_fd(DB, Fd),
  980    !.
  981journal_fd_(DB, Fd) :-
  982    limit_fd_pool,
  983    db_files(DB, _Snapshot, Journal),
  984    open_db(Journal, append, Fd,
  985            [ close_on_abort(false)
  986            ]),
  987    time_stamp(Now),
  988    format(Fd, '~q.~n', [start([time(Now)])]),
  989    assert(source_journal_fd(DB, Fd)).              % new one at the end
 limit_fd_pool is det
Limit the number of open journals to max_open_journals (10). Note that calls from rdf_monitor/2 are issued in different threads, but as they are part of write operations they are fully synchronised.
  998limit_fd_pool :-
  999    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
 1000    !,
 1001    (   rdf_option(max_open_journals(Max))
 1002    ->  true
 1003    ;   Max = 10
 1004    ),
 1005    Close is N - Max,
 1006    forall(between(1, Close, _),
 1007           close_oldest_journal).
 1008limit_fd_pool.
 1009
 1010close_oldest_journal :-
 1011    source_journal_fd(DB, _Fd),
 1012    !,
 1013    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
 1014    close_journal(DB).
 1015close_oldest_journal.
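The pool relies on clause order: new journals are asserted at the end, so the first clause is always the oldest. The sketch below shows that eviction order in isolation. It assumes a stand-in fact demo_open/1 in place of source_journal_fd/2, and it counts clauses with aggregate_all/3 rather than predicate_property/2. Both are choices made for this illustration only.

```prolog
:- use_module(library(aggregate)).

:- dynamic demo_open/1.         % hypothetical stand-in for source_journal_fd/2

%!  trim_pool(+Max) is det.
%
%   Retract the oldest demo_open/1 facts until at most Max remain.
%   Facts added with assertz/1 go to the end, so the first clause
%   is the oldest one.

trim_pool(Max) :-
    aggregate_all(count, demo_open(_), N),
    Close is N - Max,
    forall(between(1, Close, _),
           once(retract(demo_open(_)))).
```

After asserting `a`, `b`, `c` and `d` in that order with assertz/1, calling `trim_pool(2)` leaves `c` and `d`. If the pool is already at or below Max, between/3 has no solutions and nothing is closed.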
 sync_journal(+DB, +Fd)
Sync journal represented by database and stream. If the DB is involved in a transaction there is no point flushing until the end of the transaction.
 1024sync_journal(DB, _) :-
 1025    transaction_db(_, DB, _),
 1026    !.
 1027sync_journal(_, Fd) :-
 1028    flush_output(Fd).
 close_journal(+DB) is det
Close the journal associated with DB if it is open.
 1034close_journal(DB) :-
 1035    with_mutex(rdf_journal_file,
 1036               close_journal_(DB)).
 1037
 1038close_journal_(DB) :-
 1039    (   retract(source_journal_fd(DB, Fd))
 1040    ->  time_stamp(Now),
 1041        format(Fd, '~q.~n', [end([time(Now)])]),
 1042        close(Fd, [force(true)])
 1043    ;   true
 1044    ).
 close_journals
Close all open journals.
 1050close_journals :-
 1051    forall(source_journal_fd(DB, _),
 1052           catch(close_journal(DB), E,
 1053                 print_message(error, E))).
 create_db(+Graph)
 Create a saved version of Graph in the corresponding file, then close and delete its journal. If Graph contains no triples, its files are deleted instead.
 1060create_db(Graph) :-
 1061    \+ rdf(_,_,_,Graph),
 1062    !,
 1063    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
 1064    delete_db(Graph).
 1065create_db(Graph) :-
 1066    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
 1067    close_journal(Graph),
 1068    db_abs_files(Graph, Snapshot, Journal),
 1069    atom_concat(Snapshot, '.new', NewSnapshot),
 1070    (   catch(( create_directory_levels(Snapshot),
 1071                rdf_save_db(NewSnapshot, Graph)
 1072              ), Error,
 1073              ( print_message(warning, Error),
 1074                fail
 1075              ))
 1076    ->  (   exists_file(Journal)
 1077        ->  delete_file(Journal)
 1078        ;   true
 1079        ),
 1080        rename_file(NewSnapshot, Snapshot),
 1081        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
 1082    ;   catch(delete_file(NewSnapshot), _, true)
 1083    ).
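The second clause of create_db/1 writes the snapshot under a temporary `.new` name and renames it only after saving succeeded, so a failed save never replaces a good snapshot. A minimal sketch of that pattern, detached from the RDF store, might look as follows. The predicate save_via_temp/2 and its Writer argument are hypothetical, and the journal handling is left out.

```prolog
%!  save_via_temp(+File, :Writer) is det.
%
%   Hypothetical helper: call Writer with an output stream on
%   File.new and rename the result to File only if writing
%   succeeded. On failure or error the partial file is removed.

:- meta_predicate save_via_temp(+, 1).

save_via_temp(File, Writer) :-
    atom_concat(File, '.new', New),
    (   catch(setup_call_cleanup(
                  open(New, write, Out, [encoding(utf8)]),
                  call(Writer, Out),
                  close(Out)),
              Error,
              ( print_message(warning, Error),
                fail
              ))
    ->  rename_file(New, File)
    ;   catch(delete_file(New), _, true)
    ).
```

As in create_db/1, the predicate succeeds even when saving fails. The only visible effects of a failure are the warning and the absence of a new file.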
 delete_db(+DB)
Remove snapshot and journal file for DB.
 1090delete_db(DB) :-
 1091    with_mutex(rdf_journal_file,
 1092               delete_db_(DB)).
 1093
 1094delete_db_(DB) :-
 1095    close_journal_(DB),
 1096    db_abs_files(DB, Snapshot, Journal),
 1097    !,
 1098    (   exists_file(Journal)
 1099    ->  delete_file(Journal)
 1100    ;   true
 1101    ),
 1102    (   exists_file(Snapshot)
 1103    ->  delete_file(Snapshot)
 1104    ;   true
 1105    ).
 1106delete_db_(_).
 1107
 1108                 /*******************************
 1109                 *             LOCKING          *
 1110                 *******************************/
 lock_db(+Dir)
Lock the database directory Dir.
 1116lock_db(Dir) :-
 1117    lockfile(Dir, File),
 1118    catch(open(File, update, Out, [lock(write), wait(false)]),
 1119          error(permission_error(Access, _, _), _),
 1120          locked_error(Access, Dir)),
 1121    (   current_prolog_flag(pid, PID)
 1122    ->  true
 1123    ;   PID = 0                     % TBD: Fix in Prolog
 1124    ),
 1125    time_stamp(Now),
 1126    gethostname(Host),
 1127    format(Out, '/* RDF Database is in use */~n~n', []),
 1128    format(Out, '~q.~n', [ locked([ time(Now),
 1129                                    pid(PID),
 1130                                    host(Host)
 1131                                  ])
 1132                         ]),
 1133    flush_output(Out),
 1134    set_end_of_stream(Out),
 1135    assert(rdf_lock(Dir, lock(Out, File))),
 1136    at_halt(unlock_db(Dir)).
 1137
 1138locked_error(lock, Dir) :-
 1139    lockfile(Dir, File),
 1140    (   catch(read_file_to_terms(File, Terms, []), _, fail),
 1141        Terms = [locked(Args)]
 1142    ->  Context = rdf_locked(Args)
 1143    ;   Context = context(_, 'Database is in use')
 1144    ),
 1145    throw(error(permission_error(lock, rdf_db, Dir), Context)).
 1146locked_error(open, Dir) :-
 1147    throw(error(permission_error(lock, rdf_db, Dir),
 1148                context(_, 'Lock file cannot be opened'))).
 unlock_db(+Dir) is det
 unlock_db(+Stream, +File) is det
 1153unlock_db(Dir) :-
 1154    retract(rdf_lock(Dir, lock(Out, File))),
 1155    !,
 1156    unlock_db(Out, File).
 1157unlock_db(_).
 1158
 1159unlock_db(Out, File) :-
 1160    close(Out),
 1161    delete_file(File).
 1162
 1163                 /*******************************
 1164                 *           FILENAMES          *
 1165                 *******************************/
 1166
 1167lockfile(Dir, LockFile) :-
 1168    atomic_list_concat([Dir, /, lock], LockFile).
 1169
 1170directory_levels(Levels) :-
 1171    rdf_option(directory_levels(Levels)),
 1172    !.
 1173directory_levels(2).
 1174
 1175db_file(Base, File) :-
 1176    rdf_directory(Dir),
 1177    directory_levels(Levels),
 1178    db_file(Dir, Base, Levels, File).
 1179
 1180db_file(Dir, Base, Levels, File) :-
 1181    dir_levels(Base, Levels, Segments, [Base]),
 1182    atomic_list_concat([Dir|Segments], /, File).
 1183
 1184open_db(Base, Mode, Stream, Options) :-
 1185    db_file(Base, File),
 1186    create_directory_levels(File),
 1187    open(File, Mode, Stream, [encoding(utf8)|Options]).
 1188
 1189create_directory_levels(_File) :-
 1190    rdf_option(directory_levels(0)),
 1191    !.
 1192create_directory_levels(File) :-
 1193    file_directory_name(File, Dir),
 1194    make_directory_path(Dir).
 1195
 1196exists_db(Base) :-
 1197    db_file(Base, File),
 1198    exists_file(File).
 dir_levels(+File, +Levels, ?Segments, ?Tail) is det
Create a list of intermediate directory names for File. Each directory consists of two hexadecimal digits.
 1205dir_levels(_, 0, Segments, Segments) :- !.
 1206dir_levels(File, Levels, Segments, Tail) :-
 1207    rdf_atom_md5(File, 1, Hash),
 1208    create_dir_levels(Levels, 0, Hash, Segments, Tail).
 1209
 1210create_dir_levels(0, _, _, Segments, Segments) :- !.
 1211create_dir_levels(N, S, Hash, [S1|Segments0], Tail) :-
 1212    sub_atom(Hash, S, 2, _, S1),
 1213    S2 is S+2,
 1214    N2 is N-1,
 1215    create_dir_levels(N2, S2, Hash, Segments0, Tail).
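The effect of dir_levels/4 is easiest to see on a fixed hash. The sketch below takes the hash as a plain atom, assumed to be a hexadecimal digest, whereas the module obtains it from rdf_atom_md5/3. The names hash_segments/3 and hashed_path/5 are hypothetical.

```prolog
:- use_module(library(lists)).

%!  hash_segments(+Hash, +Levels, -Segments) is det.
%
%   Hypothetical helper: Segments are the first Levels pairs of
%   characters of Hash, one pair per directory level.

hash_segments(Hash, Levels, Segments) :-
    Last is Levels - 1,
    findall(Segment,
            ( between(0, Last, I),
              Start is 2*I,
              sub_atom(Hash, Start, 2, _, Segment)
            ),
            Segments).

%!  hashed_path(+Dir, +Hash, +Levels, +Base, -Path) is det.
%
%   Path is Dir, followed by the hash segments, followed by Base.

hashed_path(Dir, Hash, Levels, Base, Path) :-
    hash_segments(Hash, Levels, Segments),
    append([Dir|Segments], [Base], Parts),
    atomic_list_concat(Parts, /, Path).
```

With the made-up hash `a1b2c3d4`, the call `hashed_path(db, a1b2c3d4, 2, 'g.trp', P)` yields `P = 'db/a1/b2/g.trp'`. With 0 levels the file is placed directly in `db`.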
 db_files(+DB, -Snapshot, -Journal)
db_files(-DB, +Snapshot, -Journal)
db_files(-DB, -Snapshot, +Journal)
True if named graph DB is represented by the files Snapshot and Journal. The filenames are local to the directory representing the store.
 1226db_files(DB, Snapshot, Journal) :-
 1227    nonvar(DB),
 1228    !,
 1229    rdf_db_to_file(DB, Base),
 1230    atom_concat(Base, '.trp', Snapshot),
 1231    atom_concat(Base, '.jrn', Journal).
 1232db_files(DB, Snapshot, Journal) :-
 1233    nonvar(Snapshot),
 1234    !,
 1235    atom_concat(Base, '.trp', Snapshot),
 1236    atom_concat(Base, '.jrn', Journal),
 1237    rdf_db_to_file(DB, Base).
 1238db_files(DB, Snapshot, Journal) :-
 1239    nonvar(Journal),
 1240    !,
 1241    atom_concat(Base, '.jrn', Journal),
 1242    atom_concat(Base, '.trp', Snapshot),
 1243    rdf_db_to_file(DB, Base).
 1244
 1245db_abs_files(DB, Snapshot, Journal) :-
 1246    db_files(DB, Snapshot0, Journal0),
 1247    db_file(Snapshot0, Snapshot),
 1248    db_file(Journal0, Journal).
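The three clauses of db_files/3 differ only in the order of the atom_concat/3 calls: the call whose third argument is already known must come first, because atom_concat/3 raises an instantiation error when both the first and the third argument are unbound. The sketch below shows the snapshot-to-journal direction on plain base names. The predicate files_from_snapshot/3 is a hypothetical name, and the translation between base name and graph done by rdf_db_to_file/2 is left out.

```prolog
%!  files_from_snapshot(+Snapshot, -Base, -Journal) is semidet.
%
%   Hypothetical helper: strip the .trp extension from Snapshot to
%   obtain Base, then build the matching journal name. Fails if
%   Snapshot does not end in .trp.

files_from_snapshot(Snapshot, Base, Journal) :-
    atom_concat(Base, '.trp', Snapshot),    % Snapshot known: extracts Base
    atom_concat(Base, '.jrn', Journal).     % Base known: builds Journal
```

For example, `files_from_snapshot('g.trp', B, J)` yields `B = g` and `J = 'g.jrn'`.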
 rdf_journal_file(+Graph, -File) is semidet
rdf_journal_file(-Graph, -File) is nondet
 True if File is the name of the existing journal file for Graph.
 1256rdf_journal_file(Graph, Journal) :-
 1257    (   var(Graph)
 1258    ->  rdf_graph(Graph)
 1259    ;   true
 1260    ),
 1261    db_abs_files(Graph, _Snapshot, Journal),
 1262    exists_file(Journal).
 rdf_snapshot_file(+Graph, -File) is semidet
rdf_snapshot_file(-Graph, -File) is nondet
 True if File is the name of the existing snapshot file for Graph.
 1270rdf_snapshot_file(Graph, Snapshot) :-
 1271    (   var(Graph)
 1272    ->  rdf_graph(Graph)    % also pick the empty graphs
 1273    ;   true
 1274    ),
 1275    db_abs_files(Graph, Snapshot, _Journal),
 1276    exists_file(Snapshot).
 rdf_db_to_file(+DB, -File) is det
rdf_db_to_file(-DB, +File) is det
 Translate between database encoding (often a file or URL) and the name we store in the directory. We keep a cache for two reasons: speed and, much more importantly, the fact that the mapping of raw --> encoded provided by www_form_encode/2 is not guaranteed to be unique by the W3C standards.
 1288rdf_db_to_file(DB, File) :-
 1289    file_base_db(File, DB),
 1290    !.
 1291rdf_db_to_file(DB, File) :-
 1292    url_to_filename(DB, File),
 1293    assert(file_base_db(File, DB)).
 url_to_filename(+URL, -FileName) is det
url_to_filename(-URL, +FileName) is det
 Turn a valid URL into a filename. Earlier versions used www_form_encode/2, but this can produce characters that are not valid in filenames. We use the same encoding as www_form_encode/2, but with our own rules for allowed characters. The only requirement is that we avoid any filename special character in use. The current encoding uses US-ASCII alnum characters, _ and %.
 1306url_to_filename(URL, FileName) :-
 1307    atomic(URL),
 1308    !,
 1309    atom_codes(URL, Codes),
 1310    phrase(url_encode(EncCodes), Codes),
 1311    atom_codes(FileName, EncCodes).
 1312url_to_filename(URL, FileName) :-
 1313    uri_encoded(path, URL, FileName).
 1314
 1315url_encode([0'+|T]) -->
 1316    " ",
 1317    !,
 1318    url_encode(T).
 1319url_encode([C|T]) -->
 1320    alphanum(C),
 1321    !,
 1322    url_encode(T).
 1323url_encode([C|T]) -->
 1324    no_enc_extra(C),
 1325    !,
 1326    url_encode(T).
 1327url_encode(Enc) -->
 1328    (   "\r\n"
 1329    ;   "\n"
 1330    ),
 1331    !,
 1332    { string_codes("%0D%0A", Codes),
 1333      append(Codes, T, Enc)
 1334    },
 1335    url_encode(T).
 1336url_encode([]) -->
 1337    eos,
 1338    !.
 1339url_encode([0'%,D1,D2|T]) -->
 1340    [C],
 1341    { Dv1 is (C>>4 /\ 0xf),
 1342      Dv2 is (C /\ 0xf),
 1343      code_type(D1, xdigit(Dv1)),
 1344      code_type(D2, xdigit(Dv2))
 1345    },
 1346    url_encode(T).
 1347
 1348eos([], []).
 1349
 1350alphanum(C) -->
 1351    [C],
 1352    { C < 128,                      % US-ASCII
 1353      code_type(C, alnum)
 1354    }.
 1355
 1356no_enc_extra(0'_) --> "_".
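The encoding rules can be tried out with a compact re-statement that works on a list of codes instead of a DCG. This is a sketch under stated assumptions, not the module's code: demo_encode/2, encode_code/2 and hex_code/2 are hypothetical names, the special treatment of newlines is omitted, and the sketch always emits uppercase hexadecimal digits.

```prolog
:- use_module(library(lists)).
:- use_module(library(apply)).

%!  demo_encode(+Atom, -Encoded) is det.
%
%   Hypothetical helper: keep US-ASCII alphanumerics and `_`, map a
%   space to `+`, and write every other code as %XX using the low
%   8 bits of the code.

demo_encode(Atom, Encoded) :-
    atom_codes(Atom, Codes),
    maplist(encode_code, Codes, Chunks),
    append(Chunks, EncCodes),
    atom_codes(Encoded, EncCodes).

encode_code(C, [C]) :-
    C < 128,                                % US-ASCII only
    code_type(C, alnum),
    !.
encode_code(0'_, [0'_]) :- !.
encode_code(32, [0'+]) :- !.                % 32 is the space character
encode_code(C, [0'%, D1, D2]) :-
    Hi is (C >> 4) /\ 0xf,
    Lo is C /\ 0xf,
    hex_code(Hi, D1),
    hex_code(Lo, D2).

hex_code(Value, Digit) :-
    atom_codes('0123456789ABCDEF', Digits),
    nth0(Value, Digits, Digit).
```

For example, `demo_encode('http://a b_c', E)` yields `E = 'http%3A%2F%2Fa+b_c'`. The result contains no `/`, `:` or space, which is the property the file store depends on.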
 1357
 1358
 1359                 /*******************************
 1360                 *             REINDEX          *
 1361                 *******************************/
 reindex_db(+Dir, +Levels)
Reindex the database by creating intermediate directories.
 1367reindex_db(Dir, Levels) :-
 1368    directory_files(Dir, Files),
 1369    reindex_files(Files, Dir, '.', 0, Levels),
 1370    remove_empty_directories(Files, Dir).
 1371
 1372reindex_files([], _, _, _, _).
 1373reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
 1374    nofollow(Nofollow),
 1375    !,
 1376    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1377reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
 1378    CLevel \== Levels,
 1379    file_name_extension(_Base, Ext, File),
 1380    db_extension(Ext),
 1381    !,
 1382    directory_file_path(Prefix, File, DBFile),
 1383    directory_file_path(Dir, DBFile, OldPath),
 1384    db_file(Dir, File, Levels, NewPath),
 1385    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
 1386    file_directory_name(NewPath, NewDir),
 1387    make_directory_path(NewDir),
 1388    rename_file(OldPath, NewPath),
 1389    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1390reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
 1391    directory_file_path(Prefix, D, SubD),
 1392    directory_file_path(Dir, SubD, AbsD),
 1393    exists_directory(AbsD),
 1394    \+ read_link(AbsD, _, _),      % Do not follow links
 1395    !,
 1396    directory_files(AbsD, SubFiles),
 1397    CLevel2 is CLevel + 1,
 1398    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
 1399    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1400reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
 1401    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1402
 1403
 1404remove_empty_directories([], _).
 1405remove_empty_directories([File|Files], Dir) :-
 1406    \+ nofollow(File),
 1407    directory_file_path(Dir, File, Path),
 1408    exists_directory(Path),
 1409    \+ read_link(Path, _, _),
 1410    !,
 1411    directory_files(Path, Content),
 1412    exclude(nofollow, Content, RealContent),
 1413    (   RealContent == []
 1414    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
 1415        delete_directory(Path)
 1416    ;   remove_empty_directories(RealContent, Path)
 1417    ),
 1418    remove_empty_directories(Files, Dir).
 1419remove_empty_directories([_|Files], Dir) :-
 1420    remove_empty_directories(Files, Dir).
 1421
 1422
 1423                 /*******************************
 1424                 *            PREFIXES          *
 1425                 *******************************/
 1426
 1427save_prefixes(Dir) :-
 1428    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
 1429    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
 1430                       write_prefixes(Out),
 1431                       close(Out)).
 1432
 1433write_prefixes(Out) :-
 1434    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
 1435    forall(rdf_current_ns(Alias, URI),
 1436           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
 load_prefixes(+RDFDBDir) is det
If the file RDFDBDir/prefixes.db exists, load the prefixes. The prefixes are registered using rdf_register_ns/3. Possible errors because the prefix definitions have changed are printed as warnings, retaining the old definition. Note that changing prefixes generally requires reloading all RDF from the source.
 1446load_prefixes(Dir) :-
 1447    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
 1448    (   exists_file(PrefixFile)
 1449    ->  setup_call_cleanup(open(PrefixFile, read, In, [encoding(utf8)]),
 1450                           read_prefixes(In),
 1451                           close(In))
 1452    ;   true
 1453    ).
 1454
 1455read_prefixes(Stream) :-
 1456    read_term(Stream, T0, []),
 1457    read_prefixes(T0, Stream).
 1458
 1459read_prefixes(end_of_file, _) :- !.
 1460read_prefixes(prefix(Alias, URI), Stream) :-
 1461    !,
 1462    must_be(atom, Alias),
 1463    must_be(atom, URI),
 1464    catch(rdf_register_ns(Alias, URI, []), E,
 1465          print_message(warning, E)),
 1466    read_term(Stream, T, []),
 1467    read_prefixes(T, Stream).
 1468read_prefixes(Term, _) :-
 1469    domain_error(prefix_term, Term).
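The prefixes.db file is a plain sequence of `prefix(Alias, URI).` terms, so it can be written and read back with ordinary term I/O. The sketch below shows such a round trip on an arbitrary file. It reads with read_file_to_terms/3 from library(readutil) instead of the read_term/3 loop used above, and it does not register anything with the RDF library. The predicates demo_write_prefixes/2 and demo_read_prefixes/2 are hypothetical.

```prolog
:- use_module(library(lists)).
:- use_module(library(readutil)).

%!  demo_write_prefixes(+File, +Pairs) is det.
%
%   Hypothetical helper: write each Alias-URI pair in Pairs to File
%   as a prefix/2 term, quoted so that it can be read back.

demo_write_prefixes(File, Pairs) :-
    setup_call_cleanup(
        open(File, write, Out, [encoding(utf8)]),
        forall(member(Alias-URI, Pairs),
               format(Out, 'prefix(~q, ~q).~n', [Alias, URI])),
        close(Out)).

%!  demo_read_prefixes(+File, -Pairs) is det.
%
%   Hypothetical helper: read all terms from File and collect the
%   prefix/2 terms as Alias-URI pairs.

demo_read_prefixes(File, Pairs) :-
    read_file_to_terms(File, Terms, [encoding(utf8)]),
    findall(Alias-URI, member(prefix(Alias, URI), Terms), Pairs).
```

Writing `[ex-'http://example.org/']` and reading the file back returns the same list. The ~q directive matters here: without quoting, a URI such as `http://example.org/` would not be written as a single atom that the reader can parse.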
 1470
 1471
 1472                 /*******************************
 1473                 *              UTIL            *
 1474                 *******************************/
 mkdir(+Directory)
Create a directory if it does not already exist.
 1480mkdir(Directory) :-
 1481    exists_directory(Directory),
 1482    !.
 1483mkdir(Directory) :-
 1484    make_directory(Directory).
 time_stamp(-Integer)
Return time-stamp rounded to integer.
 1490time_stamp(Int) :-
 1491    get_time(Now),
 1492    Int is round(Now).
 1493
 1494
 1495                 /*******************************
 1496                 *            MESSAGES          *
 1497                 *******************************/
 1498
 1499:- multifile
 1500    prolog:message/3,
 1501    prolog:message_context/3.
 1502
 1503prolog:message(rdf(Term)) -->
 1504    message(Term).
 1505
 1506message(restoring(Type, Count, Jobs)) -->
 1507    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
 1508message(restore(attached(Graphs, Triples, Time/Wall))) -->
 1509    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
 1510    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
 1511      [Graphs, Triples, Wall, Percent, Time] ].
 1512% attach_graph/2
 1513message(restore(true, Action)) -->
 1514    !,
 1515    silent_message(Action).
 1516message(restore(brief, Action)) -->
 1517    !,
 1518    brief_message(Action).
 1519message(restore(_, Graph)) -->
 1520    [ 'Restoring ~p ... '-[Graph], flush ].
 1521message(restore(_, snapshot(_))) -->
 1522    [ at_same_line, '(snapshot) '-[], flush ].
 1523message(restore(_, journal(_))) -->
 1524    [ at_same_line, '(journal) '-[], flush ].
 1525message(restore(_, done(_, Time, Count))) -->
 1526    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 1527% load_source/4
 1528message(restore(_, snapshot(G, _))) -->
 1529    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
 1530message(restore(_, journal(G, _))) -->
 1531    [ 'Restoring ~p\t(journal)'-[G], flush ].
 1532message(restore(_, done(_, Time, Count))) -->
 1533    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 1534% journal handling
 1535message(update_failed(S,P,O,Action)) -->
 1536    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
 1537% directory reindexing
 1538message(reindex(Count, Depth)) -->
 1539    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
 1540message(reindex(Depth)) -->
 1541    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
 1542message(read_only) -->
 1543    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
 1544      'All changes to the RDF store will be lost if this process terminates.'
 1545    ].
 1546
 1547silent_message(_Action) --> [].
 1548
 1549brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
 1550    { file_base_name(Graph, Base) },
 1551    [ at_same_line,
 1552      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
 1553      flush
 1554    ].
 1555brief_message(_) --> [].
 1556
 1557
 1558prolog:message_context(rdf_locked(Args)) -->
 1559    { memberchk(time(Time), Args),
 1560      memberchk(pid(Pid), Args),
 1561      format_time(string(S), '%+', Time)
 1562    },
 1563    [ nl,
 1564      'locked at ~s by process id ~w'-[S,Pid]
 1565    ]