1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2006-2015, University of Amsterdam 7 VU University Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(rdf_persistency, 37 [ rdf_attach_db/2, % +Directory, +Options 38 rdf_detach_db/0, % +Detach current Graph 39 rdf_current_db/1, % -Directory 40 rdf_persistency/2, % +Graph, +Bool 41 rdf_flush_journals/1, % +Options 42 rdf_persistency_property/1, % ?Property 43 rdf_journal_file/2, % ?Graph, ?JournalFile 44 rdf_snapshot_file/2, % ?Graph, ?SnapshotFile 45 rdf_db_to_file/2 % ?Graph, ?FileBase 46 ]). 47:- use_module(library(semweb/rdf_db)). 
48:- use_module(library(filesex)). 49:- use_module(library(lists)). 50:- use_module(library(uri)). 51:- use_module(library(debug)). 52:- use_module(library(option)). 53:- use_module(library(error)). 54:- use_module(library(thread)). 55:- use_module(library(apply)).
% Run-time state of the persistency layer.  Every fact is declared both
% volatile and dynamic.
:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

% no_agc/1 calls its argument as a goal without additional arguments,
% hence the `0` meta-argument specifier.
:- meta_predicate
    no_agc(0).

:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).
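%   Options:
%
%     * access(+AccessMode)
%       One of `auto` (default), `read_write` or `read_only`.
%       Read-only access implies that the RDF store is not locked.
%       It is read at startup and all modifications to the data are
%       temporary.  The default `auto` mode is `read_write` if the
%       directory is writeable and the lock can be acquired.
%       Otherwise it reverts to `read_only`.
%     * concurrency(+Jobs)
%       Number of concurrent jobs used for loading the database.
%       The default is the value of the Prolog flag `cpu_count`.
%     * max_open_journals(+Count)
%       Maximum number of journals that are kept open.  The default
%       is 10.
%     * directory_levels(+Levels)
%       Number of intermediate directory levels used for storing
%       the graph files.  The default is 2.
%     * silent(+BoolOrBrief)
%       If `true` (default `false`), do not print informational
%       messages.  Finally, if `brief` it will show minimal
%       feedback.
%     * log_nested_transactions(+Boolean)
%       If `true`, nested log transactions are added to the
%       journal information.  By default (`false`), no log-term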
%       is added for nested transactions.

% rdf_attach_db(+DirSpec, +Options)
%
% Attach the RDF store to the directory DirSpec.  The clauses handle,
% in order:
%
%   1. An explicit access(read_only) option: resolve the directory for
%      reading and attach without locking.
%   2. An explicit access(read_write) option: attach read/write.
%   3. No access option and the directory exists: attach read/write if
%      the directory is writable, reverting to read-only (with
%      warnings) if it is not writable or the lock cannot be acquired.
%   4. No access option and the directory cannot be resolved: try
%      read/write (which may create the directory) and revert to
%      read-only on any error.

rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   % Use the standard error(Formal, Context) shape so that the
        % message system can translate the warning.
        print_message(warning,
                      error(permission_error(write, directory, Directory),
                            _)),
        print_message(warning, rdf(read_only)),
        rdf_attach_db_ro(Directory, Options)
    ).
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).


% rdf_attach_db_rw(+DirSpec, +Options)
%
% Attach in read/write mode.  If DirSpec resolves to a writable
% directory that is not already attached, detach the current database,
% lock the directory, record the options and load the data with the
% monitor switched off.  If it does not resolve, try every candidate
% path: accept an existing writable directory or create it, and retry
% through rdf_attach_db/2.  The final clause raises an existence or
% permission error.

rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   rdf_directory(Directory)
    ->  true                                % update settings?
    ;   rdf_detach_db,
        mkdir(Directory),
        lock_db(Directory),
        assert(rdf_directory(Directory)),
        assert_options(Options),
        stop_monitor,                       % make sure not to register load
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ solutions(all)
                       ]),
    (   exists_directory(Directory)
    ->  access_file(Directory, write)
    ;   catch(make_directory(Directory), _, fail)
    ),
    !,
    rdf_attach_db(Directory, Options).
rdf_attach_db_rw(DirSpec, _) :-             % Generate an existence or
    absolute_file_name(DirSpec,             % permission error
                       Directory,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Directory).
% rdf_attach_db_ro(+Directory, +Options)
%
% Attach Directory without locking it: detach the current database,
% record the directory and the options and load the data with the
% monitor stopped.

rdf_attach_db_ro(Directory, Options) :-
    rdf_detach_db,
    assert(rdf_directory(Directory)),
    assert_options(Options),
    stop_monitor,           % make sure not to register load
    no_agc(load_db).


% assert_options(+Options)
%
% Store each known option as an rdf_option/1 fact after running the
% validation goal that option_type/2 associates with it.  The
% validation goal raises an error for an invalid value.  Unknown
% options are ignored.

assert_options([]).
assert_options([H|T]) :-
    (   option_type(H, Check)
    ->  call(Check),                % validate before storing
        assert(rdf_option(H))
    ;   true                        % ignore options we do not understand
    ),
    assert_options(T).

% option_type(?Option, -Check)
%
% Check is the goal that validates the argument of Option.

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(directory_levels(X),        must_be(positive_integer, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).
option_type(access(X),                  must_be(oneof([read_write,
                                                       read_only]), X)).
rdf_persistency_property(access(read_only))
is true if the database is mounted in read-only mode. Other
properties:
% rdf_persistency_property(?Property)
%
% Enumerate all properties on backtracking if Property is unbound.
% Otherwise succeed at most once.

rdf_persistency_property(Property) :-
    (   var(Property)
    ->  rdf_persistency_property_(Property)
    ;   rdf_persistency_property_(Property)
    ->  true
    ).

% The stored options, followed by the attached directory.
rdf_persistency_property_(Option) :-
    rdf_option(Option).
rdf_persistency_property_(directory(Directory)) :-
    rdf_directory(Directory).
% no_agc(:Goal)
%
% Run Goal with the Prolog flag `agc_margin` set to 0.  The previous
% value of the flag is restored when Goal completes, whether it
% succeeds, fails or raises an exception.

no_agc(Goal) :-
    current_prolog_flag(agc_margin, Old),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        Goal,
        set_prolog_flag(agc_margin, Old)).
% rdf_detach_db
%
% Stop the monitor and close all journals.  If a database directory
% is attached, forget it and release everything associated with it.

rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Attached))
    ->  detach_directory(Attached)
    ;   true
    ).

% Save the prefixes into Dir, drop the stored options and the journal
% stream registrations and unlock the directory.
detach_directory(Dir) :-
    debug(halt, 'DB Directory: ~w', [Dir]),
    save_prefixes(Dir),
    retractall(rdf_option(_)),
    retractall(source_journal_fd(_,_)),
    unlock_db(Dir).
% rdf_current_db(?Directory)
%
% True if Directory unifies with the first recorded database
% directory.  Succeeds at most once.

rdf_current_db(Directory) :-
    once(rdf_directory(Current)),
    Directory = Current.
% rdf_flush_journals(+Options)
%
% Flush the journal of every graph, or only of the graph named by the
% option graph(Graph) if that option is present.

rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

% rdf_flush_journal(+Graph, +Options)
%
% Do nothing if Graph has no journal file, or if the option
% min_size(KB) is given and the journal is smaller than KB kilobytes.
% Otherwise call create_db/1 for the graph.

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        size_file(File, Size),      % File is the resolved path; JournalFile
        Size / 1024 < KB            % is only the base name
    ->  true
    ;   create_db(Graph)
    ).

                 /*******************************
                 *             LOAD             *
                 *******************************/
% load_db
%
% Load the attached database.  Restores the saved prefixes, finds the
% database files, unloads the graphs they describe, loads all
% snapshots and then all journals, and finally reports the number of
% graphs and triples together with the CPU and wall time used.

load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals,  Journals,  Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    Triples is Triples1 - Triples0,     % must be a number: printed with ~D
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).

% load_sources(+Type, +Sources, +Silent, +Jobs)
%
% Load Sources concurrently, using at most Jobs workers and never more
% workers than there are sources.  Type is only used in the progress
% message.

load_sources(_, [], _, _) :- !.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Count),
    RunJobs is min(Count, Jobs),
    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
    make_goals(Sources, Silent, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).
% make_goals(+Sources, +Silent, +Index, +Total, -Goals)
%
% Goals is a list with one load_source/4 goal for each element of
% Sources.  The goals are numbered consecutively, starting at Index.

make_goals([], _, _, _, []).
make_goals([Source|Sources], Silent, Nth, Total,
           [load_source(Source, Silent, Nth, Total)|Goals]) :-
    Next is Nth + 1,
    make_goals(Sources, Silent, Next, Total, Goals).

% verbosity(-Silent)
%
% Silent is the value of the stored silent(_) option.  Without that
% option it is `true` if the Prolog flag `verbose` is `silent` and
% `brief` otherwise.

verbosity(Silent) :-
    (   rdf_option(silent(Silent))
    ->  true
    ;   current_prolog_flag(verbose, silent)
    ->  Silent = true
    ;   Silent = brief
    ).
% concurrency(-Jobs)
%
% Jobs is the stored concurrency(_) option.  Without that option it is
% the value of the Prolog flag `cpu_count` if that is positive, and 1
% otherwise.

concurrency(Jobs) :-
    (   rdf_option(concurrency(Jobs))
    ->  true
    ;   current_prolog_flag(cpu_count, Jobs),
        Jobs > 0
    ->  true
    ;   Jobs = 1
    ).

% cpu_stat_key(+Jobs, -StatKey)
%
% StatKey is the statistics/2 key used for measuring CPU time:
% `cputime` for a single job and `process_cputime` otherwise.

cpu_stat_key(1, cputime) :-
    !.
cpu_stat_key(_, process_cputime).
db(Size, Ext, DB, DBFile, Depth)
% find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize)
%
% Scan Dir for database files.  Graphs is the sorted list of graph
% names found.  SnapBySize and JournalBySize are the db/5 terms of
% the snapshot and journal files, in standard order of terms (so the
% leading size field is the primary sort key).  If
% consider_reindex_db/3 restructures the directory, the scan is
% repeated.

find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Entries),
    phrase(scan_db_files(Entries, Dir, '.', 0), DBTerms),
    maplist(db_graph, DBTerms, GraphNames),
    sort(GraphNames, Graphs),
    (   consider_reindex_db(Dir, Graphs, DBTerms)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, DBTerms, SnapTerms, JournalTerms),
        sort(SnapTerms, SnapBySize),
        sort(JournalTerms, JournalBySize)
    ).

% consider_reindex_db(+Dir, +Graphs, +Scanned)
%
% Succeeds after calling reindex_db/2 if the directory must be
% restructured.  That is the case when the files are not all at the
% same depth, or when the required depth exceeds the current one.
% Otherwise the current depth is recorded as the directory_levels
% option and the predicate fails.  It also fails if there are no
% graphs.

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, GraphCount),
    GraphCount > 0,
    Needed is floor(log(GraphCount)/log(256)),
    (   maplist(depth_db(Current), Scanned)
    ->  (   Needed > Current
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(Current))),
            fail
        )
    ;   true
    ),
    reindex_db(Dir, Needed).

% Accessors for the db(Size, Ext, DB, DBFile, Depth) terms.

db_is_snapshot(DBTerm) :-
    arg(2, DBTerm, trp).

db_graph(DBTerm, Graph) :-
    arg(3, DBTerm, Graph).

db_file_name(DBTerm, File) :-
    arg(4, DBTerm, File).

depth_db(Depth, DBTerm) :-
    arg(5, DBTerm, Depth).
db(DB, Size, File)
for all recognised RDF
%   database files. File is relative to the database directory Dir.

% scan_db_files(+Files, +Dir, +Prefix, +Depth)//
%
% Emit a term db(Size, Ext, DB, AbsFile, Depth) for each file with a
% database extension.  The entries `.` and `..` are skipped.
% Directories that are not symbolic links are scanned recursively
% with an incremented depth.  All other entries are ignored.

scan_db_files([], _, _, _) -->
    [].
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { nofollow(Entry) },
    !,
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, Entry),
      db_extension(Ext),
      !,
      rdf_db_to_file(DB, Base),
      directory_file_path(Prefix, Entry, RelFile),
      directory_file_path(Dir, RelFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, DB, AbsFile, Depth) ],
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, Entry, RelDir),
      directory_file_path(Dir, RelDir, AbsDir),
      exists_directory(AbsDir),
      \+ read_link(AbsDir, _, _),   % Do not follow links
      !,
      directory_files(AbsDir, SubEntries),
      SubDepth is Depth + 1
    },
    scan_db_files(SubEntries, Dir, RelDir, SubDepth),
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([_|Entries], Dir, Prefix, Depth) -->
    scan_db_files(Entries, Dir, Prefix, Depth).

% Directory entries that are never followed.
nofollow('.').
nofollow('..').

% Extensions of snapshot and journal files.
db_extension(trp).
db_extension(jrn).

:- public load_source/4.                % called through make_goals/5

% load_source(+DB, +Silent, +Nth, +Total)
%
% Load the snapshot or journal described by the db/5 term DB and
% report the CPU time used and the number of triples added to the
% graph.

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Before),
    statistics(cputime, CPU0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, CPU1),
    T is CPU1 - CPU0,
    graph_triple_count(Graph, After),
    Count is After - Before,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count, Nth, Total)))).


% graph_triple_count(+Graph, -Count)
%
% Count is the number of triples reported for Graph, or 0 if no
% statistics are available for it.

graph_triple_count(Graph, Count) :-
    rdf_statistics(triples_by_graph(Graph, Count)),
    !.
graph_triple_count(_, 0).
% attach_graph(+Graph, +Options)
%
% Reload Graph from its snapshot and journal files.  All triples of
% the graph are removed first.  The value of the silent(_) option
% (default `false`) selects both the message level and the style of
% the progress messages.

attach_graph(Graph, Options) :-
    option(silent(Silent), Options, false), % bind Silent for the messages
    message_level(Silent, Level),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    (   exists_db(JournalFile)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(JournalFile, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).

% message_level(+Silent, -Level)
%
% Level is the print_message/2 level: `silent` if Silent is `true`
% and `informational` otherwise.

message_level(true, silent) :- !.
message_level(_, informational).


                 /*******************************
                 *         LOAD JOURNAL         *
                 *******************************/
% load_journal(+File, +DB)
%
% Create the graph DB and replay every term of the journal File into
% it.  The file is read as UTF-8 and closed when done.

load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, Stream, [encoding(utf8)]),
        ( read(Stream, First),
          process_journal(First, Stream, DB)
        ),
        close(Stream)).

% process_journal(+Term, +Stream, +DB)
%
% Process Term and all following terms of Stream up to end of file.
% Raises a type error for a term that process_journal_term/2 does
% not accept.

process_journal(end_of_file, _, _) :-
    !.
process_journal(Term, Stream, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(Stream, Next),
    process_journal(Next, Stream, DB).

% process_journal_term(+Term, +DB)
%
% Apply a single journal term to DB.  Bookkeeping terms are accepted
% and ignored.

process_journal_term(assert(S,P,O), DB) :-
    rdf_assert(S,P,O,DB).
process_journal_term(assert(S,P,O,Line), DB) :-
    rdf_assert(S,P,O,DB:Line).
process_journal_term(retract(S,P,O), DB) :-
    rdf_retractall(S,P,O,DB).
process_journal_term(retract(S,P,O,Line), DB) :-
    rdf_retractall(S,P,O,DB:Line).
process_journal_term(update(S,P,O,Action), DB) :-
    (   rdf_update(S,P,O,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
    ).
process_journal_term(start(_), _).          % journal open/close
process_journal_term(end(_), _).
process_journal_term(begin(_), _).          % logged transaction (compatibility)
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _).    % logged transaction (current)
process_journal_term(end(_,_,_), _).


                 /*******************************
                 *        CREATE JOURNAL        *
                 *******************************/

:- dynamic
    blocked_db/2,                   % DB, Reason
    transaction_message/3,          % Nesting, Time, Message
    transaction_db/3.               % Nesting, DB, Id
false
kills the persistent state. Switching to true
%   creates it.

% rdf_persistency(+DB, +Bool)
%
% DB must be an atom and Bool a boolean.  Switching to `false` marks
% DB as blocked for persistency and deletes its files, unless it is
% already blocked.  Switching to `true` removes the block and calls
% create_db/1 if there was one.

rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    (   Bool == false
    ->  (   blocked_db(DB, persistency)
        ->  true
        ;   assert(blocked_db(DB, persistency)),
            delete_db(DB)
        )
    ;   (   retract(blocked_db(DB, persistency))
        ->  create_db(DB)
        ;   true
        )
    ).
:- multifile
    rdf_db:property_of_graph/2.

% rdf_db:property_of_graph(?Property, +Graph)
%
% Clause for the multifile predicate declared above.  State is
% `false` if Graph is blocked for persistency and `true` otherwise.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).
% start_monitor
% stop_monitor
%
% Register monitor/1 with rdf_monitor/2.  start_monitor uses the mask
% [-assert(load)] and stop_monitor uses the mask [-all].

start_monitor :-
    rdf_monitor(monitor, [-assert(load)]).

stop_monitor :-
    rdf_monitor(monitor, [-all]).
rdf_db.pl
that deal with
database changes are serialized. They do come from different
%   threads though.

% monitor(+Message)
%
% Handle a change message from the RDF store.  Triple changes on
% graphs that are not blocked are written to the journal of the
% graph.  An update that moves a triple to another graph is logged
% as an assert on the new graph and a retract on the old one.

monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    journal_write(DB, assert(S,P,O,Line)).
monitor(assert(S,P,O,DB)) :-
    journal_write(DB, assert(S,P,O)).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    journal_write(DB, retract(S,P,O,Line)).
monitor(retract(S,P,O,DB)) :-
    journal_write(DB, retract(S,P,O)).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   % Goes through open_transaction/2 like every other change.
        journal_write(DB, update(S,P,O,Action))
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_write(DB, update(S,P,O,Action))
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).
                                        % TBD: Remove empty directories?

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).

% journal_write(+DB, +Term)
%
% Write Term to the journal of DB, unless DB is blocked.  Calls
% open_transaction/2 before writing and sync_journal/2 afterwards.

journal_write(DB, Term) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [Term]),
    sync_journal(DB, Fd).

% monitor_transaction(+Id, +BeginOrEnd)
%
% Handle the start and end of a transaction.  Loading a journal,
% parsing and unloading block journalling for the graph while they
% run; parse and unload re-create or delete the database files at
% the end.  log(Msg) and log(Msg, DB) record a transaction message,
% subject to check_nested/1.

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)).
monitor_transaction(log(_), end(N)) :-
    check_nested(N),
    retract(transaction_message(N, _, _)),
    !,
    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
    end_transactions(DBs, N).
monitor_transaction(log(Msg, DB), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Msg, _DB), end(N)) :-
    monitor_transaction(log(Msg), end(N)).
log_nested_transactions(true)
%   is defined.

% check_nested(+Level)
%
% True for nesting level 0.  For any other level it is true only if
% the option log_nested_transactions(true) is stored.

check_nested(0) :-
    !.
check_nested(_Level) :-
    rdf_option(log_nested_transactions(true)).
%   begin(Id, Level, Time, Message)
%   term if a transaction
%   involves DB. Id is an incremental integer, where each database
%   has its own counter. Level is the nesting level, Time a floating
%   point timestamp and Message the message provided as argument to
%   the log message.

% open_transaction(+DB, +Fd)
%
% If a transaction message is recorded and DB is not yet registered
% for that nesting level, register it with a fresh transaction id
% and write the begin/4 term to Fd.  The time is rounded to two
% decimals.  Always succeeds when no transaction message exists.

open_transaction(DB, Fd) :-
    (   transaction_message(Nesting, Time, Msg)
    ->  (   transaction_db(Nesting, DB, _)
        ->  true
        ;   next_transaction_id(DB, Id),
            assert(transaction_db(Nesting, DB, Id)),
            Stamp is round(Time*100)/100,
            format(Fd, '~q.~n', [begin(Id, Nesting, Stamp, Msg)])
        )
    ;   true
    ).
:- dynamic
    current_transaction_id/2.

% next_transaction_id(+DB, -Id)
%
% Id is the next transaction id for DB.  It is the remembered id
% plus one if there is one.  Otherwise it is one more than the last
% id found in the journal if the journal exists, and 1 if it does
% not.  The result is remembered in current_transaction_id/2.
%
% NOTE(review): exists_file/1 and size_file/2 receive the journal
% name as returned by db_files/3, whereas open_db/4 resolves that
% name through db_file/2.  Confirm that the existence test is meant
% to use the unresolved name.

next_transaction_id(DB, Id) :-
    (   retract(current_transaction_id(DB, Previous))
    ->  Id is Previous + 1,
        assert(current_transaction_id(DB, Id))
    ;   db_files(DB, _, Journal),
        exists_file(Journal)
    ->  size_file(Journal, Size),
        open_db(Journal, read, In, []),
        call_cleanup(iterative_expand(In, Size, Last), close(In)),
        Id is Last + 1,
        assert(current_transaction_id(DB, Id))
    ;   Id = 1,
        assert(current_transaction_id(DB, 1))
    ).

% iterative_expand(+In, +Size, -Last)
%
% Last is the id of the last end/3 term in the journal stream In of
% Size bytes.  Scans sections of growing size from the end of the
% file and falls back to scanning the whole file.

iterative_expand(_, 0, 0) :-
    !.
iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
    MaxStep is floor(log(Size)/log(2)),
    between(10, MaxStep, Step),
    Offset is -(1<<Step),
    seek(In, Offset, eof, _),
    skip(In, 10),                       % records are line-based
    read(In, First),
    last_transaction_id(First, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-        % Scan the whole file
    seek(In, 0, bof, _),
    read(In, First),
    last_transaction_id(First, In, 0, Last).

% last_transaction_id(+Term, +In, +Id0, -Last)
%
% Read terms up to end of file, remembering the id of the most
% recent end/3 term.

last_transaction_id(end_of_file, _, Last, Last) :-
    !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    read(In, Next),
    last_transaction_id(Next, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, Next),
    last_transaction_id(Next, In, Id, Last).
In each database, the transaction is ended with a term end(Id,
Nesting, Others)
, where Id and Nesting are the transaction
identifier and nesting (see open_transaction/2) and Others is a
list of DB:Id, indicating other databases affected by the
transaction.
% end_transactions(+DBs, +Nesting)
%
% Write an end(Id, Nesting, Others) term to the journal of each
% DB:Id in DBs.  Others is DBs without the first occurrence of that
% element.

end_transactions(DBs, Nesting) :-
    end_transactions(DBs, DBs, Nesting).

end_transactions([], _, _).
end_transactions([DB:Id|Rest], All, Nesting) :-
    journal_fd(DB, Fd),
    once(select(DB:Id, All, Others)),
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, Nesting, Others]),
    sync_journal(DB, Fd),
    end_transactions(Rest, All, Nesting).
% sync_loaded_graphs(+Graphs)
%
% Call create_db/1 on each graph in Graphs.

sync_loaded_graphs(Graphs) :-
    maplist(create_db, Graphs).


                 /*******************************
                 *         JOURNAL FILES        *
                 *******************************/
max_open_journals
option.
Then the journal is opened in append
mode. Journal files are
always encoded as UTF-8 for portability as well as to ensure
%   full coverage of Unicode.

% journal_fd(+DB, -Fd)
%
% Fd is the journal stream for DB.  A registered stream is returned
% directly.  Otherwise the journal is opened while holding the mutex
% `rdf_journal_file`.

journal_fd(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   with_mutex(rdf_journal_file,
                   journal_fd_(DB, Stream)),
        Fd = Stream
    ).

% Checks again for a registered stream.  If there is none, limit the
% pool of open journals, open the journal in append mode, write a
% start/1 term with the current time and register the stream.
journal_fd_(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   limit_fd_pool,
        db_files(DB, _Snapshot, Journal),
        open_db(Journal, append, Fd,
                [ close_on_abort(false)
                ]),
        time_stamp(Now),
        format(Fd, '~q.~n', [start([time(Now)])]),
        assert(source_journal_fd(DB, Fd))   % new one at the end
    ).
% limit_fd_pool
%
% Close journals until no more than max_open_journals (default 10)
% are registered.  Journals are closed in registration order.

limit_fd_pool :-
    (   predicate_property(source_journal_fd(_, _), number_of_clauses(Open))
    ->  (   rdf_option(max_open_journals(Limit))
        ->  true
        ;   Limit = 10
        ),
        Excess is Open - Limit,
        forall(between(1, Excess, _),
               close_oldest_journal)
    ;   true
    ).

% Close the journal that was registered first, if any.
close_oldest_journal :-
    (   source_journal_fd(DB, _Fd)
    ->  debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
        close_journal(DB)
    ;   true
    ).
% sync_journal(+DB, +Fd)
%
% Flush the journal stream Fd, unless DB is registered in
% transaction_db/3.

sync_journal(DB, Fd) :-
    (   transaction_db(_, DB, _)
    ->  true
    ;   flush_output(Fd)
    ).
% close_journal(+DB)
%
% Close the journal of DB while holding the mutex
% `rdf_journal_file`.  Succeeds if no journal is open for DB.

close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

% Unregister the stream, write an end/1 term with the current time
% and close the stream with force(true).
close_journal_(DB) :-
    (   retract(source_journal_fd(DB, Stream))
    ->  time_stamp(Now),
        format(Stream, '~q.~n', [end([time(Now)])]),
        close(Stream, [force(true)])
    ;   true
    ).
% close_journals
%
% Close every registered journal.  An exception raised while closing
% one of them is printed as an error message and does not stop the
% others from being closed.

close_journals :-
    \+ ( source_journal_fd(DB, _),
         \+ catch(close_journal(DB), Error,
                  print_message(error, Error))
       ).
% create_db(+Graph)
%
% Delete the database files of Graph if the graph has no triples.
% Otherwise write a new snapshot for it.

create_db(Graph) :-
    (   \+ rdf(_,_,_,Graph)
    ->  debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
        delete_db(Graph)
    ;   save_graph_snapshot(Graph)
    ).

% Close the journal and save the graph to `<Snapshot>.new`.  On
% success the journal file is deleted and the new file is renamed to
% the snapshot.  If saving raises an exception, a warning is printed
% and the `.new` file is removed, ignoring errors.
save_graph_snapshot(Graph) :-
    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
    close_journal(Graph),
    db_abs_files(Graph, Snapshot, Journal),
    atom_concat(Snapshot, '.new', Pending),
    (   catch(( create_directory_levels(Snapshot),
                rdf_save_db(Pending, Graph)
              ), Error,
              ( print_message(warning, Error),
                fail
              ))
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        rename_file(Pending, Snapshot),
        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
    ;   catch(delete_file(Pending), _, true)
    ).
% delete_db(+DB)
%
% Close the journal of DB and delete its journal and snapshot files
% while holding the mutex `rdf_journal_file`.

delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

% Succeeds without deleting anything if the journal cannot be closed
% or the file names cannot be computed.
delete_db_(DB) :-
    (   close_journal_(DB),
        db_abs_files(DB, Snapshot, Journal)
    ->  delete_if_exists(Journal),
        delete_if_exists(Snapshot)
    ;   true
    ).

% Delete File if it exists as a regular file.
delete_if_exists(File) :-
    (   exists_file(File)
    ->  delete_file(File)
    ;   true
    ).

                 /*******************************
                 *           LOCKING            *
                 *******************************/
% lock_db(+Dir)
%
% Lock the database directory Dir.  Opens the lock file with a
% non-blocking write lock, writes a locked/1 term holding the time,
% process id and host name, and records the lock in rdf_lock/2.  The
% lock is released at halt.  A permission error while opening the
% file is translated by locked_error/2.

lock_db(Dir) :-
    lockfile(Dir, LockFile),
    catch(open(LockFile, update, Out, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, PID)
    ->  true
    ;   PID = 0                     % TBD: Fix in Prolog
    ),
    time_stamp(Now),
    gethostname(Host),
    format(Out, '/* RDF Database is in use */~n~n', []),
    format(Out, '~q.~n', [ locked([ time(Now),
                                    pid(PID),
                                    host(Host)
                                  ])
                         ]),
    flush_output(Out),
    set_end_of_stream(Out),
    assert(rdf_lock(Dir, lock(Out, LockFile))),
    at_halt(unlock_db(Dir)).

% locked_error(+Access, +Dir)
%
% Raise permission_error(lock, rdf_db, Dir).  For a lock conflict the
% context holds the content of the lock file if that can be read.

locked_error(lock, Dir) :-
    lockfile(Dir, LockFile),
    (   catch(read_file_to_terms(LockFile, Terms, []), _, fail),
        Terms = [locked(Args)]
    ->  Context = rdf_locked(Args)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).
% unlock_db(+Dir)
%
% Release the lock recorded for Dir, if any.

unlock_db(Dir) :-
    (   retract(rdf_lock(Dir, lock(Stream, LockFile)))
    ->  unlock_db(Stream, LockFile)
    ;   true
    ).

% Close the lock stream and delete the lock file.
unlock_db(Stream, LockFile) :-
    close(Stream),
    delete_file(LockFile).

                 /*******************************
                 *           FILENAMES          *
                 *******************************/

% The lock file is named `lock` and lives in the database directory.
lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, /, lock], LockFile).

% directory_levels(-Levels)
%
% Levels is the stored directory_levels option, or 2 by default.

directory_levels(Levels) :-
    (   rdf_option(directory_levels(Levels))
    ->  true
    ;   Levels = 2
    ).

% db_file(+Base, -File)
%
% File is the path of Base below the attached database directory,
% using the configured number of directory levels.

db_file(Base, File) :-
    rdf_directory(Dir),
    directory_levels(Levels),
    db_file(Dir, Base, Levels, File).

% db_file(+Dir, +Base, +Levels, -File)
%
% As db_file/2, for an explicit directory and number of levels.

db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, Segments, [Base]),
    atomic_list_concat([Dir|Segments], /, File).

% open_db(+Base, +Mode, -Stream, +Options)
%
% Open the database file for Base as a UTF-8 stream, creating the
% intermediate directories if needed.

open_db(Base, Mode, Stream, Options) :-
    db_file(Base, File),
    create_directory_levels(File),
    open(File, Mode, Stream, [encoding(utf8)|Options]).

% Create the directory holding File, unless zero directory levels
% are configured.
create_directory_levels(File) :-
    (   rdf_option(directory_levels(0))
    ->  true
    ;   file_directory_name(File, Dir),
        make_directory_path(Dir)
    ).

% True if the database file for Base exists.
exists_db(Base) :-
    db_file(Base, File),
    exists_file(File).
% dir_levels(+File, +Levels, ?Segments, ?Tail)
%
% Segments is a difference list, ending in Tail, of Levels directory
% names.  Each name is a consecutive two-character slice of the MD5
% hash of File.  With zero levels Segments is Tail.

dir_levels(_, 0, Segments, Segments) :-
    !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Hash),
    create_dir_levels(Levels, 0, Hash, Segments, Tail).

% create_dir_levels(+Count, +Offset, +Hash, ?Segments, ?Tail)
%
% Take Count two-character slices from Hash, starting at Offset.

create_dir_levels(0, _, _, Segments, Segments) :-
    !.
create_dir_levels(Count, Offset, Hash, [Slice|Segments], Tail) :-
    sub_atom(Hash, Offset, 2, _, Slice),
    NextOffset is Offset+2,
    Remaining is Count-1,
    create_dir_levels(Remaining, NextOffset, Hash, Segments, Tail).
% db_files(?DB, ?Snapshot, ?Journal)
%
% Relate a graph name to the base names of its snapshot (`.trp`) and
% journal (`.jrn`) files.  Works from whichever argument is bound,
% testing DB first, then Snapshot, then Journal.  Fails if all three
% are unbound.

db_files(DB, Snapshot, Journal) :-
    (   nonvar(DB)
    ->  rdf_db_to_file(DB, Base),
        atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal)
    ;   nonvar(Snapshot)
    ->  atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal),
        rdf_db_to_file(DB, Base)
    ;   nonvar(Journal)
    ->  atom_concat(Base, '.jrn', Journal),
        atom_concat(Base, '.trp', Snapshot),
        rdf_db_to_file(DB, Base)
    ).

% db_abs_files(+DB, -Snapshot, -Journal)
%
% As db_files/3, with both names resolved through db_file/2.

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, SnapshotBase, JournalBase),
    db_file(SnapshotBase, Snapshot),
    db_file(JournalBase, Journal).
% rdf_journal_file(?Graph, ?Journal)
%
% True if Journal is the existing journal file of Graph.  If Graph
% is unbound, the graphs known to rdf_graph/1 are enumerated.

rdf_journal_file(Graph, Journal) :-
    (   nonvar(Graph)
    ->  true
    ;   rdf_graph(Graph)
    ),
    db_abs_files(Graph, _, Journal),
    exists_file(Journal).
% rdf_snapshot_file(?Graph, ?Snapshot)
%
% True if Snapshot is the existing snapshot file of Graph.  If Graph
% is unbound, the graphs known to rdf_graph/1 are enumerated.

rdf_snapshot_file(Graph, Snapshot) :-
    (   nonvar(Graph)
    ->  true
    ;   rdf_graph(Graph)            % also pick the empty graphs
    ),
    db_abs_files(Graph, Snapshot, _),
    exists_file(Snapshot).
% rdf_db_to_file(?DB, ?File)
%
% Relate a graph name to its file base name.  A known mapping is
% looked up in file_base_db/2.  Otherwise File is computed from DB
% with url_to_filename/2 and the mapping is remembered.

rdf_db_to_file(DB, File) :-
    (   file_base_db(File, DB)
    ->  true
    ;   url_to_filename(DB, File),
        assert(file_base_db(File, DB))
    ).
% url_to_filename(+URL, -FileName)
%
% Encode an atomic URL as a file name.  A space becomes `+`,
% US-ASCII alphanumerics and `_` are copied, a newline (with or
% without a preceding carriage return) becomes `%0D%0A`, and any
% other code C becomes `%XX`, built from the two hex digits for
% (C>>4)/\0xf and C/\0xf.  A non-atomic URL is passed to
% uri_encoded/3.

url_to_filename(URL, FileName) :-
    (   atomic(URL)
    ->  atom_codes(URL, Plain),
        phrase(url_encode(Encoded), Plain),
        atom_codes(FileName, Encoded)
    ;   uri_encoded(path, URL, FileName)
    ).

url_encode([0'+|Rest]) -->
    " ",
    !,
    url_encode(Rest).
url_encode([Code|Rest]) -->
    alphanum(Code),
    !,
    url_encode(Rest).
url_encode([Code|Rest]) -->
    no_enc_extra(Code),
    !,
    url_encode(Rest).
url_encode(Encoded) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", NewLine),
      append(NewLine, Rest, Encoded)
    },
    url_encode(Rest).
url_encode([]) -->
    eos,
    !.
url_encode([0'%,High,Low|Rest]) -->
    [Code],
    { HighValue is (Code>>4 /\ 0xf),
      LowValue is (Code /\ 0xf),
      code_type(High, xdigit(HighValue)),
      code_type(Low, xdigit(LowValue))
    },
    url_encode(Rest).

% End of input.
eos([], []).

% A US-ASCII alphanumeric code.
alphanum(Code) -->
    [Code],
    { Code < 128,                   % US-ASCII
      code_type(Code, alnum)
    }.

% Extra codes that are copied without encoding.
no_enc_extra(0'_) --> "_".


                 /*******************************
                 *             REINDEX          *
                 *******************************/
% reindex_db(+Dir, +Levels)
%
% Move every database file below Dir to the location it has with
% Levels directory levels and remove directories that became empty.

reindex_db(Dir, Levels) :-
    directory_files(Dir, Entries),
    reindex_files(Entries, Dir, '.', 0, Levels),
    remove_empty_directories(Entries, Dir).

% reindex_files(+Entries, +Dir, +Prefix, +CurrentLevel, +Levels)
%
% Walk the directory tree.  A database file that is not at the
% requested level is renamed to its new path.  Directories that are
% not symbolic links are entered recursively.

reindex_files([], _, _, _, _).
reindex_files([Entry|Entries], Dir, Prefix, CurrentLevel, Levels) :-
    nofollow(Entry),
    !,
    reindex_files(Entries, Dir, Prefix, CurrentLevel, Levels).
reindex_files([Entry|Entries], Dir, Prefix, CurrentLevel, Levels) :-
    CurrentLevel \== Levels,
    file_name_extension(_Base, Ext, Entry),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, Entry, RelFile),
    directory_file_path(Dir, RelFile, OldPath),
    db_file(Dir, Entry, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Entries, Dir, Prefix, CurrentLevel, Levels).
reindex_files([Entry|Entries], Dir, Prefix, CurrentLevel, Levels) :-
    directory_file_path(Prefix, Entry, RelDir),
    directory_file_path(Dir, RelDir, AbsDir),
    exists_directory(AbsDir),
    \+ read_link(AbsDir, _, _),         % Do not follow links
    !,
    directory_files(AbsDir, SubEntries),
    SubLevel is CurrentLevel + 1,
    reindex_files(SubEntries, Dir, RelDir, SubLevel, Levels),
    reindex_files(Entries, Dir, Prefix, CurrentLevel, Levels).
reindex_files([_|Entries], Dir, Prefix, CurrentLevel, Levels) :-
    reindex_files(Entries, Dir, Prefix, CurrentLevel, Levels).


% remove_empty_directories(+Entries, +Dir)
%
% Delete each directory among Entries that holds nothing besides `.`
% and `..`.  Non-empty directories are processed recursively.
% Symbolic links are not followed.

remove_empty_directories([], _).
remove_empty_directories([Entry|Entries], Dir) :-
    \+ nofollow(Entry),
    directory_file_path(Dir, Entry, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, AllContent),
    exclude(nofollow, AllContent, Content),
    (   Content == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   remove_empty_directories(Content, Path)
    ),
    remove_empty_directories(Entries, Dir).
remove_empty_directories([_|Entries], Dir) :-
    remove_empty_directories(Entries, Dir).


                 /*******************************
                 *            PREFIXES          *
                 *******************************/

% save_prefixes(+Dir)
%
% Write the currently defined RDF prefixes to `prefixes.db` in Dir,
% encoded as UTF-8.

save_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    setup_call_cleanup(open(PrefixFile, write, Stream, [encoding(utf8)]),
                       write_prefixes(Stream),
                       close(Stream)).

% Write a header comment followed by one prefix/2 term per prefix.
write_prefixes(Stream) :-
    format(Stream, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Stream, 'prefix(~q, ~q).~n', [Alias, URI])).
%   load_prefixes(+Dir)
%
%   If Dir contains a file `prefixes.db`, read it as UTF-8 and
%   register the prefixes it lists. Succeeds without doing anything
%   when the file does not exist.

load_prefixes(Dir) :-
    atomic_list_concat([Dir, '/', 'prefixes.db'], PrefixFile),
    (   exists_file(PrefixFile)
    ->  setup_call_cleanup(
            open(PrefixFile, read, Stream, [encoding(utf8)]),
            read_prefixes(Stream),
            close(Stream))
    ;   true
    ).

%   read_prefixes(+Stream)
%
%   Read terms from Stream up to end of file and handle each one with
%   read_prefixes/2.

read_prefixes(Stream) :-
    read_term(Stream, First, []),
    read_prefixes(First, Stream).

%   read_prefixes(+Term, +Stream)
%
%   Handle Term, the term most recently read from Stream:
%
%     - `end_of_file` ends the loop;
%     - `prefix(Alias, URI)` (both must be atoms) is registered using
%       rdf_register_ns/3; an exception raised by the registration is
%       printed as a warning and reading continues;
%     - anything else raises a domain error `prefix_term`.

read_prefixes(Term, Stream) :-
    (   Term = end_of_file
    ->  true
    ;   Term = prefix(Alias, URI)
    ->  must_be(atom, Alias),
        must_be(atom, URI),
        catch(rdf_register_ns(Alias, URI, []), Error,
              print_message(warning, Error)),
        read_term(Stream, Next, []),
        read_prefixes(Next, Stream)
    ;   domain_error(prefix_term, Term)
    ).


                 /*******************************
                 *              UTIL            *
                 *******************************/
%   mkdir(+Directory)
%
%   Make sure Directory exists: succeed immediately if it is already
%   there, otherwise create it with make_directory/1.

mkdir(Directory) :-
    (   exists_directory(Directory)
    ->  true
    ;   make_directory(Directory)
    ).
%   time_stamp(-Int)
%
%   Int is the current time (as returned by get_time/1) rounded to
%   the nearest integer.

time_stamp(Int) :-
    get_time(Now),
    Int is round(Now).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3,
    prolog:message_context/3.

%   prolog:message(+Term)//
%
%   Hook into the message system: translate rdf(Term) messages using
%   message//1. The head must be qualified with `prolog:`; without
%   the qualifier the clause defines an unrelated local predicate and
%   the hook declared multifile above is never extended.

prolog:message(rdf(Term)) -->
    message(Term).

%   message(+Term)//
%
%   Produce the message lines for Term. For restore(Silent, Action)
%   the clauses for Silent == true and Silent == brief commit first.
%   The clauses for specific Action terms precede the catch-all
%   clause that treats Action as a graph name; otherwise the
%   catch-all would be selected for every Action.

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    % Guard against evaluation errors (e.g. Wall being zero).
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
% attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
% Catch-all: any other Action is taken to be the graph being restored.
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
% journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
% directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].

%   silent_message(+Action)//
%
%   Produce no output for Action.

silent_message(_Action) --> [].

%   brief_message(+Action)//
%
%   Produce a single, overwritten progress line for
%   done(Graph, Time, Count, Nth, Total) and nothing for any other
%   Action.

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, Base) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
      flush
    ].
brief_message(_) --> [].


%   prolog:message_context(+Context)//
%
%   Hook that extends messages carrying an rdf_locked(Args) context
%   with the time(Time) and pid(Pid) found in Args.

prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Time), Args),
      memberchk(pid(Pid), Args),
      format_time(string(S), '%+', Time)
    },
    [ nl,
      'locked at ~s by process id ~w'-[S,Pid]
    ].
RDF persistency plugin
This module provides persistency for
rdf_db.pl
based on the rdf_monitor/2 predicate to track changes to the repository. Where previous versions used autosave of the whole database using the quick-load format of rdf_db, this version is based on a quick-load file per source (4th argument of rdf/4), and journalling for edit operations. The result is safe, avoids frequent small changes to large files (which make synchronisation and backup expensive) and avoids long disruption of the server while autosaving. Only loading large files disrupts service for some time.
The persistent backup of the database is realised in a directory, using a lock file to avoid corruption due to concurrent access. Each source is represented by two files, the latest snapshot and a journal. The state is restored by loading the snapshot and replaying the journal. The predicate rdf_flush_journals/1 can be used to create fresh snapshots and delete the journals.
rdf_edit.pl
*/