Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
Revision history for Data-Consumer

0.18 ????
Remove GET_LOCK from MySQL2 - as of 5.7 the GET_LOCK() trick
doesn't work, as a lock request does not free up a previously
held lock.

0.17 Wed Nov 1 2017
Fix a bug with ignored in MySQL2, thanks to Maksym Davydov

Expand Down
6 changes: 4 additions & 2 deletions lib/Data/Consumer.pm
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ ignored.

=item release

This routine is to release any held locks in the object.
This routine is to release any held locks in the object. It is called at the very end of
processing, and by reset.

=item _mark_as

Expand Down Expand Up @@ -721,7 +722,8 @@ Returns an identifier to be used to identify the item acquired.

Release any locks on the currently held item.

Normally there is no need to call this directly.
Normally there is no need to call this directly. It is called by reset() and
at the end of processing.

=cut

Expand Down
140 changes: 59 additions & 81 deletions lib/Data/Consumer/MySQL2.pm
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package Data::Consumer::MySQL2;
use warnings;
use strict;
use DBI;
use Carp qw(confess);
use Carp qw(confess cluck);
use warnings FATAL => 'all';
use base 'Data::Consumer';
use vars qw/$Debug $VERSION $Cmd $Fail/;
Expand All @@ -26,11 +26,11 @@ Data::Consumer::MySQL2 - Data::Consumer implementation for a mysql database tabl

=head1 VERSION

Version 0.17
Version 0.18

=cut

$VERSION= '0.17';
$VERSION= '0.18';

=head1 SYNOPSIS

Expand All @@ -41,7 +41,6 @@ $VERSION= '0.17';
table => 'T',
id_field= > 'id',
flag_field => 'done',
lock_prefix => $worker_name,
unprocessed => 0,
working => 1,
processed => 2,
Expand Down Expand Up @@ -88,17 +87,6 @@ The column name of the primary key of the table being processed
The column name in the table being processed which shows whether
an object is processed or not.

=item lock_prefix => 'my-lock-name'

The prefix to use for the mysql locks. Defaults to C<$0-$table>.

It is B<strongly> recommended that end-users of this module explicitly
specify a lock_prefix in production environments. A multi-process
system relying on mutual exclusion B<will> run into problems when
consuming from the same source if $0 and $table are not identical
between workers. Generally, using the name of the consuming module
should suffice (e.g. Your::Data::Consumer::Worker).

=item unprocessed => 0

The value of the C<flag_field> which indicates that an item is not
Expand Down Expand Up @@ -137,25 +125,32 @@ same time it should ensure that a lock on the id is created.
The query will be executed with the id of the last processed item, followed by the arguments
provided by the C<select_args> property.

=item check_sql
=item acquire_sql

=item acquire_args

These arguments are optional, and will be synthesized from the other values if
not provided.

SQL update query that claims item in the table. If number of rows updated by
this query is 0, item will be skipped.

=item check_args
The query will be executed with the id of the item returned by select_sql,
followed by the arguments provided by the C<acquire_args> property.

These arguments are optional, unless you specify C<select_sql> yourself, in which case
it is required you also specify C<check_sql> as well.
=item release_acquire_sql

SQL select query which can be executed to verify that the item to be processed still has
the expected flag fields set appropriately.
=item release_acquire_args

There is a very annoying and sublte race condition (possibly only in modern MySQL's) which
means that is possible that the query used for C<select_sql> might return an id for a record
which has already been processed. This query is used to avoid that race condition.
These arguments are optional, and will be synthesized from acquire_sql/acquire_args
if not provided.

The query should validate any flag fields or constraints specified in C<select_sql> are
true, it should return only the id of the record to be processed.
SQL update query that releases a claimed item in the table. If number of rows updated by
this query is 0, item will be skipped.

The query will be executed with the id of the item to process, followed by the arguments
provided by the C<check_args> property.
The query will be executed with the id of the item returned by select_sql,
followed by the arguments provided by the C<release_acquire_args> property. If the latter
is not provided then this will be the *list* reverse of the acquire_args.

=item update_sql

Expand All @@ -168,16 +163,6 @@ SQL update query which can be used to change the status the record being process
Will be executed with the arguments provided in update_args followed the new status,
and the id.

=item release_sql

=item release_args

These arguments are optional, and will be synthesized from the other values if not provided.

SQL select query which can be used to clear the currently held lock.

Will be called with the arguments provided in release_args, plust the id.

=back

=cut
Expand All @@ -187,7 +172,7 @@ sub new {
my $self= $class->SUPER::new(); # let Data::Consumer bless the hash

my @bad;
foreach my $opt (qw(unprocessed processed working failed lock_prefix)) {
foreach my $opt (qw(unprocessed processed working failed)) {
if (ref $opts{$opt}) {
push @bad, "option '$opt' is not allowed to be a ref in DC::MySQL2";
} elsif (!defined $opts{$opt}) {
Expand All @@ -214,9 +199,9 @@ sub new {
$opts{flag_field} ||= 'process_state';
$opts{init_id}= 0 unless exists $opts{init_id};

if (!$opts{check_sql} and $opts{select_sql}) {
confess "In $class if you specify 'select_sql' you MUST provide 'check_sql' as well!";
}
if ($opts{lock_prefix}) { cluck "Ignoring 'lock_prefix' this version does not use GET_LOCK()" }
if ($opts{check_sql} || $opts{check_args}) { confess "This version does not support 'check_sql', 'check_args'" }
if ($opts{release_sql} || $opts{release_args}) { confess "This version does not support 'release_sql', 'release_args'" }

unless ( $opts{select_sql} ) {
$opts{select_sql}= do {
Expand All @@ -227,29 +212,33 @@ sub new {
WHERE
$id_field > ?
AND $flag_field = ?
AND GET_LOCK( CONCAT_WS("=", ?, $id_field ), 0) != 0
LIMIT 1
';
s/^\s+//mg;
s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
$_;
};
$opts{select_args}= [ $opts{unprocessed}, $opts{lock_prefix} ];
$opts{select_args}= [ $opts{unprocessed} ];
}

$opts{check_sql}= do {
unless ( $opts{acquire_sql} ) {
$opts{acquire_sql}= do {
local $_= '
SELECT
$id_field
FROM $table
UPDATE $table
SET $flag_field = ?
WHERE
$id_field = ?
AND $flag_field = ?
$flag_field = ?
AND $id_field = ?
';
s/^\s+//mg;
s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
$_;
};
$opts{check_args}= [ $opts{unprocessed} ];
$opts{acquire_args}= [ $opts{working}, $opts{unprocessed} ];
}
unless ( exists $opts{release_acquire_sql} ) {
$opts{release_acquire_sql}= $opts{acquire_sql};
$opts{release_acquire_args}= [ reverse @{$opts{acquire_args}} ];
}

$opts{update_sql} ||= do {
Expand All @@ -263,17 +252,7 @@ sub new {
s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
$_;
};
if ( !$opts{release_sql} ) {
$opts{release_sql}= do {
local $_= '
SELECT RELEASE_LOCK( CONCAT_WS("=", ?, ? ) )
';
s/^\s+//mg;
s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
$_;
};
$opts{release_args}= [ $opts{lock_prefix} ];
}

%$self= %opts;

return $self;
Expand All @@ -291,7 +270,7 @@ Returns an identifier to be used to identify the item acquired.

=head2 $object->release()

Release any locks on the currently held item.
Release any locks on the currently held item. This is called by reset, and at the end of processing.

Normally there is no need to call this directly.

Expand Down Expand Up @@ -322,22 +301,19 @@ sub acquire {
while (1) {
$self->debug_warn( 5, "last_id was $self->{last_id}");
my ($id)= $dbh->selectrow_array( $self->{select_sql}, undef, $self->{last_id}, @{ $self->{select_args} || [] } );
$self->{last_id}= $id;
if ( defined $id ) {
if ( $self->is_ignored($id) ) {
$self->{last_id}= $id;
next;
}
my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } );
if ( not defined $got_id) {
$self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!");
next if $self->is_ignored($id);

my $claimed = $dbh->do( $self->{acquire_sql}, undef, @{ $self->{acquire_args} || [] }, $id );
if ($claimed != 1) {
$self->debug_warn(5, "failed to claim '$id', acquire_sql updated $claimed rows");
next;
}
$self->{last_lock}= $id;
$self->debug_warn( 5, "acquired '$id'" );
} else {
$self->debug_warn( 5, "acquire failed -- resource has been exhausted" );
}
$self->{last_id}= $id;
last;
}
return $self->{last_id};
Expand All @@ -347,17 +323,19 @@ sub acquire {
sub release {
my $self= shift;

return 0 unless exists $self->{last_lock};
my $id= delete $self->{last_id};

return 0 unless defined $id && $self->{release_acquire_sql};

my $res=
$self->{dbh}
->do( $self->{release_sql}, undef, @{ $self->{release_args} || [] }, $self->{last_lock} );
defined $res
or $self->error( "Failed to execute '$self->{release_sql}' with args '$self->{last_lock}': "
. $self->{dbh}->errstr() );
my $dbh= $self->{dbh};

my $released = $dbh->do( $self->{release_acquire_sql}, undef, @{ $self->{release_acquire_args} || [] }, $id );
if ($released != 1) {
$self->debug_warn(5, "failed to release '$id', release_acquire_sql updated $released rows");
} else {
$self->debug_warn( 5, "released '$id'" );
}

$self->debug_warn( 5, "release lock '$self->{last_lock}' status: $res" ); # XXX
delete $self->{last_lock};
return 1;
}

Expand Down