summaryrefslogtreecommitdiff
path: root/includes/jobqueue/JobQueueDB.php
diff options
context:
space:
mode:
Diffstat (limited to 'includes/jobqueue/JobQueueDB.php')
-rw-r--r--includes/jobqueue/JobQueueDB.php98
1 files changed, 44 insertions, 54 deletions
diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php
index d5f47ffd..d1e4a135 100644
--- a/includes/jobqueue/JobQueueDB.php
+++ b/includes/jobqueue/JobQueueDB.php
@@ -29,7 +29,6 @@
*/
class JobQueueDB extends JobQueue {
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
const MAX_OFFSET = 255; // integer; maximum number of rows to skip
@@ -71,15 +70,6 @@ class JobQueueDB extends JobQueue {
* @return bool
*/
protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
$dbr = $this->getSlaveDB();
try {
$found = $dbr->selectField( // unclaimed job
@@ -88,7 +78,6 @@ class JobQueueDB extends JobQueue {
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
- $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
return !$found;
}
@@ -256,12 +245,9 @@ class JobQueueDB extends JobQueue {
foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->insert( 'job', $rowBatch, $method );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
- JobQueue::incrStats(
- 'job-insert-duplicate',
- $this->type,
- count( $rowSet ) + count( $rowList ) - count( $rows ),
- $this->wiki
+ JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
+ JobQueue::incrStats( 'dupe_inserts', $this->type,
+ count( $rowSet ) + count( $rowList ) - count( $rows )
);
} catch ( DBError $e ) {
if ( $flags & self::QOS_ATOMIC ) {
@@ -273,8 +259,6 @@ class JobQueueDB extends JobQueue {
$dbw->commit( $method );
}
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
-
return;
}
@@ -283,10 +267,6 @@ class JobQueueDB extends JobQueue {
* @return Job|bool
*/
protected function doPop() {
- if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
- return false; // queue is empty
- }
-
$dbw = $this->getMasterDB();
try {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
@@ -309,22 +289,23 @@ class JobQueueDB extends JobQueue {
}
// Check if we found a row to reserve...
if ( !$row ) {
- $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
break; // nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'pops', $this->type );
// Get the job object from the row...
- $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
- if ( !$title ) {
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
- continue; // try again
- }
+ $title = Title::makeTitle( $row->job_namespace, $row->job_title );
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->metadata['id'] = $row->job_id;
+ $job->metadata['timestamp'] = $row->job_timestamp;
break; // done
} while ( true );
+
+ if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+ // Handled jobs that need to be recycled/deleted;
+ // any recycled jobs will be picked up next attempt
+ $this->recycleAndDeleteStaleJobs();
+ }
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
@@ -495,6 +476,8 @@ class JobQueueDB extends JobQueue {
// Delete a row with a single DELETE without holding row locks over RTTs...
$dbw->delete( 'job',
array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+
+ JobQueue::incrStats( 'acks', $this->type );
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
@@ -504,11 +487,11 @@ class JobQueueDB extends JobQueue {
/**
* @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @throws MWException
* @return bool
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
@@ -560,22 +543,10 @@ class JobQueueDB extends JobQueue {
}
/**
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array(
- 'recycleAndDeleteStaleJobs' => array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- )
- );
- }
-
- /**
* @return void
*/
protected function doFlushCaches() {
- foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+ foreach ( array( 'size', 'acquiredcount' ) as $type ) {
$this->cache->delete( $this->getCacheKey( $type ) );
}
}
@@ -585,18 +556,35 @@ class JobQueueDB extends JobQueue {
* @return Iterator
*/
public function getAllQueuedJobs() {
+ return $this->getJobIterator( array( 'job_cmd' => $this->getType(), 'job_token' => '' ) );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs()
+ * @return Iterator
+ */
+ public function getAllAcquiredJobs() {
+ return $this->getJobIterator( array( 'job_cmd' => $this->getType(), "job_token > ''" ) );
+ }
+
+ /**
+ * @param array $conds Query conditions
+ * @return Iterator
+ */
+ protected function getJobIterator( array $conds ) {
$dbr = $this->getSlaveDB();
try {
return new MappedIterator(
- $dbr->select( 'job', self::selectFields(),
- array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
- function ( $row ) use ( $dbr ) {
+ $dbr->select( 'job', self::selectFields(), $conds ),
+ function ( $row ) {
$job = Job::factory(
$row->job_cmd,
Title::makeTitle( $row->job_namespace, $row->job_title ),
- strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : array()
);
$job->metadata['id'] = $row->job_id;
+ $job->metadata['timestamp'] = $row->job_timestamp;
+
return $job;
}
);
@@ -613,6 +601,10 @@ class JobQueueDB extends JobQueue {
protected function doGetSiblingQueuesWithJobs( array $types ) {
$dbr = $this->getSlaveDB();
+ // @note: this does not check whether the jobs are claimed or not.
+ // This is useful so JobQueueGroup::pop() also sees queues that only
+ // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+ // failed jobs so that they can be popped again for that edge case.
$res = $dbr->select( 'job', 'DISTINCT job_cmd',
array( 'job_cmd' => $types ), __METHOD__ );
@@ -685,9 +677,7 @@ class JobQueueDB extends JobQueue {
);
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
- // The tasks recycled jobs or release delayed jobs into the queue
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+ JobQueue::incrStats( 'recycles', $this->type, $affected );
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
}
@@ -714,7 +704,7 @@ class JobQueueDB extends JobQueue {
$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
+ JobQueue::incrStats( 'abandons', $this->type, $affected );
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );