path: root/includes/jobqueue
Diffstat (limited to 'includes/jobqueue')
-rw-r--r--  includes/jobqueue/Job.php | 120
-rw-r--r--  includes/jobqueue/JobQueue.php | 83
-rw-r--r--  includes/jobqueue/JobQueueDB.php | 6
-rw-r--r--  includes/jobqueue/JobQueueFederated.php | 131
-rw-r--r--  includes/jobqueue/JobQueueGroup.php | 14
-rw-r--r--  includes/jobqueue/JobQueueRedis.php | 255
-rw-r--r--  includes/jobqueue/JobRunner.php | 86
-rw-r--r--  includes/jobqueue/JobSpecification.php | 36
-rw-r--r--  includes/jobqueue/aggregator/JobQueueAggregator.php | 28
-rw-r--r--  includes/jobqueue/aggregator/JobQueueAggregatorMemc.php | 125
-rw-r--r--  includes/jobqueue/aggregator/JobQueueAggregatorRedis.php | 2
-rw-r--r--  includes/jobqueue/jobs/AssembleUploadChunksJob.php | 19
-rw-r--r--  includes/jobqueue/jobs/DuplicateJob.php | 2
-rw-r--r--  includes/jobqueue/jobs/EnqueueJob.php | 88
-rw-r--r--  includes/jobqueue/jobs/HTMLCacheUpdateJob.php | 46
-rw-r--r--  includes/jobqueue/jobs/NullJob.php | 2
-rw-r--r--  includes/jobqueue/jobs/PublishStashedFileJob.php | 22
-rw-r--r--  includes/jobqueue/jobs/RecentChangesUpdateJob.php | 223
-rw-r--r--  includes/jobqueue/jobs/RefreshLinksJob.php | 16
-rw-r--r--  includes/jobqueue/jobs/RefreshLinksJob2.php | 141
-rw-r--r--  includes/jobqueue/jobs/ThumbnailRenderJob.php | 109
-rw-r--r--  includes/jobqueue/jobs/UploadFromUrlJob.php | 2
22 files changed, 770 insertions, 786 deletions
diff --git a/includes/jobqueue/Job.php b/includes/jobqueue/Job.php
index ee3f2c2b..f8de0b5d 100644
--- a/includes/jobqueue/Job.php
+++ b/includes/jobqueue/Job.php
@@ -47,20 +47,12 @@ abstract class Job implements IJobSpecification {
/** @var string Text for error that occurred last */
protected $error;
- /*-------------------------------------------------------------------------
- * Abstract functions
- *------------------------------------------------------------------------*/
-
/**
* Run the job
* @return bool Success
*/
abstract public function run();
- /*-------------------------------------------------------------------------
- * Static functions
- *------------------------------------------------------------------------*/
-
/**
* Create the appropriate object to handle a specific job
*
@@ -81,80 +73,37 @@ abstract class Job implements IJobSpecification {
}
/**
+ * @param string $command
+ * @param Title $title
+ * @param array|bool $params Cannot be === true
+ */
+ public function __construct( $command, $title, $params = false ) {
+ $this->command = $command;
+ $this->title = $title;
+ $this->params = $params;
+
+ // expensive jobs may set this to true
+ $this->removeDuplicates = false;
+ }
+
+ /**
* Batch-insert a group of jobs into the queue.
* This will be wrapped in a transaction with a forced commit.
*
* This may add duplicates at insert time, but they will be
* removed later on, when the first one is popped.
*
- * @param array $jobs Array of Job objects
+ * @param Job[] $jobs Array of Job objects
* @return bool
* @deprecated since 1.21
*/
public static function batchInsert( $jobs ) {
+ wfDeprecated( __METHOD__, '1.21' );
JobQueueGroup::singleton()->push( $jobs );
return true;
}
/**
- * Insert a group of jobs into the queue.
- *
- * Same as batchInsert() but does not commit and can thus
- * be rolled-back as part of a larger transaction. However,
- * large batches of jobs can cause slave lag.
- *
- * @param array $jobs Array of Job objects
- * @return bool
- * @deprecated since 1.21
- */
- public static function safeBatchInsert( $jobs ) {
- JobQueueGroup::singleton()->push( $jobs, JobQueue::QOS_ATOMIC );
- return true;
- }
-
- /**
- * Pop a job of a certain type. This tries less hard than pop() to
- * actually find a job; it may be adversely affected by concurrent job
- * runners.
- *
- * @param string $type
- * @return Job|bool Returns false if there are no jobs
- * @deprecated since 1.21
- */
- public static function pop_type( $type ) {
- return JobQueueGroup::singleton()->get( $type )->pop();
- }
-
- /**
- * Pop a job off the front of the queue.
- * This is subject to $wgJobTypesExcludedFromDefaultQueue.
- *
- * @return Job|bool False if there are no jobs
- * @deprecated since 1.21
- */
- public static function pop() {
- return JobQueueGroup::singleton()->pop();
- }
-
- /*-------------------------------------------------------------------------
- * Non-static functions
- *------------------------------------------------------------------------*/
-
- /**
- * @param string $command
- * @param Title $title
- * @param array|bool $params
- */
- public function __construct( $command, $title, $params = false ) {
- $this->command = $command;
- $this->title = $title;
- $this->params = $params;
-
- // expensive jobs may set this to true
- $this->removeDuplicates = false;
- }
-
- /**
* @return string
*/
public function getType() {
@@ -186,7 +135,15 @@ abstract class Job implements IJobSpecification {
}
/**
- * @return bool Whether only one of each identical set of jobs should be run
+ * Whether the queue should reject insertion of this job if a duplicate exists
+ *
+ * This can be used to avoid duplicated effort or combined with delayed jobs to
+ * coalesce updates into larger batches. Claimed jobs are never treated as
+ * duplicates of new jobs, and some queues may allow a few duplicates due to
+ * network partitions and fail-over. Thus, additional locking is needed to
+ * enforce mutual exclusion if this is really needed.
+ *
+ * @return bool
*/
public function ignoreDuplicates() {
return $this->removeDuplicates;
@@ -231,6 +188,8 @@ abstract class Job implements IJobSpecification {
unset( $info['params']['rootJobTimestamp'] );
// Likewise for jobs with different delay times
unset( $info['params']['jobReleaseTimestamp'] );
+ // Queues pack and hash this array, so normalize the order
+ ksort( $info['params'] );
}
return $info;
@@ -315,7 +274,7 @@ abstract class Job implements IJobSpecification {
break;
}
}
- if ( $filteredValue ) {
+ if ( $filteredValue && count( $filteredValue ) < 10 ) {
$value = FormatJson::encode( $filteredValue );
} else {
$value = "array(" . count( $value ) . ")";
@@ -328,16 +287,25 @@ abstract class Job implements IJobSpecification {
}
}
- if ( is_object( $this->title ) ) {
- $s = "{$this->command} " . $this->title->getPrefixedDBkey();
- if ( $paramString !== '' ) {
- $s .= ' ' . $paramString;
+ $metaString = '';
+ foreach ( $this->metadata as $key => $value ) {
+ if ( is_scalar( $value ) && mb_strlen( $value ) < 1024 ) {
+ $metaString .= ( $metaString ? ",$key=$value" : "$key=$value" );
}
+ }
- return $s;
- } else {
- return "{$this->command} $paramString";
+ $s = $this->command;
+ if ( is_object( $this->title ) ) {
+ $s .= " {$this->title->getPrefixedDBkey()}";
}
+ if ( $paramString != '' ) {
+ $s .= " $paramString";
+ }
+ if ( $metaString != '' ) {
+ $s .= " ($metaString)";
+ }
+
+ return $s;
}
protected function setLastError( $error ) {
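The deduplication behaviour documented above (ignoreDuplicates() plus the now ksort()-normalized info array) is opted into per job type via the removeDuplicates flag. A minimal sketch of such a job, assuming a hypothetical ExampleCacheJob type registered in $wgJobClasses:

class ExampleCacheJob extends Job {
	public function __construct( Title $title, array $params ) {
		parent::__construct( 'exampleCache', $title, $params );
		// Identical unclaimed jobs will be coalesced by queues that support it
		$this->removeDuplicates = true;
	}

	public function run() {
		// ...idempotent work keyed on $this->title and $this->params...
		return true;
	}
}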
diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php
index c00d22e9..91fe86cf 100644
--- a/includes/jobqueue/JobQueue.php
+++ b/includes/jobqueue/JobQueue.php
@@ -44,11 +44,10 @@ abstract class JobQueue {
/** @var int Maximum number of times to try a job */
protected $maxTries;
- /** @var bool Allow delayed jobs */
- protected $checkDelay;
-
/** @var BagOStuff */
protected $dupCache;
+ /** @var JobQueueAggregator */
+ protected $aggr;
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
@@ -71,11 +70,10 @@ abstract class JobQueue {
if ( !in_array( $this->order, $this->supportedOrders() ) ) {
throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
}
- $this->checkDelay = !empty( $params['checkDelay'] );
- if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
- throw new MWException( __CLASS__ . " does not support delayed jobs." );
- }
$this->dupCache = wfGetCache( CACHE_ANYTHING );
+ $this->aggr = isset( $params['aggregator'] )
+ ? $params['aggregator']
+ : new JobQueueAggregatorNull( array() );
}
/**
@@ -98,10 +96,6 @@ abstract class JobQueue {
* but not acknowledged as completed after this many seconds. Recycling
* of jobs simply means re-inserting them into the queue. Jobs can be
* attempted up to three times before being discarded.
- * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
- * This lets delayed jobs wait in a staging area until a given timestamp is
- * reached, at which point they will enter the queue. If this is not enabled
- * or not supported, an exception will be thrown on delayed job insertion.
*
* Queue classes should throw an exception if they do not support the options given.
*
@@ -144,14 +138,6 @@ abstract class JobQueue {
}
/**
- * @return bool Whether delayed jobs are enabled
- * @since 1.22
- */
- final public function delayedJobsEnabled() {
- return $this->checkDelay;
- }
-
- /**
* Get the allowed queue orders for configuration validation
*
* @return array Subset of (random, timestamp, fifo, undefined)
@@ -175,6 +161,14 @@ abstract class JobQueue {
}
/**
+ * @return bool Whether delayed jobs are enabled
+ * @since 1.22
+ */
+ final public function delayedJobsEnabled() {
+ return $this->supportsDelayedJobs();
+ }
+
+ /**
* Quickly check if the queue has no available (unacquired, non-delayed) jobs.
* Queue classes should use caching if they are any slower without memcached.
*
@@ -187,9 +181,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function isEmpty() {
- wfProfileIn( __METHOD__ );
$res = $this->doIsEmpty();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -210,9 +202,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function getSize() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetSize();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -233,9 +223,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function getAcquiredCount() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetAcquiredCount();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -257,9 +245,7 @@ abstract class JobQueue {
* @since 1.22
*/
final public function getDelayedCount() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetDelayedCount();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -282,9 +268,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function getAbandonedCount() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetAbandonedCount();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -308,7 +292,8 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function push( $jobs, $flags = 0 ) {
- $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ $this->batchPush( $jobs, $flags );
}
/**
@@ -330,15 +315,14 @@ abstract class JobQueue {
if ( $job->getType() !== $this->type ) {
throw new MWException(
"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
- } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+ } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
throw new MWException(
"Got delayed '{$job->getType()}' job; delays are not supported." );
}
}
- wfProfileIn( __METHOD__ );
$this->doBatchPush( $jobs, $flags );
- wfProfileOut( __METHOD__ );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
/**
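Whether a queue accepts delayed jobs is now derived from supportsDelayedJobs() rather than the removed 'checkDelay' flag; pushing a job carrying a release timestamp to an unsupporting backend still throws. A sketch of a delayed push, assuming a Redis-backed queue and the hypothetical job type from the earlier example:

$job = new ExampleCacheJob(
	Title::newMainPage(),
	array( 'jobReleaseTimestamp' => time() + 300 ) // run no earlier than ~5 minutes from now
);
JobQueueGroup::singleton()->push( $job );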
@@ -366,9 +350,11 @@ abstract class JobQueue {
throw new MWException( "Unrecognized job type '{$this->type}'." );
}
- wfProfileIn( __METHOD__ );
$job = $this->doPop();
- wfProfileOut( __METHOD__ );
+
+ if ( !$job ) {
+ $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+ }
// Flag this job as an old duplicate based on its "root" job...
try {
@@ -376,7 +362,7 @@ abstract class JobQueue {
JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
}
- } catch ( MWException $e ) {
+ } catch ( Exception $e ) {
// don't lose jobs over this
}
@@ -403,9 +389,7 @@ abstract class JobQueue {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
- wfProfileIn( __METHOD__ );
$this->doAck( $job );
- wfProfileOut( __METHOD__ );
}
/**
@@ -449,9 +433,7 @@ abstract class JobQueue {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
- wfProfileIn( __METHOD__ );
$ok = $this->doDeduplicateRootJob( $job );
- wfProfileOut( __METHOD__ );
return $ok;
}
@@ -494,9 +476,7 @@ abstract class JobQueue {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
- wfProfileIn( __METHOD__ );
$isDuplicate = $this->doIsRootJobOldDuplicate( $job );
- wfProfileOut( __METHOD__ );
return $isDuplicate;
}
@@ -538,9 +518,7 @@ abstract class JobQueue {
* @return void
*/
final public function delete() {
- wfProfileIn( __METHOD__ );
$this->doDelete();
- wfProfileOut( __METHOD__ );
}
/**
@@ -560,9 +538,7 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function waitForBackups() {
- wfProfileIn( __METHOD__ );
$this->doWaitForBackups();
- wfProfileOut( __METHOD__ );
}
/**
@@ -607,9 +583,7 @@ abstract class JobQueue {
* @return void
*/
final public function flushCaches() {
- wfProfileIn( __METHOD__ );
$this->doFlushCaches();
- wfProfileOut( __METHOD__ );
}
/**
@@ -642,6 +616,17 @@ abstract class JobQueue {
}
/**
+ * Get an iterator to traverse over all abandoned jobs in this queue
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.25
+ */
+ public function getAllAbandonedJobs() {
+ return new ArrayIterator( array() ); // not implemented
+ }
+
+ /**
* Do not use this function outside of JobQueue/JobQueueGroup
*
* @return string
@@ -661,7 +646,6 @@ abstract class JobQueue {
* @since 1.22
*/
final public function getSiblingQueuesWithJobs( array $types ) {
- $section = new ProfileSection( __METHOD__ );
return $this->doGetSiblingQueuesWithJobs( $types );
}
@@ -686,7 +670,6 @@ abstract class JobQueue {
* @since 1.22
*/
final public function getSiblingQueueSizes( array $types ) {
- $section = new ProfileSection( __METHOD__ );
return $this->doGetSiblingQueueSizes( $types );
}
diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php
index 08873cc1..d5f47ffd 100644
--- a/includes/jobqueue/JobQueueDB.php
+++ b/includes/jobqueue/JobQueueDB.php
@@ -221,7 +221,7 @@ class JobQueueDB extends JobQueue {
}
$rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
- $rowList = array(); // list of jobs for jobs that are are not de-duplicated
+ $rowList = array(); // list of jobs for jobs that are not de-duplicated
foreach ( $jobs as $job ) {
$row = $this->insertFields( $job );
if ( $job->ignoreDuplicates() ) {
@@ -556,7 +556,7 @@ class JobQueueDB extends JobQueue {
* @return void
*/
protected function doWaitForBackups() {
- wfWaitForSlaves();
+ wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
}
/**
@@ -686,7 +686,9 @@ class JobQueueDB extends JobQueue {
$affected = $dbw->affectedRows();
$count += $affected;
JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
+ // The tasks may have recycled jobs or released delayed jobs into the queue
$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
}
diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php
index c4301eed..d985d449 100644
--- a/includes/jobqueue/JobQueueFederated.php
+++ b/includes/jobqueue/JobQueueFederated.php
@@ -49,20 +49,12 @@
class JobQueueFederated extends JobQueue {
/** @var HashRing */
protected $partitionRing;
- /** @var HashRing */
- protected $partitionPushRing;
/** @var array (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = array();
- /** @var BagOStuff */
- protected $cache;
-
/** @var int Maximum number of partitions to try */
protected $maxPartitionsTry;
- const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
-
/**
* @param array $params Possible keys:
* - sectionsByWiki : A map of wiki IDs to section names.
@@ -72,10 +64,8 @@ class JobQueueFederated extends JobQueue {
* have explicitly defined sections.
* - configByPartition : Map of queue partition names to configuration arrays.
* These configuration arrays are passed to JobQueue::factory().
- * The options set here are overriden by those passed to this
+ * The options set here are overridden by those passed to
* the federated queue itself (e.g. 'order' and 'claimTTL').
- * - partitionsNoPush : List of partition names that can handle pop() but not push().
- * This can be used to migrate away from a certain partition.
* - maxPartitionsTry : Maximum number of times to attempt job insertion using
* different partition queues. This improves availability
* during failure, at the cost of added latency and somewhat
@@ -96,17 +86,10 @@ class JobQueueFederated extends JobQueue {
// Get the full partition map
$partitionMap = $params['partitionsBySection'][$section];
arsort( $partitionMap, SORT_NUMERIC );
- // Get the partitions jobs can actually be pushed to
- $partitionPushMap = $partitionMap;
- if ( isset( $params['partitionsNoPush'] ) ) {
- foreach ( $params['partitionsNoPush'] as $partition ) {
- unset( $partitionPushMap[$partition] );
- }
- }
// Get the base config to merge into each partition queue's config
$baseConfig = $params;
foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
- 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
+ 'partitionsBySection', 'configByPartition', ) as $o
) {
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
@@ -120,14 +103,6 @@ class JobQueueFederated extends JobQueue {
}
// Ring of all partitions
$this->partitionRing = new HashRing( $partitionMap );
- // Get the ring of partitions to push jobs into
- if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
- $this->partitionPushRing = clone $this->partitionRing; // faster
- } else {
- $this->partitionPushRing = new HashRing( $partitionPushMap );
- }
- // Aggregate cache some per-queue values if there are multiple partition queues
- $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
}
protected function supportedOrders() {
@@ -140,19 +115,16 @@ class JobQueueFederated extends JobQueue {
}
protected function supportsDelayedJobs() {
- return true; // defer checks to the partitions
+ foreach ( $this->partitionQueues as $queue ) {
+ if ( !$queue->supportsDelayedJobs() ) {
+ return false;
+ }
+ }
+
+ return true;
}
protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
$empty = true;
$failed = 0;
foreach ( $this->partitionQueues as $queue ) {
@@ -160,12 +132,11 @@ class JobQueueFederated extends JobQueue {
$empty = $empty && $queue->doIsEmpty();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
return $empty;
}
@@ -191,32 +162,24 @@ class JobQueueFederated extends JobQueue {
* @return int
*/
protected function getCrossPartitionSum( $type, $method ) {
- $key = $this->getCacheKey( $type );
-
- $count = $this->cache->get( $key );
- if ( $count !== false ) {
- return $count;
- }
-
+ $count = 0;
$failed = 0;
foreach ( $this->partitionQueues as $queue ) {
try {
$count += $queue->$method();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
-
return $count;
}
protected function doBatchPush( array $jobs, $flags ) {
// Local ring variable that may be changed to point to a new ring on failure
- $partitionRing = $this->partitionPushRing;
+ $partitionRing = $this->partitionRing;
// Try to insert the jobs and update $partitionsTry on any failures.
// Retry to insert any remaining jobs, ignoring the bad partitions.
$jobsLeft = $jobs;
@@ -277,12 +240,9 @@ class JobQueueFederated extends JobQueue {
$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
} catch ( JobQueueError $e ) {
$ok = false;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
- } else {
+ if ( !$ok ) {
if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
@@ -299,12 +259,9 @@ class JobQueueFederated extends JobQueue {
$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
} catch ( JobQueueError $e ) {
$ok = false;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
- } else {
+ if ( !$ok ) {
if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
@@ -331,7 +288,7 @@ class JobQueueFederated extends JobQueue {
$job = $queue->pop();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
$job = false;
}
if ( $job ) {
@@ -344,9 +301,6 @@ class JobQueueFederated extends JobQueue {
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
-
return false;
}
@@ -361,12 +315,12 @@ class JobQueueFederated extends JobQueue {
protected function doIsRootJobOldDuplicate( Job $job ) {
$params = $job->getRootJobParams();
$sigature = $params['rootJobSignature'];
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
try {
return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
} catch ( JobQueueError $e ) {
- if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
}
}
@@ -377,12 +331,12 @@ class JobQueueFederated extends JobQueue {
protected function doDeduplicateRootJob( Job $job ) {
$params = $job->getRootJobParams();
$sigature = $params['rootJobSignature'];
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
try {
return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
} catch ( JobQueueError $e ) {
- if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
}
}
@@ -398,7 +352,7 @@ class JobQueueFederated extends JobQueue {
$queue->doDelete();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
@@ -413,7 +367,7 @@ class JobQueueFederated extends JobQueue {
$queue->waitForBackups();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
@@ -440,10 +394,6 @@ class JobQueueFederated extends JobQueue {
'abandonedcount'
);
- foreach ( $types as $type ) {
- $this->cache->delete( $this->getCacheKey( $type ) );
- }
-
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$queue->doFlushCaches();
@@ -472,6 +422,17 @@ class JobQueueFederated extends JobQueue {
return $iterator;
}
+ public function getAllAbandonedJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllAbandonedJobs() );
+ }
+
+ return $iterator;
+ }
+
public function getCoalesceLocationInternal() {
return "JobQueueFederated:wiki:{$this->wiki}" .
sha1( serialize( array_keys( $this->partitionQueues ) ) );
@@ -495,7 +456,7 @@ class JobQueueFederated extends JobQueue {
}
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
@@ -519,7 +480,7 @@ class JobQueueFederated extends JobQueue {
}
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
@@ -527,6 +488,10 @@ class JobQueueFederated extends JobQueue {
return $result;
}
+ protected function logException( Exception $e ) {
+ wfDebugLog( 'JobQueueFederated', $e->getMessage() . "\n" . $e->getTraceAsString() );
+ }
+
/**
* Throw an error if no partitions available
*
@@ -546,14 +511,4 @@ class JobQueueFederated extends JobQueue {
$queue->setTestingPrefix( $key );
}
}
-
- /**
- * @param string $property
- * @return string
- */
- private function getCacheKey( $property ) {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
- }
}
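With partitionsNoPush and the cross-partition stat cache removed, a federated queue is driven entirely by the section/partition maps and the per-partition configs. A configuration sketch with made-up partition names, hosts, and weights:

$wgJobTypeConf['default'] = array(
	'class'               => 'JobQueueFederated',
	'sectionsByWiki'      => array(), // unlisted wikis use the 'default' section
	'partitionsBySection' => array(
		'default' => array( 'rdb1' => 10, 'rdb2' => 10 ),
	),
	'configByPartition'   => array(
		'rdb1' => array( 'class' => 'JobQueueRedis', 'redisServer' => 'rdb1.example:6379',
			'redisConfig' => array(), 'daemonized' => true ),
		'rdb2' => array( 'class' => 'JobQueueRedis', 'redisServer' => 'rdb2.example:6379',
			'redisConfig' => array(), 'daemonized' => true ),
	),
	'maxPartitionsTry'    => 2,
);

Note that delayedJobsEnabled() on the federated queue now reports true only if every partition backend supports delayed jobs.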
diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php
index 98a78c5e..ebd547a0 100644
--- a/includes/jobqueue/JobQueueGroup.php
+++ b/includes/jobqueue/JobQueueGroup.php
@@ -94,6 +94,7 @@ class JobQueueGroup {
} else {
$conf = $conf + $wgJobTypeConf['default'];
}
+ $conf['aggregator'] = JobQueueAggregator::singleton();
return JobQueue::factory( $conf );
}
@@ -104,7 +105,7 @@ class JobQueueGroup {
* This inserts the jobs into the queue specified by $wgJobTypeConf
* and updates the aggregate job queue information cache as needed.
*
- * @param Job|array $jobs A single Job or a list of Jobs
+ * @param Job|Job[] $jobs A single Job or a list of Jobs
* @throws MWException
* @return void
*/
@@ -125,7 +126,6 @@ class JobQueueGroup {
foreach ( $jobsByType as $type => $jobs ) {
$this->get( $type )->push( $jobs );
- JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
}
if ( $this->cache->has( 'queues-ready', 'list' ) ) {
@@ -153,9 +153,6 @@ class JobQueueGroup {
if ( is_string( $qtype ) ) { // specific job type
if ( !in_array( $qtype, $blacklist ) ) {
$job = $this->get( $qtype )->pop();
- if ( !$job ) {
- JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
- }
}
} else { // any job in the "default" jobs types
if ( $flags & self::USE_CACHE ) {
@@ -179,7 +176,6 @@ class JobQueueGroup {
if ( $job ) { // found
break;
} else { // not found
- JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
$this->cache->clear( 'queues-ready' );
}
}
@@ -220,12 +216,10 @@ class JobQueueGroup {
public function waitForBackups() {
global $wgJobTypeConf;
- wfProfileIn( __METHOD__ );
// Try to avoid doing this more than once per queue storage medium
foreach ( $wgJobTypeConf as $type => $conf ) {
$this->get( $type )->waitForBackups();
}
- wfProfileOut( __METHOD__ );
}
/**
@@ -383,10 +377,6 @@ class JobQueueGroup {
}
}
}
- // The tasks may have recycled jobs or release delayed jobs into the queue
- if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
- JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
- }
}
if ( $count === 0 ) {
diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php
index 3519eac8..6c823fb9 100644
--- a/includes/jobqueue/JobQueueRedis.php
+++ b/includes/jobqueue/JobQueueRedis.php
@@ -24,7 +24,7 @@
/**
* Class to handle job queues stored in Redis
*
- * This is faster, less resource intensive, queue that JobQueueDB.
+ * This is a faster and less resource-intensive job queue than JobQueueDB.
* All data for a queue using this class is placed into one redis server.
*
* There are eight main redis keys used to track jobs:
@@ -49,7 +49,7 @@
*
* This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations.
* Additionally, it should be noted that redis has different persistence modes, such
- * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be
+ * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be
* made on the servers based on what queues are using it and what tolerance they have.
*
* @ingroup JobQueue
@@ -64,8 +64,6 @@ class JobQueueRedis extends JobQueue {
protected $server;
/** @var string Compression method to use */
protected $compression;
- /** @var bool */
- protected $daemonized;
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
@@ -90,7 +88,11 @@ class JobQueueRedis extends JobQueue {
$this->server = $params['redisServer'];
$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
- $this->daemonized = !empty( $params['daemonized'] );
+ if ( empty( $params['daemonized'] ) ) {
+ throw new Exception(
+ "Non-daemonized mode is no longer supported. Please install the " .
+ "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
+ }
}
protected function supportedOrders() {
@@ -134,9 +136,6 @@ class JobQueueRedis extends JobQueue {
* @throws JobQueueError
*/
protected function doGetAcquiredCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
$conn = $this->getConnection();
try {
$conn->multi( Redis::PIPELINE );
@@ -155,9 +154,6 @@ class JobQueueRedis extends JobQueue {
* @throws JobQueueError
*/
protected function doGetDelayedCount() {
- if ( !$this->checkDelay ) {
- return 0; // no delayed jobs
- }
$conn = $this->getConnection();
try {
return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
@@ -172,9 +168,6 @@ class JobQueueRedis extends JobQueue {
* @throws JobQueueError
*/
protected function doGetAbandonedCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
$conn = $this->getConnection();
try {
return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
@@ -299,24 +292,10 @@ LUA;
protected function doPop() {
$job = false;
- // Push ready delayed jobs into the queue every 10 jobs to spread the load.
- // This is also done as a periodic task, but we don't want too much done at once.
- if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
- $this->recyclePruneAndUndelayJobs();
- }
-
$conn = $this->getConnection();
try {
do {
- if ( $this->claimTTL > 0 ) {
- // Keep the claimed job list down for high-traffic queues
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->recyclePruneAndUndelayJobs();
- }
- $blob = $this->popAndAcquireBlob( $conn );
- } else {
- $blob = $this->popAndDeleteBlob( $conn );
- }
+ $blob = $this->popAndAcquireBlob( $conn );
if ( !is_string( $blob ) ) {
break; // no jobs; nothing to do
}
@@ -328,7 +307,7 @@ LUA;
continue;
}
- // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
+ // If $item is invalid, the runner loop recycling will clean up as needed
$job = $this->getJobFromFields( $item ); // may be false
} while ( !$job ); // job may be false if invalid
} catch ( RedisException $e ) {
@@ -343,39 +322,6 @@ LUA;
* @return array Serialized string or false
* @throws RedisException
*/
- protected function popAndDeleteBlob( RedisConnRef $conn ) {
- static $script =
-<<<LUA
- local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
- -- Pop an item off the queue
- local id = redis.call('rpop',kUnclaimed)
- if not id then return false end
- -- Get the job data and remove it
- local item = redis.call('hGet',kData,id)
- redis.call('hDel',kData,id)
- -- Allow new duplicates of this job
- local sha1 = redis.call('hGet',kSha1ById,id)
- if sha1 then redis.call('hDel',kIdBySha1,sha1) end
- redis.call('hDel',kSha1ById,id)
- -- Return the job data
- return item
-LUA;
- return $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
- $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
- $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
- ),
- 4 # number of first argument(s) that are keys
- );
- }
-
- /**
- * @param RedisConnRef $conn
- * @return array Serialized string or false
- * @throws RedisException
- */
protected function popAndAcquireBlob( RedisConnRef $conn ) {
static $script =
<<<LUA
@@ -416,36 +362,35 @@ LUA;
if ( !isset( $job->metadata['uuid'] ) ) {
throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
}
- if ( $this->claimTTL > 0 ) {
- $conn = $this->getConnection();
- try {
- static $script =
+
+ $conn = $this->getConnection();
+ try {
+ static $script =
<<<LUA
- local kClaimed, kAttempts, kData = unpack(KEYS)
- -- Unmark the job as claimed
- redis.call('zRem',kClaimed,ARGV[1])
- redis.call('hDel',kAttempts,ARGV[1])
- -- Delete the job data itself
- return redis.call('hDel',kData,ARGV[1])
+ local kClaimed, kAttempts, kData = unpack(KEYS)
+ -- Unmark the job as claimed
+ redis.call('zRem',kClaimed,ARGV[1])
+ redis.call('hDel',kAttempts,ARGV[1])
+ -- Delete the job data itself
+ return redis.call('hDel',kData,ARGV[1])
LUA;
- $res = $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-claimed' ), # KEYS[1]
- $this->getQueueKey( 'h-attempts' ), # KEYS[2]
- $this->getQueueKey( 'h-data' ), # KEYS[3]
- $job->metadata['uuid'] # ARGV[1]
- ),
- 3 # number of first argument(s) that are keys
- );
-
- if ( !$res ) {
- wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
-
- return false;
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ $res = $conn->luaEval( $script,
+ array(
+ $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+ $this->getQueueKey( 'h-data' ), # KEYS[3]
+ $job->metadata['uuid'] # ARGV[1]
+ ),
+ 3 # number of first argument(s) that are keys
+ );
+
+ if ( !$res ) {
+ wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+
+ return false;
}
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
}
return true;
@@ -571,6 +516,29 @@ LUA;
}
}
+ /**
+ * @see JobQueue::getAllAbandonedJobs()
+ * @return Iterator
+ */
+ public function getAllAbandonedJobs() {
+ $conn = $this->getConnection();
+ try {
+ $that = $this;
+
+ return new MappedIterator( // abandoned jobs
+ $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ),
+ function ( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn );
+ },
+ array( 'accept' => function ( $job ) {
+ return is_object( $job );
+ } )
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
public function getCoalesceLocationInternal() {
return "RedisServer:" . $this->server;
}
@@ -630,115 +598,10 @@ LUA;
}
/**
- * Recycle or destroy any jobs that have been claimed for too long
- * and release any ready delayed jobs into the queue
- *
- * @return int Number of jobs recycled/deleted/undelayed
- * @throws MWException|JobQueueError
- */
- public function recyclePruneAndUndelayJobs() {
- $count = 0;
- // For each job item that can be retried, we need to add it back to the
- // main queue and remove it from the list of currenty claimed job items.
- // For those that cannot, they are marked as dead and kept around for
- // investigation and manual job restoration but are eventually deleted.
- $conn = $this->getConnection();
- try {
- $now = time();
- static $script =
-<<<LUA
- local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
- local released,abandoned,pruned,undelayed = 0,0,0,0
- -- Get all non-dead jobs that have an expired claim on them.
- -- The score for each item is the last claim timestamp (UNIX).
- local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
- for k,id in ipairs(staleClaims) do
- local timestamp = redis.call('zScore',kClaimed,id)
- local attempts = redis.call('hGet',kAttempts,id)
- if attempts < ARGV[3] then
- -- Claim expired and retries left: re-enqueue the job
- redis.call('lPush',kUnclaimed,id)
- redis.call('hIncrBy',kAttempts,id,1)
- released = released + 1
- else
- -- Claim expired and no retries left: mark the job as dead
- redis.call('zAdd',kAbandoned,timestamp,id)
- abandoned = abandoned + 1
- end
- redis.call('zRem',kClaimed,id)
- end
- -- Get all of the dead jobs that have been marked as dead for too long.
- -- The score for each item is the last claim timestamp (UNIX).
- local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
- for k,id in ipairs(deadClaims) do
- -- Stale and out of retries: remove any traces of the job
- redis.call('zRem',kAbandoned,id)
- redis.call('hDel',kAttempts,id)
- redis.call('hDel',kData,id)
- pruned = pruned + 1
- end
- -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
- local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
- -- Migrate the jobs from the "delayed" set to the "unclaimed" list
- for k,id in ipairs(ids) do
- redis.call('lPush',kUnclaimed,id)
- redis.call('zRem',kDelayed,id)
- end
- undelayed = #ids
- return {released,abandoned,pruned,undelayed}
-LUA;
- $res = $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-claimed' ), # KEYS[1]
- $this->getQueueKey( 'h-attempts' ), # KEYS[2]
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
- $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
- $this->getQueueKey( 'z-delayed' ), # KEYS[6]
- $now - $this->claimTTL, # ARGV[1]
- $now - self::MAX_AGE_PRUNE, # ARGV[2]
- $this->maxTries, # ARGV[3]
- $now # ARGV[4]
- ),
- 6 # number of first argument(s) that are keys
- );
- if ( $res ) {
- list( $released, $abandoned, $pruned, $undelayed ) = $res;
- $count += $released + $pruned + $undelayed;
- JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
- JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
- JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki );
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
- }
-
- return $count;
- }
-
- /**
* @return array
*/
protected function doGetPeriodicTasks() {
- if ( $this->daemonized ) {
- return array(); // managed in the runner loop
- }
- $periods = array( 3600 ); // standard cleanup (useful on config change)
- if ( $this->claimTTL > 0 ) {
- $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
- }
- if ( $this->checkDelay ) {
- $periods[] = 300; // 5 minutes
- }
- $period = min( $periods );
- $period = max( $period, 30 ); // sanity
-
- return array(
- 'recyclePruneAndUndelayJobs' => array(
- 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
- 'period' => $period,
- )
- );
+ return array(); // managed in the runner loop
}
/**
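The new getAllAbandonedJobs() lets maintenance code inspect dead jobs without reading the z-abandoned key directly. A usage sketch (any re-queueing would be up to the caller, not this patch):

$queue = JobQueueGroup::singleton()->get( 'refreshLinks' );
foreach ( $queue->getAllAbandonedJobs() as $job ) {
	// Each $job is rebuilt from the stored blob via getJobFromUidInternal()
	wfDebugLog( 'jobqueue', 'Abandoned job: ' . $job->toString() );
}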
diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php
index 8cccedaf..b8c5d6cf 100644
--- a/includes/jobqueue/JobRunner.php
+++ b/includes/jobqueue/JobRunner.php
@@ -21,13 +21,17 @@
* @ingroup JobQueue
*/
+use MediaWiki\Logger\LoggerFactory;
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\LoggerInterface;
+
/**
* Job queue runner utility methods
*
* @ingroup JobQueue
* @since 1.24
*/
-class JobRunner {
+class JobRunner implements LoggerAwareInterface {
/** @var callable|null Debug output handler */
protected $debug;
@@ -39,6 +43,28 @@ class JobRunner {
}
/**
+ * @var LoggerInterface $logger
+ */
+ protected $logger;
+
+ /**
+ * @param LoggerInterface $logger
+ */
+ public function setLogger( LoggerInterface $logger ) {
+ $this->logger = $logger;
+ }
+
+ /**
+ * @param LoggerInterface $logger
+ */
+ public function __construct( LoggerInterface $logger = null ) {
+ if ( $logger === null ) {
+ $logger = LoggerFactory::getInstance( 'runJobs' );
+ }
+ $this->setLogger( $logger );
+ }
+
+ /**
* Run jobs of the specified number/type for the specified time
*
* The response map has a 'job' field that lists status of each job, including:
@@ -62,6 +88,8 @@ class JobRunner {
* @return array Summary response that can easily be JSON serialized
*/
public function run( array $options ) {
+ global $wgJobClasses;
+
$response = array( 'jobs' => array(), 'reached' => 'none-ready' );
$type = isset( $options['type'] ) ? $options['type'] : false;
@@ -69,11 +97,31 @@ class JobRunner {
$maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false;
$noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
+ if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
+ $response['reached'] = 'none-possible';
+ return $response;
+ }
+
$group = JobQueueGroup::singleton();
// Handle any required periodic queue maintenance
$count = $group->executeReadyPeriodicTasks();
if ( $count > 0 ) {
- $this->runJobsLog( "Executed $count periodic queue task(s)." );
+ $msg = "Executed $count periodic queue task(s).";
+ $this->logger->debug( $msg );
+ $this->debugCallback( $msg );
+ }
+
+ // Bail out if in read-only mode
+ if ( wfReadOnly() ) {
+ $response['reached'] = 'read-only';
+ return $response;
+ }
+
+ // Bail out if there is too much DB lag
+ list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag();
+ if ( $maxLag >= 5 ) {
+ $response['reached'] = 'slave-lag-limit';
+ return $response;
}
// Flush any pending DB writes for sanity
@@ -87,8 +135,10 @@ class JobRunner {
$jobsRun = 0;
$timeMsTotal = 0;
$flags = JobQueueGroup::USE_CACHE;
+ $checkPeriod = 5.0; // seconds
+ $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes
$startTime = microtime( true ); // time since jobs started running
- $lastTime = microtime( true ); // time since last slave check
+ $lastTime = microtime( true ) - $checkPhase; // time since last slave check
do {
// Sync the persistent backoffs with concurrent runners
$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
@@ -117,24 +167,24 @@ class JobRunner {
$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
}
- $this->runJobsLog( $job->toString() . " STARTING" );
+ $msg = $job->toString() . " STARTING";
+ $this->logger->info( $msg );
+ $this->debugCallback( $msg );
// Run the job...
- wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
$jobStartTime = microtime( true );
try {
++$jobsRun;
$status = $job->run();
$error = $job->getLastError();
wfGetLBFactory()->commitMasterChanges();
- } catch ( MWException $e ) {
+ } catch ( Exception $e ) {
MWExceptionHandler::rollbackMasterChangesAndLog( $e );
$status = false;
$error = get_class( $e ) . ': ' . $e->getMessage();
MWExceptionHandler::logException( $e );
}
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
- wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
$timeMsTotal += $timeMs;
// Mark the job as done on success or when the job cannot be retried
@@ -151,9 +201,13 @@ class JobRunner {
}
if ( $status === false ) {
- $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
+ $msg = $job->toString() . " t=$timeMs error={$error}";
+ $this->logger->error( $msg );
+ $this->debugCallback( $msg );
} else {
- $this->runJobsLog( $job->toString() . " t=$timeMs good" );
+ $msg = $job->toString() . " t=$timeMs good";
+ $this->logger->info( $msg );
+ $this->debugCallback( $msg );
}
$response['jobs'][] = array(
@@ -172,10 +226,15 @@ class JobRunner {
break;
}
- // Don't let any of the main DB slaves get backed up
+ // Don't let any of the main DB slaves get backed up.
+ // This only waits for so long before exiting and letting
+ // other wikis in the farm (on different masters) get a chance.
$timePassed = microtime( true ) - $lastTime;
if ( $timePassed >= 5 || $timePassed < 0 ) {
- wfWaitForSlaves( $lastTime );
+ if ( !wfWaitForSlaves( $lastTime, false, '*', 5 ) ) {
+ $response['reached'] = 'slave-lag-limit';
+ break;
+ }
$lastTime = microtime( true );
}
// Don't let any queue slaves/backups fall behind
@@ -239,7 +298,6 @@ class JobRunner {
* @return array Map of (job type => backoff expiry timestamp)
*/
private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
- $section = new ProfileSection( __METHOD__ );
$file = wfTempDir() . '/mw-runJobs-backoffs.json';
if ( is_file( $file ) ) {
@@ -278,7 +336,6 @@ class JobRunner {
* @return array The new backoffs account for $backoffs and the latest file data
*/
private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
- $section = new ProfileSection( __METHOD__ );
if ( !$deltas ) {
return $this->loadBackoffs( $backoffs, $mode );
@@ -341,10 +398,9 @@ class JobRunner {
* Log the job message
* @param string $msg The message to log
*/
- private function runJobsLog( $msg ) {
+ private function debugCallback( $msg ) {
if ( $this->debug ) {
call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
}
- wfDebugLog( 'runJobs', $msg );
}
}
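JobRunner now takes a PSR-3 logger (defaulting to the 'runJobs' channel) and can bail out early on read-only mode or slave lag. A hedged usage sketch:

use MediaWiki\Logger\LoggerFactory;

$runner = new JobRunner( LoggerFactory::getInstance( 'runJobs' ) );
$response = $runner->run( array(
	'type'    => 'refreshLinks', // false would mean "any default job type"
	'maxTime' => 30,             // seconds
) );
// $response['reached'] may now be 'none-possible', 'read-only', or 'slave-lag-limit'
// in addition to the existing values.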
diff --git a/includes/jobqueue/JobSpecification.php b/includes/jobqueue/JobSpecification.php
index 9fa7747f..42d2a39b 100644
--- a/includes/jobqueue/JobSpecification.php
+++ b/includes/jobqueue/JobSpecification.php
@@ -91,8 +91,8 @@ class JobSpecification implements IJobSpecification {
/** @var Title */
protected $title;
- /** @var bool Expensive jobs may set this to true */
- protected $ignoreDuplicates;
+ /** @var array */
+ protected $opts;
/**
* @param string $type
@@ -104,11 +104,12 @@ class JobSpecification implements IJobSpecification {
$type, array $params, array $opts = array(), Title $title = null
) {
$this->validateParams( $params );
+ $this->validateParams( $opts );
$this->type = $type;
$this->params = $params;
$this->title = $title ?: Title::newMainPage();
- $this->ignoreDuplicates = !empty( $opts['removeDuplicates'] );
+ $this->opts = $opts;
}
/**
@@ -158,7 +159,7 @@ class JobSpecification implements IJobSpecification {
* @return bool Whether only one of each identical set of jobs should be run
*/
public function ignoreDuplicates() {
- return $this->ignoreDuplicates;
+ return !empty( $this->opts['removeDuplicates'] );
}
/**
@@ -186,4 +187,31 @@ class JobSpecification implements IJobSpecification {
return $info;
}
+
+ /**
+ * @return array Field/value map that can immediately be serialized
+ * @since 1.25
+ */
+ public function toSerializableArray() {
+ return array(
+ 'type' => $this->type,
+ 'params' => $this->params,
+ 'opts' => $this->opts,
+ 'title' => array(
+ 'ns' => $this->title->getNamespace(),
+ 'key' => $this->title->getDbKey()
+ )
+ );
+ }
+
+ /**
+ * @param array $map Field/value map
+ * @return JobSpecification
+ * @since 1.25
+ */
+ public static function newFromArray( array $map ) {
+ $title = Title::makeTitle( $map['title']['ns'], $map['title']['key'] );
+
+ return new self( $map['type'], $map['params'], $map['opts'], $title );
+ }
}
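The new toSerializableArray()/newFromArray() pair allows a specification to be packed into a plain array (EnqueueJob below relies on this) and rebuilt later. A minimal round-trip sketch, using a hypothetical job type:

$spec = new JobSpecification(
	'exampleCache',                      // hypothetical job type
	array( 'pageId' => 1 ),              // job parameters
	array( 'removeDuplicates' => true ), // opts
	Title::newMainPage()
);
$packed = $spec->toSerializableArray();
$copy = JobSpecification::newFromArray( $packed );
// $copy->ignoreDuplicates() is true, matching the original specification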
diff --git a/includes/jobqueue/aggregator/JobQueueAggregator.php b/includes/jobqueue/aggregator/JobQueueAggregator.php
index 8600eed9..febc277a 100644
--- a/includes/jobqueue/aggregator/JobQueueAggregator.php
+++ b/includes/jobqueue/aggregator/JobQueueAggregator.php
@@ -34,7 +34,7 @@ abstract class JobQueueAggregator {
/**
* @param array $params
*/
- protected function __construct( array $params ) {
+ public function __construct( array $params ) {
}
/**
@@ -73,9 +73,7 @@ abstract class JobQueueAggregator {
* @return bool Success
*/
final public function notifyQueueEmpty( $wiki, $type ) {
- wfProfileIn( __METHOD__ );
$ok = $this->doNotifyQueueEmpty( $wiki, $type );
- wfProfileOut( __METHOD__ );
return $ok;
}
@@ -93,9 +91,7 @@ abstract class JobQueueAggregator {
* @return bool Success
*/
final public function notifyQueueNonEmpty( $wiki, $type ) {
- wfProfileIn( __METHOD__ );
$ok = $this->doNotifyQueueNonEmpty( $wiki, $type );
- wfProfileOut( __METHOD__ );
return $ok;
}
@@ -111,9 +107,7 @@ abstract class JobQueueAggregator {
* @return array (job type => (list of wiki IDs))
*/
final public function getAllReadyWikiQueues() {
- wfProfileIn( __METHOD__ );
$res = $this->doGetAllReadyWikiQueues();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -129,9 +123,7 @@ abstract class JobQueueAggregator {
* @return bool Success
*/
final public function purge() {
- wfProfileIn( __METHOD__ );
$res = $this->doPurge();
- wfProfileOut( __METHOD__ );
return $res;
}
@@ -160,3 +152,21 @@ abstract class JobQueueAggregator {
return $pendingDBs;
}
}
+
+class JobQueueAggregatorNull extends JobQueueAggregator {
+ protected function doNotifyQueueEmpty( $wiki, $type ) {
+ return true;
+ }
+
+ protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+ return true;
+ }
+
+ protected function doGetAllReadyWikiQueues() {
+ return array();
+ }
+
+ protected function doPurge() {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php b/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php
deleted file mode 100644
index ae266ef3..00000000
--- a/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php
+++ /dev/null
@@ -1,125 +0,0 @@
-<?php
-/**
- * Job queue aggregator code that uses BagOStuff.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle tracking information about all queues using BagOStuff
- *
- * @ingroup JobQueue
- * @since 1.21
- */
-class JobQueueAggregatorMemc extends JobQueueAggregator {
- /** @var BagOStuff */
- protected $cache;
-
- protected $cacheTTL; // integer; seconds
-
- /**
- * @param array $params Possible keys:
- * - objectCache : Name of an object cache registered in $wgObjectCaches.
- * This defaults to the one specified by $wgMainCacheType.
- * - cacheTTL : Seconds to cache the aggregate data before regenerating.
- */
- protected function __construct( array $params ) {
- parent::__construct( $params );
- $this->cache = isset( $params['objectCache'] )
- ? wfGetCache( $params['objectCache'] )
- : wfGetMainCache();
- $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min
- }
-
- /**
- * @see JobQueueAggregator::doNotifyQueueEmpty()
- */
- protected function doNotifyQueueEmpty( $wiki, $type ) {
- $key = $this->getReadyQueueCacheKey();
- // Delist the queue from the "ready queue" list
- if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
- $curInfo = $this->cache->get( $key );
- if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) {
- if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) {
- $curInfo['pendingDBs'][$type] = array_diff(
- $curInfo['pendingDBs'][$type], array( $wiki ) );
- $this->cache->set( $key, $curInfo );
- }
- }
- $this->cache->delete( "$key:lock" ); // unlock
- }
-
- return true;
- }
-
- /**
- * @see JobQueueAggregator::doNotifyQueueNonEmpty()
- */
- protected function doNotifyQueueNonEmpty( $wiki, $type ) {
- return true; // updated periodically
- }
-
- /**
- * @see JobQueueAggregator::doAllGetReadyWikiQueues()
- */
- protected function doGetAllReadyWikiQueues() {
- $key = $this->getReadyQueueCacheKey();
- // If the cache entry wasn't present, is stale, or in .1% of cases otherwise,
- // regenerate the cache. Use any available stale cache if another process is
- // currently regenerating the pending DB information.
- $pendingDbInfo = $this->cache->get( $key );
- if ( !is_array( $pendingDbInfo )
- || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL
- || mt_rand( 0, 999 ) == 0
- ) {
- if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock
- $pendingDbInfo = array(
- 'pendingDBs' => $this->findPendingWikiQueues(),
- 'timestamp' => time()
- );
- for ( $attempts = 1; $attempts <= 25; ++$attempts ) {
- if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
- $this->cache->set( $key, $pendingDbInfo );
- $this->cache->delete( "$key:lock" ); // unlock
- break;
- }
- }
- $this->cache->delete( "$key:rebuild" ); // unlock
- }
- }
-
- return is_array( $pendingDbInfo )
- ? $pendingDbInfo['pendingDBs']
- : array(); // cache is both empty and locked
- }
-
- /**
- * @see JobQueueAggregator::doPurge()
- */
- protected function doPurge() {
- return $this->cache->delete( $this->getReadyQueueCacheKey() );
- }
-
- /**
- * @return string
- */
- private function getReadyQueueCacheKey() {
- return "jobqueue:aggregator:ready-queues:v1"; // global
- }
-}
diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
index db9e764c..847dd6f4 100644
--- a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
+++ b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
@@ -44,7 +44,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
* If a hostname is specified but no port, the standard port number
* 6379 will be used. Required.
*/
- protected function __construct( array $params ) {
+ public function __construct( array $params ) {
parent::__construct( $params );
$this->servers = isset( $params['redisServers'] )
? $params['redisServers']
diff --git a/includes/jobqueue/jobs/AssembleUploadChunksJob.php b/includes/jobqueue/jobs/AssembleUploadChunksJob.php
index 9e9bda6f..b7f09e77 100644
--- a/includes/jobqueue/jobs/AssembleUploadChunksJob.php
+++ b/includes/jobqueue/jobs/AssembleUploadChunksJob.php
@@ -35,26 +35,16 @@ class AssembleUploadChunksJob extends Job {
public function run() {
$scope = RequestContext::importScopedSession( $this->params['session'] );
$context = RequestContext::getMain();
+ $user = $context->getUser();
try {
- $user = $context->getUser();
if ( !$user->isLoggedIn() ) {
$this->setLastError( "Could not load the author user from session." );
return false;
}
- if ( count( $_SESSION ) === 0 ) {
- // Empty session probably indicates that we didn't associate
- // with the session correctly. Note that being able to load
- // the user does not necessarily mean the session was loaded.
- // Most likely cause by suhosin.session.encrypt = On.
- $this->setLastError( "Error associating with user session. " .
- "Try setting suhosin.session.encrypt = Off" );
-
- return false;
- }
-
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() )
);
@@ -70,6 +60,7 @@ class AssembleUploadChunksJob extends Job {
$status = $upload->concatenateChunks();
if ( !$status->isGood() ) {
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status )
);
@@ -93,6 +84,7 @@ class AssembleUploadChunksJob extends Job {
// Cache the info so the user doesn't have to wait forever to get the final info
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array(
'result' => 'Success',
@@ -102,8 +94,9 @@ class AssembleUploadChunksJob extends Job {
'status' => Status::newGood()
)
);
- } catch ( MWException $e ) {
+ } catch ( Exception $e ) {
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array(
'result' => 'Failure',
diff --git a/includes/jobqueue/jobs/DuplicateJob.php b/includes/jobqueue/jobs/DuplicateJob.php
index 1fa6cefe..c5e3a234 100644
--- a/includes/jobqueue/jobs/DuplicateJob.php
+++ b/includes/jobqueue/jobs/DuplicateJob.php
@@ -18,7 +18,7 @@
* http://www.gnu.org/copyleft/gpl.html
*
* @file
- * @ingroup Cache
+ * @ingroup JobQueue
*/
/**
diff --git a/includes/jobqueue/jobs/EnqueueJob.php b/includes/jobqueue/jobs/EnqueueJob.php
new file mode 100644
index 00000000..46fb2aa7
--- /dev/null
+++ b/includes/jobqueue/jobs/EnqueueJob.php
@@ -0,0 +1,88 @@
+<?php
+/**
+ * Router job that takes jobs and enqueues them.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Router job that takes jobs and enqueues them to their proper queues
+ *
+ * This can be used for several things:
+ * - a) Making multi-job enqueues more robust by atomically enqueueing
+ * a single job that pushes the actual jobs (with retry logic)
+ * - b) Masking the latency of pushing jobs to different queues/wikis
+ * - c) Low-latency enqueues to push jobs from warm to hot datacenters
+ *
+ * @ingroup JobQueue
+ * @since 1.25
+ */
+final class EnqueueJob extends Job {
+ /**
+ * Callers should use the factory methods instead
+ *
+ * @param Title $title
+ * @param array $params Job parameters
+ */
+ function __construct( $title, $params ) {
+ parent::__construct( 'enqueue', $title, $params );
+ }
+
+ /**
+ * @param Job|JobSpecification|array $jobs
+ * @return EnqueueJob
+ */
+ public static function newFromLocalJobs( $jobs ) {
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+ return self::newFromJobsByWiki( array( wfWikiID() => $jobs ) );
+ }
+
+ /**
+ * @param array $jobsByWiki Map of (wiki => JobSpecification list)
+ * @return EnqueueJob
+ */
+ public static function newFromJobsByWiki( array $jobsByWiki ) {
+ $jobMapsByWiki = array();
+ foreach ( $jobsByWiki as $wiki => $jobs ) {
+ $jobMapsByWiki[$wiki] = array();
+ foreach ( $jobs as $job ) {
+ if ( $job instanceof JobSpecification ) {
+ $jobMapsByWiki[$wiki][] = $job->toSerializableArray();
+ } else {
+ throw new InvalidArgumentException( "Jobs must be of type JobSpecification." );
+ }
+ }
+ }
+
+ return new self( Title::newMainPage(), array( 'jobsByWiki' => $jobMapsByWiki ) );
+ }
+
+ public function run() {
+ foreach ( $this->params['jobsByWiki'] as $wiki => $jobMaps ) {
+ $jobSpecs = array();
+ foreach ( $jobMaps as $jobMap ) {
+ $jobSpecs[] = JobSpecification::newFromArray( $jobMap );
+ }
+ JobQueueGroup::singleton( $wiki )->push( $jobSpecs );
+ }
+
+ return true;
+ }
+}
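As a usage sketch (the job type and parameters below are placeholders), a caller can wrap ordinary JobSpecification entries for the local wiki in a single EnqueueJob and push that instead of the individual jobs:

    $specs = array(
        new JobSpecification( 'null', array( 'lives' => 1 ) ),
        new JobSpecification( 'null', array( 'lives' => 2 ) ),
    );
    // One atomic push; the actual jobs are enqueued when this job runs
    JobQueueGroup::singleton()->push( EnqueueJob::newFromLocalJobs( $specs ) );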
diff --git a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
index 4d1e72c9..e5e521c3 100644
--- a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
+++ b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
@@ -18,6 +18,7 @@
* http://www.gnu.org/copyleft/gpl.html
*
* @file
+ * @ingroup JobQueue
* @ingroup Cache
*/
@@ -26,9 +27,9 @@
*
* This job comes in a few variants:
* - a) Recursive jobs to purge caches for backlink pages for a given title.
- * These jobs have have (recursive:true,table:<table>) set.
+ * These jobs have (recursive:true,table:<table>) set.
* - b) Jobs to purge caches for a set of titles (the job title is ignored).
- * These jobs have have (pages:(<page ID>:(<namespace>,<title>),...) set.
+ * These jobs have (pages:(<page ID>:(<namespace>,<title>),...)) set.
*
* @ingroup JobQueue
*/
@@ -42,17 +43,8 @@ class HTMLCacheUpdateJob extends Job {
function run() {
global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
- static $expected = array( 'recursive', 'pages' ); // new jobs have one of these
-
- $oldRangeJob = false;
- if ( !array_intersect( array_keys( $this->params ), $expected ) ) {
- // B/C for older job params formats that lack these fields:
- // a) base jobs with just ("table") and b) range jobs with ("table","start","end")
- if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
- $oldRangeJob = true;
- } else {
- $this->params['recursive'] = true; // base job
- }
+ if ( isset( $this->params['table'] ) && !isset( $this->params['pages'] ) ) {
+ $this->params['recursive'] = true; // b/c; base job
}
// Job to purge all (or a range of) backlink pages for a page
@@ -67,29 +59,15 @@ class HTMLCacheUpdateJob extends Job {
array( 'params' => $this->getRootJobParams() )
);
JobQueueGroup::singleton()->push( $jobs );
- // Job to purge pages for for a set of titles
+ // Job to purge pages for a set of titles
} elseif ( isset( $this->params['pages'] ) ) {
$this->invalidateTitles( $this->params['pages'] );
- // B/C for job to purge a range of backlink pages for a given page
- } elseif ( $oldRangeJob ) {
- $titleArray = $this->title->getBacklinkCache()->getLinks(
- $this->params['table'], $this->params['start'], $this->params['end'] );
-
- $pages = array(); // same format BacklinkJobUtils uses
- foreach ( $titleArray as $tl ) {
- $pages[$tl->getArticleId()] = array( $tl->getNamespace(), $tl->getDbKey() );
- }
-
- $jobs = array();
- foreach ( array_chunk( $pages, $wgUpdateRowsPerJob ) as $pageChunk ) {
- $jobs[] = new HTMLCacheUpdateJob( $this->title,
- array(
- 'table' => $this->params['table'],
- 'pages' => $pageChunk
- ) + $this->getRootJobParams() // carry over information for de-duplication
- );
- }
- JobQueueGroup::singleton()->push( $jobs );
+ // Job to update a single title
+ } else {
+ $t = $this->title;
+ $this->invalidateTitles( array(
+ $t->getArticleID() => array( $t->getNamespace(), $t->getDBkey() )
+ ) );
}
return true;
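For reference, the two supported parameter shapes look roughly like this (a sketch; $title and the page ID/name are placeholders):

    // a) recursive job: purge caches for all backlinks of $title via the given link table
    $recursive = new HTMLCacheUpdateJob( $title, array(
        'table'     => 'templatelinks',
        'recursive' => true,
    ) );

    // b) job for an explicit set of pages (the job title itself is ignored)
    $targeted = new HTMLCacheUpdateJob( $title, array(
        'pages' => array( 123 => array( NS_MAIN, 'Example_page' ) ),
    ) );

    JobQueueGroup::singleton()->push( array( $recursive, $targeted ) );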
diff --git a/includes/jobqueue/jobs/NullJob.php b/includes/jobqueue/jobs/NullJob.php
index 66291e9d..f94d6ebc 100644
--- a/includes/jobqueue/jobs/NullJob.php
+++ b/includes/jobqueue/jobs/NullJob.php
@@ -18,7 +18,7 @@
* http://www.gnu.org/copyleft/gpl.html
*
* @file
- * @ingroup Cache
+ * @ingroup JobQueue
*/
/**
diff --git a/includes/jobqueue/jobs/PublishStashedFileJob.php b/includes/jobqueue/jobs/PublishStashedFileJob.php
index 918a392d..a922dd3d 100644
--- a/includes/jobqueue/jobs/PublishStashedFileJob.php
+++ b/includes/jobqueue/jobs/PublishStashedFileJob.php
@@ -19,12 +19,14 @@
*
* @file
* @ingroup Upload
+ * @ingroup JobQueue
*/
/**
* Upload a file from the upload stash into the local file repo.
*
* @ingroup Upload
+ * @ingroup JobQueue
*/
class PublishStashedFileJob extends Job {
public function __construct( $title, $params ) {
@@ -35,26 +37,16 @@ class PublishStashedFileJob extends Job {
public function run() {
$scope = RequestContext::importScopedSession( $this->params['session'] );
$context = RequestContext::getMain();
+ $user = $context->getUser();
try {
- $user = $context->getUser();
if ( !$user->isLoggedIn() ) {
$this->setLastError( "Could not load the author user from session." );
return false;
}
- if ( count( $_SESSION ) === 0 ) {
- // Empty session probably indicates that we didn't associate
- // with the session correctly. Note that being able to load
- // the user does not necessarily mean the session was loaded.
- // Most likely cause by suhosin.session.encrypt = On.
- $this->setLastError( "Error associating with user session. " .
- "Try setting suhosin.session.encrypt = Off" );
-
- return false;
- }
-
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() )
);
@@ -72,6 +64,7 @@ class PublishStashedFileJob extends Job {
$status = Status::newFatal( 'verification-error' );
$status->value = array( 'verification' => $verification );
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
);
@@ -89,6 +82,7 @@ class PublishStashedFileJob extends Job {
);
if ( !$status->isGood() ) {
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
);
@@ -106,6 +100,7 @@ class PublishStashedFileJob extends Job {
// Cache the info so the user doesn't have to wait forever to get the final info
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array(
'result' => 'Success',
@@ -115,8 +110,9 @@ class PublishStashedFileJob extends Job {
'status' => Status::newGood()
)
);
- } catch ( MWException $e ) {
+ } catch ( Exception $e ) {
UploadBase::setSessionStatus(
+ $user,
$this->params['filekey'],
array(
'result' => 'Failure',
diff --git a/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/includes/jobqueue/jobs/RecentChangesUpdateJob.php
new file mode 100644
index 00000000..cc04595d
--- /dev/null
+++ b/includes/jobqueue/jobs/RecentChangesUpdateJob.php
@@ -0,0 +1,223 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job for pruning recent changes
+ *
+ * @ingroup JobQueue
+ * @since 1.25
+ */
+class RecentChangesUpdateJob extends Job {
+ function __construct( $title, $params ) {
+ parent::__construct( 'recentChangesUpdate', $title, $params );
+
+ if ( !isset( $params['type'] ) ) {
+ throw new Exception( "Missing 'type' parameter." );
+ }
+
+ $this->removeDuplicates = true;
+ }
+
+ /**
+ * @return RecentChangesUpdateJob
+ */
+ final public static function newPurgeJob() {
+ return new self(
+ SpecialPage::getTitleFor( 'Recentchanges' ), array( 'type' => 'purge' )
+ );
+ }
+
+ /**
+ * @return RecentChangesUpdateJob
+ * @since 1.26
+ */
+ final public static function newCacheUpdateJob() {
+ return new self(
+ SpecialPage::getTitleFor( 'Recentchanges' ), array( 'type' => 'cacheUpdate' )
+ );
+ }
+
+ public function run() {
+ if ( $this->params['type'] === 'purge' ) {
+ $this->purgeExpiredRows();
+ } elseif ( $this->params['type'] === 'cacheUpdate' ) {
+ $this->updateActiveUsers();
+ } else {
+ throw new InvalidArgumentException(
+ "Invalid 'type' parameter '{$this->params['type']}'." );
+ }
+
+ return true;
+ }
+
+ protected function purgeExpiredRows() {
+ global $wgRCMaxAge;
+
+ $lockKey = wfWikiID() . ':recentchanges-prune';
+
+ $dbw = wfGetDB( DB_MASTER );
+ if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
+ return; // already in progress
+ }
+ $batchSize = 100; // Avoid slave lag
+
+ $cutoff = $dbw->timestamp( time() - $wgRCMaxAge );
+ do {
+ $rcIds = $dbw->selectFieldValues( 'recentchanges',
+ 'rc_id',
+ array( 'rc_timestamp < ' . $dbw->addQuotes( $cutoff ) ),
+ __METHOD__,
+ array( 'LIMIT' => $batchSize )
+ );
+ if ( $rcIds ) {
+ $dbw->delete( 'recentchanges', array( 'rc_id' => $rcIds ), __METHOD__ );
+ }
+ // Commit in chunks to avoid slave lag
+ $dbw->commit( __METHOD__, 'flush' );
+
+ if ( count( $rcIds ) === $batchSize ) {
+ // There might be more, so try waiting for slaves
+ if ( !wfWaitForSlaves( null, false, false, /* $timeout = */ 3 ) ) {
+ // Another job will continue anyway
+ break;
+ }
+ }
+ } while ( $rcIds );
+
+ $dbw->unlock( $lockKey, __METHOD__ );
+ }
+
+ protected function updateActiveUsers() {
+ global $wgActiveUserDays;
+
+ // Users who made an edit within this many days are considered "active"
+ $days = $wgActiveUserDays;
+ // Pull in the full window of active users in this update
+ $window = $wgActiveUserDays * 86400;
+
+ $dbw = wfGetDB( DB_MASTER );
+ // JobRunner uses DBO_TRX, but doesn't call begin/commit itself;
+ // onTransactionIdle() will run immediately since there is no trx.
+ $dbw->onTransactionIdle( function() use ( $dbw, $days, $window ) {
+ // Avoid disconnect/ping() cycle that makes locks fall off
+ $dbw->setSessionOptions( array( 'connTimeout' => 900 ) );
+
+ $lockKey = wfWikiID() . '-activeusers';
+ if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
+ return false; // exclusive update (avoids duplicate entries)
+ }
+
+ $nowUnix = time();
+ // Get the last-updated timestamp for the cache
+ $cTime = $dbw->selectField( 'querycache_info',
+ 'qci_timestamp',
+ array( 'qci_type' => 'activeusers' )
+ );
+ $cTimeUnix = $cTime ? wfTimestamp( TS_UNIX, $cTime ) : 1;
+
+ // Pick the date range to fetch from. This is normally from the last
+ // update until the present time, but has a limited window for sanity.
+ // If the window is limited, multiple runs are needed to fully populate it.
+ $sTimestamp = max( $cTimeUnix, $nowUnix - $days * 86400 );
+ $eTimestamp = min( $sTimestamp + $window, $nowUnix );
+
+ // Get all the users active since the last update
+ $res = $dbw->select(
+ array( 'recentchanges' ),
+ array( 'rc_user_text', 'lastedittime' => 'MAX(rc_timestamp)' ),
+ array(
+ 'rc_user > 0', // actual accounts
+ 'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata
+ 'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ),
+ 'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ),
+ 'rc_timestamp <= ' . $dbw->addQuotes( $dbw->timestamp( $eTimestamp ) )
+ ),
+ __METHOD__,
+ array(
+ 'GROUP BY' => array( 'rc_user_text' ),
+ 'ORDER BY' => 'NULL' // avoid filesort
+ )
+ );
+ $names = array();
+ foreach ( $res as $row ) {
+ $names[$row->rc_user_text] = $row->lastedittime;
+ }
+
+ // Rotate out users that have not edited recently enough (according to the old data set)
+ $dbw->delete( 'querycachetwo',
+ array(
+ 'qcc_type' => 'activeusers',
+ 'qcc_value < ' . $dbw->addQuotes( $nowUnix - $days * 86400 ) // TS_UNIX
+ ),
+ __METHOD__
+ );
+
+ // Find which of the recently active users are already accounted for
+ if ( count( $names ) ) {
+ $res = $dbw->select( 'querycachetwo',
+ array( 'user_name' => 'qcc_title' ),
+ array(
+ 'qcc_type' => 'activeusers',
+ 'qcc_namespace' => NS_USER,
+ 'qcc_title' => array_keys( $names ) ),
+ __METHOD__
+ );
+ foreach ( $res as $row ) {
+ unset( $names[$row->user_name] );
+ }
+ }
+
+ // Insert the users that need to be added to the list
+ if ( count( $names ) ) {
+ $newRows = array();
+ foreach ( $names as $name => $lastEditTime ) {
+ $newRows[] = array(
+ 'qcc_type' => 'activeusers',
+ 'qcc_namespace' => NS_USER,
+ 'qcc_title' => $name,
+ 'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ),
+ 'qcc_namespacetwo' => 0, // unused
+ 'qcc_titletwo' => '' // unused
+ );
+ }
+ foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) {
+ $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ );
+ wfWaitForSlaves();
+ }
+ }
+
+ // If a transaction was already started, it might have an old
+ // snapshot, so kludge the timestamp range back as needed.
+ $asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() );
+
+ // Touch the data freshness timestamp
+ $dbw->replace( 'querycache_info',
+ array( 'qci_type' ),
+ array( 'qci_type' => 'activeusers',
+ 'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ), // not always $now
+ __METHOD__
+ );
+
+ $dbw->unlock( $lockKey, __METHOD__ );
+ } );
+ }
+}
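Both variants are intended to be enqueued through the factory methods above, for example from periodic maintenance code:

    // Prune recentchanges rows older than $wgRCMaxAge
    JobQueueGroup::singleton()->push( RecentChangesUpdateJob::newPurgeJob() );

    // Refresh the 'activeusers' data in querycachetwo
    JobQueueGroup::singleton()->push( RecentChangesUpdateJob::newCacheUpdateJob() );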
diff --git a/includes/jobqueue/jobs/RefreshLinksJob.php b/includes/jobqueue/jobs/RefreshLinksJob.php
index f82af273..1252b0b5 100644
--- a/includes/jobqueue/jobs/RefreshLinksJob.php
+++ b/includes/jobqueue/jobs/RefreshLinksJob.php
@@ -26,9 +26,9 @@
*
* This job comes in a few variants:
* - a) Recursive jobs to update links for backlink pages for a given title.
- * These jobs have have (recursive:true,table:<table>) set.
+ * These jobs have (recursive:true,table:<table>) set.
* - b) Jobs to update links for a set of pages (the job title is ignored).
- * These jobs have have (pages:(<page ID>:(<namespace>,<title>),...) set.
+ * These jobs have (pages:(<page ID>:(<namespace>,<title>),...)) set.
* - c) Jobs to update links for a single page (the job title)
* These jobs need no extra fields set.
*
@@ -39,6 +39,10 @@ class RefreshLinksJob extends Job {
function __construct( $title, $params = '' ) {
parent::__construct( 'refreshLinks', $title, $params );
+ // A separate type is used just for cascade-protected backlinks
+ if ( !empty( $this->params['prioritize'] ) ) {
+ $this->command .= 'Prioritized';
+ }
// Base backlink update jobs and per-title update jobs can be de-duplicated.
// If template A changes twice before any jobs run, a clean queue will have:
// (A base, A base)
@@ -86,7 +90,7 @@ class RefreshLinksJob extends Job {
array( 'params' => $extraParams )
);
JobQueueGroup::singleton()->push( $jobs );
- // Job to update link tables for for a set of titles
+ // Job to update link tables for a set of titles
} elseif ( isset( $this->params['pages'] ) ) {
foreach ( $this->params['pages'] as $pageId => $nsAndKey ) {
list( $ns, $dbKey ) = $nsAndKey;
@@ -100,6 +104,10 @@ class RefreshLinksJob extends Job {
return true;
}
+ /**
+ * @param Title $title
+ * @return bool
+ */
protected function runForTitle( Title $title = null ) {
$linkCache = LinkCache::singleton();
$linkCache->clear();
@@ -157,7 +165,7 @@ class RefreshLinksJob extends Job {
$ellapsed = microtime( true ) - $start;
// If it took a long time to render, then save this back to the cache to avoid
// wasted CPU by other apaches or job runners. We don't want to always save to
- // cache as this cause cause high cache I/O and LRU churn when a template changes.
+ // cache as this can cause high cache I/O and LRU churn when a template changes.
if ( $ellapsed >= self::PARSE_THRESHOLD_SEC
&& $page->isParserCacheUsed( $parserOptions, $revision->getId() )
&& $parserOutput->isCacheable()
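The new 'prioritize' flag only changes the command name, so flagged jobs land in a separate 'refreshLinksPrioritized' queue that runners can service ahead of the default one. A sketch, with $title standing in for the page to update:

    // Normal single-page update (variant c)
    JobQueueGroup::singleton()->push( new RefreshLinksJob( $title, array() ) );

    // Same update, routed to the 'refreshLinksPrioritized' queue
    JobQueueGroup::singleton()->push(
        new RefreshLinksJob( $title, array( 'prioritize' => true ) )
    );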
diff --git a/includes/jobqueue/jobs/RefreshLinksJob2.php b/includes/jobqueue/jobs/RefreshLinksJob2.php
deleted file mode 100644
index 97405aeb..00000000
--- a/includes/jobqueue/jobs/RefreshLinksJob2.php
+++ /dev/null
@@ -1,141 +0,0 @@
-<?php
-/**
- * Job to update links for a given title.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup JobQueue
- */
-
-/**
- * Background job to update links for titles in certain backlink range by page ID.
- * Newer version for high use templates. This is deprecated by RefreshLinksPartitionJob.
- *
- * @ingroup JobQueue
- * @deprecated since 1.23
- */
-class RefreshLinksJob2 extends Job {
- function __construct( $title, $params ) {
- parent::__construct( 'refreshLinks2', $title, $params );
- // Base jobs for large templates can easily be de-duplicated
- $this->removeDuplicates = !isset( $params['start'] ) && !isset( $params['end'] );
- }
-
- /**
- * Run a refreshLinks2 job
- * @return bool Success
- */
- function run() {
- global $wgUpdateRowsPerJob;
-
- $linkCache = LinkCache::singleton();
- $linkCache->clear();
-
- if ( is_null( $this->title ) ) {
- $this->error = "refreshLinks2: Invalid title";
- return false;
- }
-
- // Back compat for pre-r94435 jobs
- $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks';
-
- // Avoid slave lag when fetching templates.
- // When the outermost job is run, we know that the caller that enqueued it must have
- // committed the relevant changes to the DB by now. At that point, record the master
- // position and pass it along as the job recursively breaks into smaller range jobs.
- // Hopefully, when leaf jobs are popped, the slaves will have reached that position.
- if ( isset( $this->params['masterPos'] ) ) {
- $masterPos = $this->params['masterPos'];
- } elseif ( wfGetLB()->getServerCount() > 1 ) {
- $masterPos = wfGetLB()->getMasterPos();
- } else {
- $masterPos = false;
- }
-
- $tbc = $this->title->getBacklinkCache();
-
- $jobs = array(); // jobs to insert
- if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
- # This is a partition job to trigger the insertion of leaf jobs...
- $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
- } else {
- # This is a base job to trigger the insertion of partitioned jobs...
- if ( $tbc->getNumLinks( $table, $wgUpdateRowsPerJob + 1 ) <= $wgUpdateRowsPerJob ) {
- # Just directly insert the single per-title jobs
- $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
- } else {
- # Insert the partition jobs to make per-title jobs
- foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) {
- list( $start, $end ) = $batch;
- $jobs[] = new RefreshLinksJob2( $this->title,
- array(
- 'table' => $table,
- 'start' => $start,
- 'end' => $end,
- 'masterPos' => $masterPos,
- ) + $this->getRootJobParams() // carry over information for de-duplication
- );
- }
- }
- }
-
- if ( count( $jobs ) ) {
- JobQueueGroup::singleton()->push( $jobs );
- }
-
- return true;
- }
-
- /**
- * @param string $table
- * @param mixed $masterPos
- * @return array
- */
- protected function getSingleTitleJobs( $table, $masterPos ) {
- # The "start"/"end" fields are not set for the base jobs
- $start = isset( $this->params['start'] ) ? $this->params['start'] : false;
- $end = isset( $this->params['end'] ) ? $this->params['end'] : false;
- $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end );
- # Convert into single page refresh links jobs.
- # This handles well when in sapi mode and is useful in any case for job
- # de-duplication. If many pages use template A, and that template itself
- # uses template B, then an edit to both will create many duplicate jobs.
- # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will
- # get run first, and when it does, it will remove the duplicates. Of course,
- # one page could have its job popped when the other page's job is still
- # buried within the logic of a refreshLinks2 job.
- $jobs = array();
- foreach ( $titles as $title ) {
- $jobs[] = new RefreshLinksJob( $title,
- array( 'masterPos' => $masterPos ) + $this->getRootJobParams()
- ); // carry over information for de-duplication
- }
- return $jobs;
- }
-
- /**
- * @return array
- */
- public function getDeduplicationInfo() {
- $info = parent::getDeduplicationInfo();
- // Don't let highly unique "masterPos" values ruin duplicate detection
- if ( is_array( $info['params'] ) ) {
- unset( $info['params']['masterPos'] );
- }
- return $info;
- }
-}
diff --git a/includes/jobqueue/jobs/ThumbnailRenderJob.php b/includes/jobqueue/jobs/ThumbnailRenderJob.php
new file mode 100644
index 00000000..ab381388
--- /dev/null
+++ b/includes/jobqueue/jobs/ThumbnailRenderJob.php
@@ -0,0 +1,109 @@
+<?php
+/**
+ * Job for asynchronous rendering of thumbnails.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job for asynchronous rendering of thumbnails.
+ *
+ * @ingroup JobQueue
+ */
+class ThumbnailRenderJob extends Job {
+ public function __construct( $title, $params ) {
+ parent::__construct( 'ThumbnailRender', $title, $params );
+ }
+
+ public function run() {
+ global $wgUploadThumbnailRenderMethod;
+
+ $transformParams = $this->params['transformParams'];
+
+ $file = wfLocalFile( $this->title );
+ $file->load( File::READ_LATEST );
+
+ if ( $file && $file->exists() ) {
+ if ( $wgUploadThumbnailRenderMethod === 'jobqueue' ) {
+ $thumb = $file->transform( $transformParams, File::RENDER_NOW );
+
+ if ( $thumb && !$thumb->isError() ) {
+ return true;
+ } else {
+ $this->setLastError( __METHOD__ . ': thumbnail couldn\'t be generated' );
+ return false;
+ }
+ } elseif ( $wgUploadThumbnailRenderMethod === 'http' ) {
+ $status = $this->hitThumbUrl( $file, $transformParams );
+
+ wfDebug( __METHOD__ . ": received status {$status}\n" );
+
+ if ( $status === 200 || $status === 301 || $status === 302 ) {
+ return true;
+ } elseif ( $status ) {
+ // Note that this currently happens (500) when requesting sizes larger than or
+ // equal to the original, which is harmless.
+ $this->setLastError( __METHOD__ . ': incorrect HTTP status ' . $status );
+ return false;
+ } else {
+ $this->setLastError( __METHOD__ . ': HTTP request failure' );
+ return false;
+ }
+ } else {
+ $this->setLastError( __METHOD__ . ': unknown thumbnail render method ' . $wgUploadThumbnailRenderMethod );
+ return false;
+ }
+ } else {
+ $this->setLastError( __METHOD__ . ': file doesn\'t exist' );
+ return false;
+ }
+ }
+
+ protected function hitThumbUrl( $file, $transformParams ) {
+ global $wgUploadThumbnailRenderHttpCustomHost, $wgUploadThumbnailRenderHttpCustomDomain;
+
+ $thumbName = $file->thumbName( $transformParams );
+ $thumbUrl = $file->getThumbUrl( $thumbName );
+
+ if ( $wgUploadThumbnailRenderHttpCustomDomain ) {
+ $parsedUrl = wfParseUrl( $thumbUrl );
+
+ if ( !$parsedUrl || !isset( $parsedUrl['path'] ) || !strlen( $parsedUrl['path'] ) ) {
+ return false;
+ }
+
+ $thumbUrl = '//' . $wgUploadThumbnailRenderHttpCustomDomain . $parsedUrl['path'];
+ }
+
+ wfDebug( __METHOD__ . ": hitting url {$thumbUrl}\n" );
+
+ $request = MWHttpRequest::factory( $thumbUrl,
+ array( 'method' => 'HEAD', 'followRedirects' => true ),
+ __METHOD__
+ );
+
+ if ( $wgUploadThumbnailRenderHttpCustomHost ) {
+ $request->setHeader( 'Host', $wgUploadThumbnailRenderHttpCustomHost );
+ }
+
+ $status = $request->execute();
+
+ return $request->getStatus();
+ }
+}
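As a usage sketch (the width is arbitrary and $file stands for a LocalFile object), a pre-render job for one thumbnail size would be queued like this:

    JobQueueGroup::singleton()->push( new ThumbnailRenderJob(
        $file->getTitle(),
        array( 'transformParams' => array( 'width' => 1200 ) )
    ) );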
diff --git a/includes/jobqueue/jobs/UploadFromUrlJob.php b/includes/jobqueue/jobs/UploadFromUrlJob.php
index a09db15a..d15fd025 100644
--- a/includes/jobqueue/jobs/UploadFromUrlJob.php
+++ b/includes/jobqueue/jobs/UploadFromUrlJob.php
@@ -81,7 +81,7 @@ class UploadFromUrlJob extends Job {
if ( $warnings ) {
# Stash the upload
- $key = $this->upload->stashFile();
+ $key = $this->upload->stashFile( $this->user );
// @todo FIXME: This has been broken for a while.
// User::leaveUserMessage() does not exist.