Async queries with DBD::Pg fail with: Cannot execute until previous async query has finished
I'm using CentOS 5.5 Linux (same as Red Hat 5.5) with the stock perl v5.8.8. I have installed DBD-Pg-2.17.1 via the CPAN shell, and I'm running postgresql-server-8.4.5-1PGDG.rhel5 and friends.
I have prepared a simple test case demonstrating my problem; it is listed at the bottom.
My code works OK when I remove {pg_async => PG_ASYNC}.
Some background: I have a small Facebook game running as a non-forking Unix daemon built on IO::Poll. I would like to add some statistics for the players without stalling my poll loop, so I want to send the commands (mostly INSERT/UPDATE) asynchronously. I don't need any return values from the database, because separate web scripts will read and display the statistics.
Surprisingly, I get the error message "DBD::Pg::st execute failed: Cannot execute until previous async query has finished" even though I'm not using PG_OLDQUERY_WAIT.
Here is my code (my daemon should reconnect to PostgreSQL whenever the connection is lost, which is why I use the *_cached methods and do not exit on eval { .... } exceptions):
#!/usr/bin/perl -w
use strict;
use DBI;
use DBD::Pg qw(:async);
use constant DBNAME => 'snake';
use constant DBUSER => 'snake';
use constant DBPASS => 'snake';
use constant SQL_CREATE_TABLES => q{
/*
create table pref_users (
id varchar(32) primary key,
first_name varchar(32),
last_name varchar(32),
female boolean,
avatar varchar(128),
city varchar(32),
lat real check (-90 <= lat and lat <= 90),
lng real check (-180 <= lng and lng <= 180),
last_login timestamp default current_timestamp,
last_ip inet,
medals smallint check (medals > 0)
);
create table pref_rate (
obj varchar(32) references pref_users(id),
subj varchar(32) references pref_users(id),
good boolean,
fair boolean,
nice boolean,
about varchar(256),
last_rated timestamp default current_timestamp
);
create table pref_money (
id varchar(32) references pref_users,
yw char(7) default to_char(current_timestamp, 'YYYY-WW'),
money real
);
create index pref_money_yw_index on pref_money(yw);
create table pref_pass (
id varchar(32) references pref_users
);
create table pref_misere (
id varchar(32) references pref_users
);
*/
create or replace function pref_update_users(_id varchar,
_first_name varchar, _last_name varchar, _female boolean,
_avatar varchar, _city varchar, _last_ip inet) returns void as $BODY$
begin
update pref_users set
first_name = _first_name,
last_name = _last_name,
female = _female,
avatar = _avatar,
city = _city,
last_ip = _last_ip
where id = _id;
if not found then
insert into pref_users(id, first_name,
last_name, female, avatar, city, last_ip)
values (_id, _first_name, _last_name,
_female, _avatar, _city, _last_ip);
end if;
end;
$BODY$ language plpgsql;
};
eval {
my $dbh = DBI->connect_cached('dbi:Pg:dbname=' .
DBNAME, DBUSER, DBPASS, {
AutoCommit => 1,
PrintWarn => 1,
PrintError => 1,
RaiseError => 1,
FetchHashKeyName => 'NAME_lc',
pg_enable_utf8 => 1
});
$dbh->do(SQL_CREATE_TABLES, {pg_async => PG_ASYNC});
};
warn $@ if $@;
for my $i (1..10) {
eval {
my $dbh = DBI->connect_cached('dbi:Pg:dbname=' .
DBNAME, DBUSER, DBPASS, {
AutoCommit => 1,
PrintWarn => 1,
PrintError => 1,
RaiseError => 1,
FetchHashKeyName => 'NAME_lc',
pg_enable_utf8 => 1
});
#$dbh->pg_result;
my $sth = $dbh->prepare_cached(
q{select pref_update_users(?, ?, ?, ?, ?, ?, NULL)}, {pg_async => PG_ASYNC});
$sth->execute('ID123', 'Alexander', 'Farber', undef, undef, undef);
};
warn $@ if $@;
}
Thank you, Alex
That is how DBD::Pg's asynchronous support works: a connection can have only one active asynchronous query at a time. If you want the new query to cancel the currently active one, or to wait for it to finish, add PG_OLDQUERY_CANCEL or PG_OLDQUERY_WAIT to the pg_async value. Without either flag, DBD::Pg throws the error about the previous query.
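As a rough illustration of combining the flags, here is a minimal sketch. The helper name update_user_async is made up for this example, and $dbh is assumed to be an already connected DBD::Pg handle; the SQL is the one from the question.

```perl
use strict;
use warnings;
use DBI;
use DBD::Pg qw(:async);

# Hypothetical helper (not part of DBD::Pg): start the upsert asynchronously.
# $dbh is assumed to be a connected DBD::Pg database handle.
sub update_user_async {
    my ($dbh, @fields) = @_;

    my $sth = $dbh->prepare_cached(
        q{select pref_update_users(?, ?, ?, ?, ?, ?, NULL)},
        # PG_OLDQUERY_WAIT: if an async query is still active on this
        # connection, wait for it, then start this one.
        # PG_OLDQUERY_CANCEL would cancel the old query instead.
        { pg_async => PG_ASYNC + PG_OLDQUERY_WAIT },
    );

    # Returns as soon as the query has been sent to the server.
    return $sth->execute(@fields);
}
```

Note that PG_OLDQUERY_WAIT can block the caller while the previous query finishes, so in a poll loop it trades the error for a possible stall.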
You could add your queries to an AoH (array of hashes) or a Thread::Queue (ignore the name; it is useful as a generic queue object) and execute them on a timer, starting each one once the previous one has completed. Alternatively, add the $dbh->{pg_socket} socket to the sockets you poll with IO::Poll. When that socket has data to read (which indicates about the same as pg_ready), check that your query is ready and execute the next query in your queue.
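The queue idea could look roughly like the sketch below. The names enqueue_query and pump_queries are invented for this example, and it uses a plain array instead of Thread::Queue. It assumes a single connection whose handle is passed in on every call, and that the caller wraps the calls in eval as in the question.

```perl
use strict;
use warnings;
use DBI;
use DBD::Pg qw(:async);

my @pending;        # queued jobs: [ $sql, @bind_values ]
my $running_sth;    # statement handle of the query in flight, if any

# Queue a statement; nothing is sent to the server here.
sub enqueue_query {
    my ($sql, @bind) = @_;
    push @pending, [ $sql, @bind ];
    return scalar @pending;
}

# Call once per poll-loop iteration (or when the pg_socket is readable).
# It never waits on a running query: it collects a finished result
# and then starts the next queued statement, if there is one.
sub pump_queries {
    my ($dbh) = @_;

    if ($running_sth) {
        return 0 unless $dbh->pg_ready;   # previous query still running
        $dbh->pg_result;                  # collect (and discard) its result
        $running_sth->finish;             # release the cached handle
        undef $running_sth;
    }

    my $job = shift @pending or return 0; # nothing left to send
    my ($sql, @bind) = @$job;

    my $sth = $dbh->prepare_cached($sql, { pg_async => PG_ASYNC });
    $sth->execute(@bind);                 # returns immediately
    $running_sth = $sth;                  # remember it only once it was sent
    return 1;                             # a new query was started
}
```

In the daemon, each statistics update would go through enqueue_query(...), and the poll loop would call pump_queries($dbh) on every pass. If execute dies, that job is dropped. After a reconnect the remembered statement handle is stale and would need to be reset, which this sketch does not handle.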