implement aggregate_locking design

Now aggregation will not lock the wiki. Any changes made during aggregation
are merged in with the changed state accumulated while aggregating. A
separate lock file prevents multiple concurrent aggregators. Garbage
collection of orphaned guids is much improved. loadstate() is only called
once per process, so tricky support for reloading wiki state is not needed.

(Tested fairly thoroughly.)
master
Joey Hess 2008-02-03 16:48:26 -05:00
parent 38affb0c1c
commit 9d54cc4659
4 changed files with 172 additions and 104 deletions
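
The new code calls lockaggregate() and unlockaggregate(), but their definitions fall outside the hunks shown below. As a reading aid only: a minimal sketch of what a non-blocking flock-based pair like this usually looks like, using the aggregatelock file name from the changelog entry; %config and error() are ikiwiki's, and the commit's real helper bodies may differ.

use Fcntl qw(:flock);

my $aggregatelock;

# Take the aggregation lock without blocking. Returns true on success,
# false if another aggregator already holds the lock.
sub lockaggregate {
	mkdir($config{wikistatedir}) unless -d $config{wikistatedir};
	open($aggregatelock, '>', "$config{wikistatedir}/aggregatelock") ||
		error("cannot open $config{wikistatedir}/aggregatelock: $!");
	if (! flock($aggregatelock, LOCK_EX | LOCK_NB)) {
		# Somebody else has it; give up rather than block.
		close($aggregatelock);
		return 0;
	}
	return 1;
}

# Dropping the lock is just closing the handle; the flock dies with it.
sub unlockaggregate {
	return close($aggregatelock) if $aggregatelock;
	return;
}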

IkiWiki/Plugin/aggregate.pm

@@ -33,33 +33,62 @@ sub getopt () { #{{{
 sub checkconfig () { #{{{
 	if ($config{aggregate} && ! ($config{post_commit} &&
 	                             IkiWiki::commit_hook_enabled())) {
-		if (! IkiWiki::lockwiki(0)) {
-			debug("wiki is locked by another process, not aggregating");
-			exit 1;
-		}
-
+		# See if any feeds need aggregation.
 		loadstate();
-		IkiWiki::loadindex();
-		aggregate();
-		expire();
-		savestate();
-		clearstate();
+		my @feeds=needsaggregate();
+		return unless @feeds;
+		if (! lockaggregate()) {
+			debug("an aggregation process is already running");
+			return;
+		}
+		# force a later rebuild of source pages
+		$IkiWiki::forcerebuild{$_->{sourcepage}}=1
+			foreach @feeds;
 
-		IkiWiki::unlockwiki();
+		# Fork a child process to handle the aggregation.
+		# The parent process will then handle building the
+		# result. This avoids messy code to clear state
+		# accumulated while aggregating.
+		defined(my $pid = fork) or error("Can't fork: $!");
+		if (! $pid) {
+			IkiWiki::loadindex();
+			# Aggregation happens without the main wiki lock
+			# being held. This allows editing pages etc while
+			# aggregation is running.
+			aggregate(@feeds);
+
+			IkiWiki::lockwiki;
+			# Merge changes, since aggregation state may have
+			# changed on disk while the aggregation was happening.
+			mergestate();
+			expire();
+			savestate();
+			IkiWiki::unlockwiki;
+			exit 0;
+		}
+		waitpid($pid,0);
+		if ($?) {
+			error "aggregation failed with code $?";
+		}
+
+		clearstate();
+		unlockaggregate();
 	}
 } #}}}
 
 sub needsbuild (@) { #{{{
 	my $needsbuild=shift;
 
-	loadstate(); # if not already loaded
+	loadstate();
 
 	foreach my $feed (values %feeds) {
 		if (exists $pagesources{$feed->{sourcepage}} &&
 		    grep { $_ eq $pagesources{$feed->{sourcepage}} } @$needsbuild) {
-			# Mark all feeds originating on this page as removable;
-			# preprocess will unmark those that still exist.
-			remove_feeds($feed->{sourcepage});
+			# Mark all feeds originating on this page as
+			# not yet seen; preprocess will unmark those that
+			# still exist.
+			markunseen($feed->{sourcepage});
 		}
 	}
 } # }}}
@@ -92,8 +121,7 @@ sub preprocess (@) { #{{{
 	$feed->{updateinterval}=defined $params{updateinterval} ? $params{updateinterval} * 60 : 15 * 60;
 	$feed->{expireage}=defined $params{expireage} ? $params{expireage} : 0;
 	$feed->{expirecount}=defined $params{expirecount} ? $params{expirecount} : 0;
-	delete $feed->{remove};
-	delete $feed->{expired};
+	delete $feed->{unseen};
 	$feed->{lastupdate}=0 unless defined $feed->{lastupdate};
 	$feed->{numposts}=0 unless defined $feed->{numposts};
 	$feed->{newposts}=0 unless defined $feed->{newposts};
@@ -123,16 +151,27 @@ sub delete (@) { #{{{
 	# Remove feed data for removed pages.
 	foreach my $file (@files) {
 		my $page=pagename($file);
-		remove_feeds($page);
+		markunseen($page);
+	}
+} #}}}
+
+sub markunseen ($) { #{{{
+	my $page=shift;
+
+	foreach my $id (keys %feeds) {
+		if ($feeds{$id}->{sourcepage} eq $page) {
+			$feeds{$id}->{unseen}=1;
+		}
 	}
 } #}}}
 
 my $state_loaded=0;
+
 sub loadstate () { #{{{
 	return if $state_loaded;
 	$state_loaded=1;
 
 	if (-e "$config{wikistatedir}/aggregate") {
-		open(IN, "<", "$config{wikistatedir}/aggregate") ||
+		open(IN, "$config{wikistatedir}/aggregate") ||
 			die "$config{wikistatedir}/aggregate: $!";
 		while (<IN>) {
 			$_=IkiWiki::possibly_foolish_untaint($_);
@@ -166,32 +205,13 @@ sub loadstate () { #{{{
 sub savestate () { #{{{
 	return unless $state_loaded;
+	garbage_collect();
 
 	eval q{use HTML::Entities};
 	error($@) if $@;
 	my $newfile="$config{wikistatedir}/aggregate.new";
 	my $cleanup = sub { unlink($newfile) };
-	open (OUT, ">", $newfile) || error("open $newfile: $!", $cleanup);
+	open (OUT, ">$newfile") || error("open $newfile: $!", $cleanup);
 	foreach my $data (values %feeds, values %guids) {
-		if ($data->{remove}) {
-			if ($data->{name}) {
-				foreach my $guid (values %guids) {
-					if ($guid->{feed} eq $data->{name}) {
-						$guid->{remove}=1;
-					}
-				}
-			}
-			else {
-				unlink pagefile($data->{page})
-					if exists $data->{page};
-			}
-			next;
-		}
-		elsif ($data->{expired} && exists $data->{page}) {
-			unlink pagefile($data->{page});
-			delete $data->{page};
-			delete $data->{md5};
-		}
-
 		my @line;
 		foreach my $field (keys %$data) {
 			if ($field eq "name" || $field eq "feed" ||
@@ -212,6 +232,63 @@ sub savestate () { #{{{
 		error("rename $newfile: $!", $cleanup);
 } #}}}
 
+sub garbage_collect () { #{{{
+	foreach my $name (keys %feeds) {
+		# remove any feeds that were not seen while building the pages
+		# that used to contain them
+		if ($feeds{$name}->{unseen}) {
+			delete $feeds{$name};
+		}
+	}
+
+	foreach my $guid (values %guids) {
+		# any guid whose feed is gone should be removed
+		if (! exists $feeds{$guid->{feed}}) {
+			unlink pagefile($guid->{page})
+				if exists $guid->{page};
+			delete $guids{$guid->{guid}};
+		}
+		# handle expired guids
+		elsif ($guid->{expired} && exists $guid->{page}) {
+			unlink pagefile($guid->{page});
+			delete $guid->{page};
+			delete $guid->{md5};
+		}
+	}
+} #}}}
+
+sub mergestate () { #{{{
+	# Load the current state in from disk, and merge into it
+	# values from the state in memory that might have changed
+	# during aggregation.
+	my %myfeeds=%feeds;
+	my %myguids=%guids;
+	clearstate();
+	loadstate();
+
+	# All that can change in feed state during aggregation is a few
+	# fields.
+	foreach my $name (keys %myfeeds) {
+		if (exists $feeds{$name}) {
+			foreach my $field (qw{message lastupdate numposts
+			                      newposts error}) {
+				$feeds{$name}->{$field}=$myfeeds{$name}->{$field};
+			}
+		}
+	}
+
+	# New guids can be created during aggregation.
+	# It's also possible that guids were removed from the on-disk state
+	# while the aggregation was in process. That would only happen if
+	# their feed was also removed, so any removed guids added back here
+	# will be garbage collected later.
+	foreach my $guid (keys %myguids) {
+		if (! exists $guids{$guid}) {
+			$guids{$guid}=$myguids{$guid};
+		}
+	}
+} #}}}
+
 sub clearstate () { #{{{
 	%feeds=();
 	%guids=();
@@ -249,7 +326,12 @@ sub expire () { #{{{
 	}
 } #}}}
 
-sub aggregate () { #{{{
+sub needsaggregate () { #{{{
+	return values %feeds if $config{rebuild};
+	return grep { time - $_->{lastupdate} >= $_->{updateinterval} } values %feeds;
+} #}}}
+
+sub aggregate (@) { #{{{
 	eval q{use XML::Feed};
 	error($@) if $@;
 	eval q{use URI::Fetch};
@@ -257,15 +339,12 @@ sub aggregate () { #{{{
 	eval q{use HTML::Entities};
 	error($@) if $@;
 
-	foreach my $feed (values %feeds) {
-		next unless $config{rebuild} ||
-			time - $feed->{lastupdate} >= $feed->{updateinterval};
+	foreach my $feed (@_) {
 		$feed->{lastupdate}=time;
 		$feed->{newposts}=0;
 		$feed->{message}=sprintf(gettext("processed ok at %s"),
 			displaytime($feed->{lastupdate}));
 		$feed->{error}=0;
-		$IkiWiki::forcerebuild{$feed->{sourcepage}}=1;
 
 		debug(sprintf(gettext("checking feed %s ..."), $feed->{name}));
@@ -473,18 +552,6 @@ sub htmlabs ($$) { #{{{
 	return $ret;
 } #}}}
 
-sub remove_feeds () { #{{{
-	my $page=shift;
-
-	my %removed;
-	foreach my $id (keys %feeds) {
-		if ($feeds{$id}->{sourcepage} eq $page) {
-			$feeds{$id}->{remove}=1;
-			$removed{$id}=1;
-		}
-	}
-} #}}}
-
 sub pagefile ($) { #{{{
 	my $page=shift;
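
To make the merge rule in mergestate() above concrete: the state reloaded from disk decides which feeds and guids exist (so pages edited or deleted during aggregation win), while the in-memory state contributes only the fields an aggregation run rewrites. A self-contained sketch with made-up data, not code from the commit:

#!/usr/bin/perl
use strict;
use warnings;

# State as reloaded from disk: while aggregation ran, a wiki edit added
# feed "news" and removed feed "olds".
my %feeds=(
	news => { message => "", numposts => 0 },
	blog => { message => "", lastupdate => 0, numposts => 3,
	          newposts => 0, error => 0 },
);

# State accumulated in memory while aggregating: it still knows "olds"
# and holds fresh results for "blog".
my %myfeeds=(
	olds => { message => "removed meanwhile", numposts => 1 },
	blog => { message => "processed ok", lastupdate => time,
	          numposts => 5, newposts => 2, error => 0 },
);

# The same field whitelist as mergestate(): copy only aggregation
# results, and only for feeds that still exist on disk.
foreach my $name (keys %myfeeds) {
	if (exists $feeds{$name}) {
		foreach my $field (qw{message lastupdate numposts newposts error}) {
			$feeds{$name}->{$field}=$myfeeds{$name}->{$field};
		}
	}
}

# "news" survives untouched, "blog" carries the new results, and "olds"
# stays deleted because it is gone from the on-disk state.
print "$_: $feeds{$_}->{message} ($feeds{$_}->{numposts} posts)\n"
	foreach sort keys %feeds;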

debian/changelog

@@ -6,19 +6,22 @@ ikiwiki (2.31) UNRELEASED; urgency=low
     that contributes to a page's content and using the youngest of them all,
     as well as special cases for things like the version plugin, and it's just
     too complex to do.
-  * aggregate: Forking a child broke the one state that mattered: Forcing
-    the aggregating page to be rebuilt. Fix this.
   * cgi hooks are now run before ikiwiki state is loaded.
   * This allows locking the wiki before loading state, which avoids some
     tricky locking code when saving a web edit.
   * poll: This plugin turns out to have edited pages w/o doing any locking.
     Oops. Convert it from a cgi to a sessioncgi hook, which will work
     much better.
+  * aggregate: Revert use of forking to not save state, that was not the
+    right approach.
   * recentchanges: Improve handling of links on the very static changes pages
     by thunking to the CGI, which can redirect to the page, or allow it to be
     created if it doesn't exist.
   * recentchanges: Exipre all *._change pages, even if the directory
     they're in has changed.
+  * aggregate: Lots of changes; aggregation can now run without locking the
+    wiki, and there is a separate aggregatelock to prevent multiple concurrent
+    aggregation runs.
 
  -- Joey Hess <joeyh@debian.org>  Sat, 02 Feb 2008 23:36:31 -0500

doc/todo/aggregate_locking.mdwn

@@ -46,16 +46,12 @@ would be loaded, and there would be no reason to worry about aggregating.
 Or aggregation could be kept in checkconfig, like so:
 
-* lock wiki
 * load aggregation state
-* unlock wiki
 * get list of feeds needing aggregation
 * exit if none
 * attempt to take aggregation lock, exit if another aggregation is happening
 * fork a child process to do the aggregation
-  * lock wiki
   * load wiki state (needed for aggregation to run)
-  * unlock wiki
   * aggregate
   * lock wiki
   * reload aggregation state
@@ -64,3 +60,5 @@ Or aggregation could be kept in checkconfig, like so:
 * drop aggregation lock
 * force rebuild of sourcepages of feeds that were aggregated
 * exit checkconfig and continue with usual refresh process
+
+[[done]]
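
The "fork a child process" step in this design relies on standard waitpid/$? plumbing, matching the checkconfig hunk above. A standalone sketch of the pattern, illustrative rather than ikiwiki code:

#!/usr/bin/perl
use strict;
use warnings;

# The child does the risky work and reports through its exit status;
# the parent inspects $? after waitpid.
defined(my $pid = fork) or die "Can't fork: $!";
if (! $pid) {
	# Child: stand-in for aggregate()/mergestate()/savestate().
	my $ok = do_work();
	exit($ok ? 0 : 1);
}
waitpid($pid, 0);
if ($?) {
	# $? packs the exit code in the high byte; >> 8 recovers it.
	die "aggregation failed with code ".($? >> 8)."\n";
}
print "child finished ok, parent continues with the refresh\n";

sub do_work {
	return 1;
}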

po/ikiwiki.pot

@@ -8,7 +8,7 @@ msgid ""
 msgstr ""
 "Project-Id-Version: PACKAGE VERSION\n"
 "Report-Msgid-Bugs-To: \n"
-"POT-Creation-Date: 2008-02-03 14:52-0500\n"
+"POT-Creation-Date: 2008-02-03 16:05-0500\n"
 "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
 "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
 "Language-Team: LANGUAGE <LL@li.org>\n"
@@ -67,67 +67,67 @@ msgstr ""
 msgid "You are banned."
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:72
+#: ../IkiWiki/Plugin/aggregate.pm:100
 #, perl-format
 msgid "missing %s parameter"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:100
+#: ../IkiWiki/Plugin/aggregate.pm:127
 msgid "new feed"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:114
+#: ../IkiWiki/Plugin/aggregate.pm:141
 msgid "posts"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:116
+#: ../IkiWiki/Plugin/aggregate.pm:143
 msgid "new"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:232
-#, perl-format
-msgid "expiring %s (%s days old)"
-msgstr ""
-
-#: ../IkiWiki/Plugin/aggregate.pm:239
-#, perl-format
-msgid "expiring %s"
-msgstr ""
-
-#: ../IkiWiki/Plugin/aggregate.pm:265
-#, perl-format
-msgid "processed ok at %s"
-msgstr ""
-
-#: ../IkiWiki/Plugin/aggregate.pm:270
-#, perl-format
-msgid "checking feed %s ..."
-msgstr ""
-
-#: ../IkiWiki/Plugin/aggregate.pm:275
-#, perl-format
-msgid "could not find feed at %s"
-msgstr ""
-
-#: ../IkiWiki/Plugin/aggregate.pm:290
-msgid "feed not found"
-msgstr ""
-
-#: ../IkiWiki/Plugin/aggregate.pm:301
-#, perl-format
-msgid "(invalid UTF-8 stripped from feed)"
-msgstr ""
-
 #: ../IkiWiki/Plugin/aggregate.pm:307
 #, perl-format
+msgid "expiring %s (%s days old)"
+msgstr ""
+
+#: ../IkiWiki/Plugin/aggregate.pm:314
+#, perl-format
+msgid "expiring %s"
+msgstr ""
+
+#: ../IkiWiki/Plugin/aggregate.pm:343
+#, perl-format
+msgid "processed ok at %s"
+msgstr ""
+
+#: ../IkiWiki/Plugin/aggregate.pm:347
+#, perl-format
+msgid "checking feed %s ..."
+msgstr ""
+
+#: ../IkiWiki/Plugin/aggregate.pm:352
+#, perl-format
+msgid "could not find feed at %s"
+msgstr ""
+
+#: ../IkiWiki/Plugin/aggregate.pm:367
+msgid "feed not found"
+msgstr ""
+
+#: ../IkiWiki/Plugin/aggregate.pm:378
+#, perl-format
+msgid "(invalid UTF-8 stripped from feed)"
+msgstr ""
+
+#: ../IkiWiki/Plugin/aggregate.pm:384
+#, perl-format
 msgid "(feed entities escaped)"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:313
+#: ../IkiWiki/Plugin/aggregate.pm:390
 msgid "feed crashed XML::Feed!"
 msgstr ""
 
-#: ../IkiWiki/Plugin/aggregate.pm:387
+#: ../IkiWiki/Plugin/aggregate.pm:464
 #, perl-format
 msgid "creating new page %s"
 msgstr ""