Diffstat (limited to 'includes/jobqueue/JobQueueRedis.php')
-rw-r--r--  includes/jobqueue/JobQueueRedis.php | 255
1 file changed, 59 insertions(+), 196 deletions(-)
diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php
index 3519eac8..6c823fb9 100644
--- a/includes/jobqueue/JobQueueRedis.php
+++ b/includes/jobqueue/JobQueueRedis.php
@@ -24,7 +24,7 @@
/**
* Class to handle job queues stored in Redis
*
- * This is faster, less resource intensive, queue that JobQueueDB.
+ * This is a faster and less resource-intensive job queue than JobQueueDB.
* All data for a queue using this class is placed into one redis server.
*
* There are eight main redis keys used to track jobs:
@@ -49,7 +49,7 @@
*
* This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations.
* Additionally, it should be noted that redis has different persistence modes, such
- * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be
+ * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be
* made on the servers based on what queues are using it and what tolerance they have.
*
* @ingroup JobQueue
@@ -64,8 +64,6 @@ class JobQueueRedis extends JobQueue {
protected $server;
/** @var string Compression method to use */
protected $compression;
- /** @var bool */
- protected $daemonized;
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
@@ -90,7 +88,11 @@ class JobQueueRedis extends JobQueue {
$this->server = $params['redisServer'];
$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
- $this->daemonized = !empty( $params['daemonized'] );
+ if ( empty( $params['daemonized'] ) ) {
+ throw new Exception(
+ "Non-daemonized mode is no longer supported. Please install the " .
+ "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
+ }
}
protected function supportedOrders() {
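With non-daemonized mode removed, a queue of this class has to be serviced by the external mediawiki/services/jobrunner process. As a minimal sketch (not part of this change; the server address, password, and claimTTL values are illustrative assumptions), a matching $wgJobTypeConf entry in LocalSettings.php could look like:

    // Sketch only: route all job types to Redis-backed queues serviced by the
    // external jobrunner daemon; 'daemonized' must now be set (and true).
    $wgJobTypeConf['default'] = array(
        'class'       => 'JobQueueRedis',
        'redisServer' => '127.0.0.1:6379',            // assumed local Redis server
        'redisConfig' => array( 'password' => null ), // adjust if the server requires auth
        'claimTTL'    => 3600,                        // seconds before a claimed job may be retried
        'daemonized'  => true,
    );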
@@ -134,9 +136,6 @@ class JobQueueRedis extends JobQueue {
* @throws JobQueueError
*/
protected function doGetAcquiredCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
$conn = $this->getConnection();
try {
$conn->multi( Redis::PIPELINE );
@@ -155,9 +154,6 @@ class JobQueueRedis extends JobQueue {
* @throws JobQueueError
*/
protected function doGetDelayedCount() {
- if ( !$this->checkDelay ) {
- return 0; // no delayed jobs
- }
$conn = $this->getConnection();
try {
return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
@@ -172,9 +168,6 @@ class JobQueueRedis extends JobQueue {
* @throws JobQueueError
*/
protected function doGetAbandonedCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
$conn = $this->getConnection();
try {
return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
@@ -299,24 +292,10 @@ LUA;
protected function doPop() {
$job = false;
- // Push ready delayed jobs into the queue every 10 jobs to spread the load.
- // This is also done as a periodic task, but we don't want too much done at once.
- if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
- $this->recyclePruneAndUndelayJobs();
- }
-
$conn = $this->getConnection();
try {
do {
- if ( $this->claimTTL > 0 ) {
- // Keep the claimed job list down for high-traffic queues
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->recyclePruneAndUndelayJobs();
- }
- $blob = $this->popAndAcquireBlob( $conn );
- } else {
- $blob = $this->popAndDeleteBlob( $conn );
- }
+ $blob = $this->popAndAcquireBlob( $conn );
if ( !is_string( $blob ) ) {
break; // no jobs; nothing to do
}
@@ -328,7 +307,7 @@ LUA;
continue;
}
- // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
+ // If $item is invalid, the runner loop's recycling will clean it up as needed
$job = $this->getJobFromFields( $item ); // may be false
} while ( !$job ); // job may be false if invalid
} catch ( RedisException $e ) {
@@ -343,39 +322,6 @@ LUA;
* @return string|bool Serialized string or false
* @throws RedisException
*/
- protected function popAndDeleteBlob( RedisConnRef $conn ) {
- static $script =
-<<<LUA
- local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
- -- Pop an item off the queue
- local id = redis.call('rpop',kUnclaimed)
- if not id then return false end
- -- Get the job data and remove it
- local item = redis.call('hGet',kData,id)
- redis.call('hDel',kData,id)
- -- Allow new duplicates of this job
- local sha1 = redis.call('hGet',kSha1ById,id)
- if sha1 then redis.call('hDel',kIdBySha1,sha1) end
- redis.call('hDel',kSha1ById,id)
- -- Return the job data
- return item
-LUA;
- return $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
- $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
- $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
- ),
- 4 # number of first argument(s) that are keys
- );
- }
-
- /**
- * @param RedisConnRef $conn
- * @return array Serialized string or false
- * @throws RedisException
- */
protected function popAndAcquireBlob( RedisConnRef $conn ) {
static $script =
<<<LUA
@@ -416,36 +362,35 @@ LUA;
if ( !isset( $job->metadata['uuid'] ) ) {
throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
}
- if ( $this->claimTTL > 0 ) {
- $conn = $this->getConnection();
- try {
- static $script =
+
+ $conn = $this->getConnection();
+ try {
+ static $script =
<<<LUA
- local kClaimed, kAttempts, kData = unpack(KEYS)
- -- Unmark the job as claimed
- redis.call('zRem',kClaimed,ARGV[1])
- redis.call('hDel',kAttempts,ARGV[1])
- -- Delete the job data itself
- return redis.call('hDel',kData,ARGV[1])
+ local kClaimed, kAttempts, kData = unpack(KEYS)
+ -- Unmark the job as claimed
+ redis.call('zRem',kClaimed,ARGV[1])
+ redis.call('hDel',kAttempts,ARGV[1])
+ -- Delete the job data itself
+ return redis.call('hDel',kData,ARGV[1])
LUA;
- $res = $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-claimed' ), # KEYS[1]
- $this->getQueueKey( 'h-attempts' ), # KEYS[2]
- $this->getQueueKey( 'h-data' ), # KEYS[3]
- $job->metadata['uuid'] # ARGV[1]
- ),
- 3 # number of first argument(s) that are keys
- );
-
- if ( !$res ) {
- wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
-
- return false;
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ $res = $conn->luaEval( $script,
+ array(
+ $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+ $this->getQueueKey( 'h-data' ), # KEYS[3]
+ $job->metadata['uuid'] # ARGV[1]
+ ),
+ 3 # number of first argument(s) that are keys
+ );
+
+ if ( !$res ) {
+ wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+
+ return false;
}
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
}
return true;
@@ -571,6 +516,29 @@ LUA;
}
}
+ /**
+ * @see JobQueue::getAllAbandonedJobs()
+ * @return Iterator
+ */
+ public function getAllAbandonedJobs() {
+ $conn = $this->getConnection();
+ try {
+ $that = $this;
+
+ return new MappedIterator( // abandoned jobs
+ $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ),
+ function ( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn );
+ },
+ array( 'accept' => function ( $job ) {
+ return is_object( $job );
+ } )
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
public function getCoalesceLocationInternal() {
return "RedisServer:" . $this->server;
}
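As a usage sketch for the getAllAbandonedJobs() iterator added above (the 'refreshLinks' job type and the log channel are illustrative, not taken from this change):

    // Sketch only: log jobs on one queue that have exhausted their retry attempts.
    $queue = JobQueueGroup::singleton()->get( 'refreshLinks' ); // example job type
    foreach ( $queue->getAllAbandonedJobs() as $job ) {
        wfDebugLog( 'JobQueueRedis', 'Abandoned job: ' . $job->toString() );
    }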
@@ -630,115 +598,10 @@ LUA;
}
/**
- * Recycle or destroy any jobs that have been claimed for too long
- * and release any ready delayed jobs into the queue
- *
- * @return int Number of jobs recycled/deleted/undelayed
- * @throws MWException|JobQueueError
- */
- public function recyclePruneAndUndelayJobs() {
- $count = 0;
- // For each job item that can be retried, we need to add it back to the
- // main queue and remove it from the list of currenty claimed job items.
- // For those that cannot, they are marked as dead and kept around for
- // investigation and manual job restoration but are eventually deleted.
- $conn = $this->getConnection();
- try {
- $now = time();
- static $script =
-<<<LUA
- local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
- local released,abandoned,pruned,undelayed = 0,0,0,0
- -- Get all non-dead jobs that have an expired claim on them.
- -- The score for each item is the last claim timestamp (UNIX).
- local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
- for k,id in ipairs(staleClaims) do
- local timestamp = redis.call('zScore',kClaimed,id)
- local attempts = redis.call('hGet',kAttempts,id)
- if attempts < ARGV[3] then
- -- Claim expired and retries left: re-enqueue the job
- redis.call('lPush',kUnclaimed,id)
- redis.call('hIncrBy',kAttempts,id,1)
- released = released + 1
- else
- -- Claim expired and no retries left: mark the job as dead
- redis.call('zAdd',kAbandoned,timestamp,id)
- abandoned = abandoned + 1
- end
- redis.call('zRem',kClaimed,id)
- end
- -- Get all of the dead jobs that have been marked as dead for too long.
- -- The score for each item is the last claim timestamp (UNIX).
- local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
- for k,id in ipairs(deadClaims) do
- -- Stale and out of retries: remove any traces of the job
- redis.call('zRem',kAbandoned,id)
- redis.call('hDel',kAttempts,id)
- redis.call('hDel',kData,id)
- pruned = pruned + 1
- end
- -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
- local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
- -- Migrate the jobs from the "delayed" set to the "unclaimed" list
- for k,id in ipairs(ids) do
- redis.call('lPush',kUnclaimed,id)
- redis.call('zRem',kDelayed,id)
- end
- undelayed = #ids
- return {released,abandoned,pruned,undelayed}
-LUA;
- $res = $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-claimed' ), # KEYS[1]
- $this->getQueueKey( 'h-attempts' ), # KEYS[2]
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
- $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
- $this->getQueueKey( 'z-delayed' ), # KEYS[6]
- $now - $this->claimTTL, # ARGV[1]
- $now - self::MAX_AGE_PRUNE, # ARGV[2]
- $this->maxTries, # ARGV[3]
- $now # ARGV[4]
- ),
- 6 # number of first argument(s) that are keys
- );
- if ( $res ) {
- list( $released, $abandoned, $pruned, $undelayed ) = $res;
- $count += $released + $pruned + $undelayed;
- JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
- JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
- JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki );
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
- }
-
- return $count;
- }
-
- /**
* @return array
*/
protected function doGetPeriodicTasks() {
- if ( $this->daemonized ) {
- return array(); // managed in the runner loop
- }
- $periods = array( 3600 ); // standard cleanup (useful on config change)
- if ( $this->claimTTL > 0 ) {
- $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
- }
- if ( $this->checkDelay ) {
- $periods[] = 300; // 5 minutes
- }
- $period = min( $periods );
- $period = max( $period, 30 ); // sanity
-
- return array(
- 'recyclePruneAndUndelayJobs' => array(
- 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
- 'period' => $period,
- )
- );
+ return array(); // managed in the runner loop
}
/**