summaryrefslogtreecommitdiff
path: root/includes/jobqueue/JobQueueRedis.php
diff options
context:
space:
mode:
Diffstat (limited to 'includes/jobqueue/JobQueueRedis.php')
-rw-r--r--includes/jobqueue/JobQueueRedis.php151
1 files changed, 83 insertions, 68 deletions
diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php
index 6c823fb9..29c8068a 100644
--- a/includes/jobqueue/JobQueueRedis.php
+++ b/includes/jobqueue/JobQueueRedis.php
@@ -81,6 +81,7 @@ class JobQueueRedis extends JobQueue {
* - daemonized : Set to true if the redisJobRunnerService runs in the background.
* This will disable job recycling/undelaying from the MediaWiki side
* to avoid redundance and out-of-sync configuration.
+ * @throws InvalidArgumentException
*/
public function __construct( array $params ) {
parent::__construct( $params );
@@ -89,7 +90,7 @@ class JobQueueRedis extends JobQueue {
$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
if ( empty( $params['daemonized'] ) ) {
- throw new Exception(
+ throw new InvalidArgumentException(
"Non-daemonized mode is no longer supported. Please install the " .
"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
}
@@ -110,7 +111,7 @@ class JobQueueRedis extends JobQueue {
/**
* @see JobQueue::doIsEmpty()
* @return bool
- * @throws MWException
+ * @throws JobQueueError
*/
protected function doIsEmpty() {
return $this->doGetSize() == 0;
@@ -119,7 +120,7 @@ class JobQueueRedis extends JobQueue {
/**
* @see JobQueue::doGetSize()
* @return int
- * @throws MWException
+ * @throws JobQueueError
*/
protected function doGetSize() {
$conn = $this->getConnection();
@@ -205,7 +206,7 @@ class JobQueueRedis extends JobQueue {
if ( $flags & self::QOS_ATOMIC ) {
$batches = array( $items ); // all or nothing
} else {
- $batches = array_chunk( $items, 500 ); // avoid tying up the server
+ $batches = array_chunk( $items, 100 ); // avoid tying up the server
}
$failed = 0;
$pushed = 0;
@@ -222,9 +223,9 @@ class JobQueueRedis extends JobQueue {
throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
- JobQueue::incrStats( 'job-insert-duplicate', $this->type,
- count( $items ) - $failed - $pushed, $this->wiki );
+ JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
+ JobQueue::incrStats( 'dupe_inserts', $this->type,
+ count( $items ) - $failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
@@ -300,7 +301,7 @@ LUA;
break; // no jobs; nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'job-pop', $this->type );
$item = $this->unserialize( $blob );
if ( $item === false ) {
wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
@@ -356,13 +357,15 @@ LUA;
* @see JobQueue::doAck()
* @param Job $job
* @return Job|bool
- * @throws MWException|JobQueueError
+ * @throws UnexpectedValueException
+ * @throws JobQueueError
*/
protected function doAck( Job $job ) {
if ( !isset( $job->metadata['uuid'] ) ) {
- throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
+ throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
}
+ $uuid = $job->metadata['uuid'];
$conn = $this->getConnection();
try {
static $script =
@@ -379,16 +382,18 @@ LUA;
$this->getQueueKey( 'z-claimed' ), # KEYS[1]
$this->getQueueKey( 'h-attempts' ), # KEYS[2]
$this->getQueueKey( 'h-data' ), # KEYS[3]
- $job->metadata['uuid'] # ARGV[1]
+ $uuid # ARGV[1]
),
3 # number of first argument(s) that are keys
);
if ( !$res ) {
- wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+ wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );
return false;
}
+
+ JobQueue::incrStats( 'job-ack', $this->type );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
@@ -398,13 +403,14 @@ LUA;
/**
* @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @return bool
- * @throws MWException|JobQueueError
+ * @throws JobQueueError
+ * @throws LogicException
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
if ( !$job->hasRootJobParams() ) {
- throw new MWException( "Cannot register root job; missing parameters." );
+ throw new LogicException( "Cannot register root job; missing parameters." );
}
$params = $job->getRootJobParams();
@@ -441,6 +447,7 @@ LUA;
// Get the last time this root job was enqueued
$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
} catch ( RedisException $e ) {
+ $timestamp = false;
$this->throwRedisException( $conn, $e );
}
@@ -473,70 +480,84 @@ LUA;
/**
* @see JobQueue::getAllQueuedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllQueuedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator(
- $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
}
/**
- * @see JobQueue::getAllQueuedJobs()
+ * @see JobQueue::getAllDelayedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllDelayedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator( // delayed jobs
- $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs()
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ public function getAllAcquiredJobs() {
+ $conn = $this->getConnection();
+ try {
+ $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $this->getJobIterator( $conn, $uids );
}
/**
* @see JobQueue::getAllAbandonedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllAbandonedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator( // delayed jobs
- $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param array $uids List of job UUIDs
+ * @return MappedIterator
+ */
+ protected function getJobIterator( RedisConnRef $conn, array $uids ) {
+ $that = $this;
+
+ return new MappedIterator(
+ $uids,
+ function ( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn );
+ },
+ array( 'accept' => function ( $job ) {
+ return is_object( $job );
+ } )
+ );
}
public function getCoalesceLocationInternal() {
@@ -575,7 +596,8 @@ LUA;
* @param string $uid
* @param RedisConnRef $conn
* @return Job|bool Returns false if the job does not exist
- * @throws MWException|JobQueueError
+ * @throws JobQueueError
+ * @throws UnexpectedValueException
*/
public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
try {
@@ -583,13 +605,16 @@ LUA;
if ( $data === false ) {
return false; // not found
}
- $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+ $item = $this->unserialize( $data );
if ( !is_array( $item ) ) { // this shouldn't happen
- throw new MWException( "Could not find job with ID '$uid'." );
+ throw new UnexpectedValueException( "Could not find job with ID '$uid'." );
}
$title = Title::makeTitle( $item['namespace'], $item['title'] );
$job = Job::factory( $item['type'], $title, $item['params'] );
$job->metadata['uuid'] = $item['uuid'];
+ $job->metadata['timestamp'] = $item['timestamp'];
+ // Add in attempt count for debugging at showJobs.php
+ $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );
return $job;
} catch ( RedisException $e ) {
@@ -598,13 +623,6 @@ LUA;
}
/**
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array(); // managed in the runner loop
- }
-
- /**
* @param IJobSpecification $job
* @return array
*/
@@ -631,15 +649,12 @@ LUA;
* @return Job|bool
*/
protected function getJobFromFields( array $fields ) {
- $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
- if ( $title ) {
- $job = Job::factory( $fields['type'], $title, $fields['params'] );
- $job->metadata['uuid'] = $fields['uuid'];
+ $title = Title::makeTitle( $fields['namespace'], $fields['title'] );
+ $job = Job::factory( $fields['type'], $title, $fields['params'] );
+ $job->metadata['uuid'] = $fields['uuid'];
+ $job->metadata['timestamp'] = $fields['timestamp'];
- return $job;
- }
-
- return false;
+ return $job;
}
/**