/*
 * call-seq:
 *    conn.wait_for_notify( [ timeout ] ) -> String or nil
 *    conn.wait_for_notify( [ timeout ] ) { |event, pid| block }
 *    conn.wait_for_notify( [ timeout ] ) { |event, pid, payload| block } # PostgreSQL 9.0
 *
 * Blocks while waiting for notification(s), or until the optional
 * _timeout_ is reached, whichever comes first.  _timeout_ is
 * measured in seconds and can be fractional.  If _timeout_ is omitted,
 * +nil+ or +false+, the call waits indefinitely.
 *
 * Returns +nil+ if _timeout_ is reached, the name of the NOTIFY
 * event otherwise.  If used in block form, passes the name of the
 * NOTIFY +event+ and the generating +pid+ into the block; the event
 * name is still the return value of the method.
 *
 * Under PostgreSQL 9.0 and later, if the notification is sent with
 * the optional +payload+ string, it will be given to the block as the
 * third argument.  The third argument is +nil+ when the payload is empty
 * or when the extension was compiled without HAVE_ST_NOTIFY_EXTRA.
 *
 * Raises PGError if the connection's socket cannot be obtained or if
 * reading from the server fails, and a system call error if waiting on
 * the socket fails.
 *
 */
static VALUE
pgconn_wait_for_notify(int argc, VALUE *argv, VALUE self)
{
	PGconn *conn = get_pgconn( self );
	PGnotify *notification;
	int sd = PQsocket( conn );
	int ret;
	struct timeval timeout;
	struct timeval *ptimeout = NULL;
	VALUE timeout_in = Qnil, relname = Qnil, be_pid = Qnil, extra = Qnil;
	double timeout_sec;
	fd_set sd_rset;
#ifdef _WIN32
	fd_set crt_sd_rset;
#endif

	/*
	 * A negative descriptor means libpq has no usable socket for this
	 * connection.  That is an ordinary runtime condition, so report it as
	 * an exception the caller can rescue instead of aborting the whole
	 * interpreter with rb_bug().
	 */
	if ( sd < 0 ) {
		rb_raise( rb_ePGError, "PQsocket(conn): couldn't fetch the connection's socket!" );
	}

	rb_scan_args( argc, argv, "01", &timeout_in );

	/* Split the fractional seconds into whole seconds and microseconds. */
	if ( RTEST(timeout_in) ) {
		timeout_sec = NUM2DBL( timeout_in );
		timeout.tv_sec = (long)timeout_sec;
		timeout.tv_usec = (long)( (timeout_sec - (long)timeout_sec) * 1e6 );
		ptimeout = &timeout;
	}

	/* Check for notifications */
	while ( (notification = PQnotifies(conn)) == NULL ) {
		/* The descriptor set has to be rebuilt before every select call. */
		FD_ZERO( &sd_rset );
		FD_SET( sd, &sd_rset );

#ifdef _WIN32
		create_crt_fd(&sd_rset, &crt_sd_rset);
#endif

		/* Wait for the socket to become readable before checking again */
		ret = rb_thread_select( sd+1, &sd_rset, NULL, NULL, ptimeout );

#ifdef _WIN32
		cleanup_crt_fd(&sd_rset, &crt_sd_rset);
#endif

		if ( ret < 0 ) {
			rb_sys_fail( 0 );
		}

		/* Return nil if the select timed out */
		if ( ret == 0 ) {
			return Qnil;
		}

		/* Read the socket */
		if ( (ret = PQconsumeInput(conn)) != 1 ) {
			rb_raise( rb_ePGError, "PQconsumeInput == %d: %s", ret, PQerrorMessage(conn) );
		}
	}

	/*
	 * Copy everything needed out of the notification into Ruby objects
	 * before releasing it.
	 * NOTE(review): ASSOCIATE_INDEX presumably ties the new string to the
	 * connection (e.g. its encoding) -- confirm against the macro definition.
	 */
	relname = rb_tainted_str_new2( notification->relname );
	ASSOCIATE_INDEX( relname, self );
	be_pid = INT2NUM( notification->be_pid );
#ifdef HAVE_ST_NOTIFY_EXTRA
	/* Only a non-empty payload is turned into a String; otherwise extra stays nil. */
	if ( *notification->extra ) {
		extra = rb_tainted_str_new2( notification->extra );
		ASSOCIATE_INDEX( extra, self );
	}
#endif
	/* Release the libpq structure before yielding, so the block cannot leak it. */
	PQfreemem( notification );

	if ( rb_block_given_p() ) {
		rb_yield_values( 3, relname, be_pid, extra );
	}

	return relname;
}