summaryrefslogtreecommitdiff
path: root/includes/jobqueue/JobQueue.php
diff options
context:
space:
mode:
Diffstat (limited to 'includes/jobqueue/JobQueue.php')
-rw-r--r--includes/jobqueue/JobQueue.php83
1 files changed, 33 insertions, 50 deletions
diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php
index c00d22e9..91fe86cf 100644
--- a/includes/jobqueue/JobQueue.php
+++ b/includes/jobqueue/JobQueue.php
@@ -44,11 +44,10 @@ abstract class JobQueue {
/** @var int Maximum number of times to try a job */
protected $maxTries;
- /** @var bool Allow delayed jobs */
- protected $checkDelay;
-
/** @var BagOStuff */
protected $dupCache;
+ /** @var JobQueueAggregator */
+ protected $aggr;
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
@@ -71,11 +70,10 @@ abstract class JobQueue {
if ( !in_array( $this->order, $this->supportedOrders() ) ) {
throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
}
- $this->checkDelay = !empty( $params['checkDelay'] );
- if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
- throw new MWException( __CLASS__ . " does not support delayed jobs." );
- }
$this->dupCache = wfGetCache( CACHE_ANYTHING );
+ $this->aggr = isset( $params['aggregator'] )
+ ? $params['aggregator']
+ : new JobQueueAggregatorNull( array() );
}
/**
@@ -98,10 +96,6 @@ abstract class JobQueue {
* but not acknowledged as completed after this many seconds. Recycling
* of jobs simple means re-inserting them into the queue. Jobs can be
* attempted up to three times before being discarded.
- * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
- * This lets delayed jobs wait in a staging area until a given timestamp is
- * reached, at which point they will enter the queue. If this is not enabled
- * or not supported, an exception will be thrown on delayed job insertion.
*
* Queue classes should throw an exception if they do not support the options given.
*
@@ -144,14 +138,6 @@ abstract class JobQueue {
}
/**
- * @return bool Whether delayed jobs are enabled
- * @since 1.22
- */
- final public function delayedJobsEnabled() {
- return $this->checkDelay;
- }
-
- /**
* Get the allowed queue orders for configuration validation
*
* @return array Subset of (random, timestamp, fifo, undefined)
@@ -175,6 +161,14 @@ abstract class JobQueue {
}
/**
+ * @return bool Whether delayed jobs are enabled
+ * @since 1.22
+ */
+ final public function delayedJobsEnabled() {
+ return $this->supportsDelayedJobs();
+ }
+
+ /**
* Quickly check if the queue has no available (unacquired, non-delayed) jobs.
* Queue classes should use caching if they are any slower without memcached.
*
@@ -187,9 +181,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function isEmpty() {
- wfProfileIn( __METHOD__ );
$res = $this->doIsEmpty();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -210,9 +202,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function getSize() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetSize();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -233,9 +223,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function getAcquiredCount() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetAcquiredCount();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -257,9 +245,7 @@ abstract class JobQueue {
* @since 1.22
*/
final public function getDelayedCount() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetDelayedCount();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -282,9 +268,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function getAbandonedCount() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetAbandonedCount();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -308,7 +292,8 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function push( $jobs, $flags = 0 ) {
- $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ $this->batchPush( $jobs, $flags );
}
/**
@@ -330,15 +315,14 @@ abstract class JobQueue {
if ( $job->getType() !== $this->type ) {
throw new MWException(
"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
- } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+ } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
throw new MWException(
"Got delayed '{$job->getType()}' job; delays are not supported." );
}
}
- wfProfileIn( __METHOD__ );
$this->doBatchPush( $jobs, $flags );
- wfProfileOut( __METHOD__ );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
/**
@@ -366,9 +350,11 @@ abstract class JobQueue {
throw new MWException( "Unrecognized job type '{$this->type}'." );
}
- wfProfileIn( __METHOD__ );
$job = $this->doPop();
- wfProfileOut( __METHOD__ );
+
+ if ( !$job ) {
+ $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+ }
// Flag this job as an old duplicate based on its "root" job...
try {
@@ -376,7 +362,7 @@ abstract class JobQueue {
JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
}
- } catch ( MWException $e ) {
+ } catch ( Exception $e ) {
// don't lose jobs over this
}
@@ -403,9 +389,7 @@ abstract class JobQueue {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
- wfProfileIn( __METHOD__ );
$this->doAck( $job );
- wfProfileOut( __METHOD__ );
}
/**
@@ -449,9 +433,7 @@ abstract class JobQueue {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
- wfProfileIn( __METHOD__ );
$ok = $this->doDeduplicateRootJob( $job );
- wfProfileOut( __METHOD__ );
return $ok;
}
@@ -494,9 +476,7 @@ abstract class JobQueue {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
- wfProfileIn( __METHOD__ );
$isDuplicate = $this->doIsRootJobOldDuplicate( $job );
- wfProfileOut( __METHOD__ );
return $isDuplicate;
}
@@ -538,9 +518,7 @@ abstract class JobQueue {
* @return void
*/
final public function delete() {
- wfProfileIn( __METHOD__ );
$this->doDelete();
- wfProfileOut( __METHOD__ );
}
/**
@@ -560,9 +538,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function waitForBackups() {
- wfProfileIn( __METHOD__ );
$this->doWaitForBackups();
- wfProfileOut( __METHOD__ );
}
/**
@@ -607,9 +583,7 @@ abstract class JobQueue {
* @return void
*/
final public function flushCaches() {
- wfProfileIn( __METHOD__ );
$this->doFlushCaches();
- wfProfileOut( __METHOD__ );
}
/**
@@ -642,6 +616,17 @@ abstract class JobQueue {
}
/**
+ * Get an iterator to traverse over all abandoned jobs in this queue
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.25
+ */
+ public function getAllAbandonedJobs() {
+ return new ArrayIterator( array() ); // not implemented
+ }
+
+ /**
* Do not use this function outside of JobQueue/JobQueueGroup
*
* @return string
@@ -661,7 +646,6 @@ abstract class JobQueue {
* @since 1.22
*/
final public function getSiblingQueuesWithJobs( array $types ) {
- $section = new ProfileSection( __METHOD__ );
return $this->doGetSiblingQueuesWithJobs( $types );
}
@@ -686,7 +670,6 @@ abstract class JobQueue {
* @since 1.22
*/
final public function getSiblingQueueSizes( array $types ) {
- $section = new ProfileSection( __METHOD__ );
return $this->doGetSiblingQueueSizes( $types );
}