summaryrefslogtreecommitdiff
path: root/includes/job
diff options
context:
space:
mode:
Diffstat (limited to 'includes/job')
-rw-r--r--includes/job/Job.php416
-rw-r--r--includes/job/JobQueue.php435
-rw-r--r--includes/job/JobQueueAggregator.php139
-rw-r--r--includes/job/JobQueueAggregatorMemc.php117
-rw-r--r--includes/job/JobQueueAggregatorRedis.php165
-rw-r--r--includes/job/JobQueueDB.php716
-rw-r--r--includes/job/JobQueueGroup.php351
-rw-r--r--includes/job/README81
-rw-r--r--includes/job/RefreshLinksJob.php202
-rw-r--r--includes/job/jobs/AssembleUploadChunksJob.php118
-rw-r--r--includes/job/jobs/DoubleRedirectJob.php (renamed from includes/job/DoubleRedirectJob.php)65
-rw-r--r--includes/job/jobs/DuplicateJob.php59
-rw-r--r--includes/job/jobs/EmaillingJob.php (renamed from includes/job/EmaillingJob.php)5
-rw-r--r--includes/job/jobs/EnotifNotifyJob.php (renamed from includes/job/EnotifNotifyJob.php)3
-rw-r--r--includes/job/jobs/HTMLCacheUpdateJob.php254
-rw-r--r--includes/job/jobs/NullJob.php60
-rw-r--r--includes/job/jobs/PublishStashedFileJob.php130
-rw-r--r--includes/job/jobs/RefreshLinksJob.php226
-rw-r--r--includes/job/jobs/UploadFromUrlJob.php (renamed from includes/job/UploadFromUrlJob.php)4
19 files changed, 3025 insertions, 521 deletions
diff --git a/includes/job/Job.php b/includes/job/Job.php
index d777a5d4..bcf582e7 100644
--- a/includes/job/Job.php
+++ b/includes/job/Job.php
@@ -23,11 +23,11 @@
/**
* Class to both describe a background job and handle jobs.
+ * The queue aspects of this class are now deprecated.
*
* @ingroup JobQueue
*/
abstract class Job {
-
/**
* @var Title
*/
@@ -39,6 +39,9 @@ abstract class Job {
$removeDuplicates,
$error;
+ /** @var Array Additional queue metadata */
+ public $metadata = array();
+
/*-------------------------------------------------------------------------
* Abstract functions
*------------------------------------------------------------------------*/
@@ -47,176 +50,23 @@ abstract class Job {
* Run the job
* @return boolean success
*/
- abstract function run();
+ abstract public function run();
/*-------------------------------------------------------------------------
* Static functions
*------------------------------------------------------------------------*/
/**
- * Pop a job of a certain type. This tries less hard than pop() to
- * actually find a job; it may be adversely affected by concurrent job
- * runners.
- *
- * @param $type string
- *
- * @return Job
- */
- static function pop_type( $type ) {
- wfProfilein( __METHOD__ );
-
- $dbw = wfGetDB( DB_MASTER );
-
- $dbw->begin( __METHOD__ );
-
- $row = $dbw->selectRow(
- 'job',
- '*',
- array( 'job_cmd' => $type ),
- __METHOD__,
- array( 'LIMIT' => 1, 'FOR UPDATE' )
- );
-
- if ( $row === false ) {
- $dbw->commit( __METHOD__ );
- wfProfileOut( __METHOD__ );
- return false;
- }
-
- /* Ensure we "own" this row */
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- $affected = $dbw->affectedRows();
- $dbw->commit( __METHOD__ );
-
- if ( $affected == 0 ) {
- wfProfileOut( __METHOD__ );
- return false;
- }
-
- wfIncrStats( 'job-pop' );
- $namespace = $row->job_namespace;
- $dbkey = $row->job_title;
- $title = Title::makeTitleSafe( $namespace, $dbkey );
- $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ),
- $row->job_id );
-
- $job->removeDuplicates();
-
- wfProfileOut( __METHOD__ );
- return $job;
- }
-
- /**
- * Pop a job off the front of the queue
- *
- * @param $offset Integer: Number of jobs to skip
- * @return Job or false if there's no jobs
- */
- static function pop( $offset = 0 ) {
- wfProfileIn( __METHOD__ );
-
- $dbr = wfGetDB( DB_SLAVE );
-
- /* Get a job from the slave, start with an offset,
- scan full set afterwards, avoid hitting purged rows
-
- NB: If random fetch previously was used, offset
- will always be ahead of few entries
- */
-
- $conditions = self::defaultQueueConditions();
-
- $offset = intval( $offset );
- $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' );
-
- $row = $dbr->selectRow( 'job', '*',
- array_merge( $conditions, array( "job_id >= $offset" ) ),
- __METHOD__,
- $options
- );
-
- // Refetching without offset is needed as some of job IDs could have had delayed commits
- // and have lower IDs than jobs already executed, blame concurrency :)
- //
- if ( $row === false ) {
- if ( $offset != 0 ) {
- $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options );
- }
-
- if ( $row === false ) {
- wfProfileOut( __METHOD__ );
- return false;
- }
- }
-
- // Try to delete it from the master
- $dbw = wfGetDB( DB_MASTER );
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- $affected = $dbw->affectedRows();
-
- if ( !$affected ) {
- // Failed, someone else beat us to it
- // Try getting a random row
- $row = $dbw->selectRow( 'job', array( 'minjob' => 'MIN(job_id)',
- 'maxjob' => 'MAX(job_id)' ), '1=1', __METHOD__ );
- if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
- // No jobs to get
- wfProfileOut( __METHOD__ );
- return false;
- }
- // Get the random row
- $row = $dbw->selectRow( 'job', '*',
- 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
- if ( $row === false ) {
- // Random job gone before we got the chance to select it
- // Give up
- wfProfileOut( __METHOD__ );
- return false;
- }
- // Delete the random row
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- $affected = $dbw->affectedRows();
-
- if ( !$affected ) {
- // Random job gone before we exclusively deleted it
- // Give up
- wfProfileOut( __METHOD__ );
- return false;
- }
- }
-
- // If execution got to here, there's a row in $row that has been deleted from the database
- // by this thread. Hence the concurrent pop was successful.
- wfIncrStats( 'job-pop' );
- $namespace = $row->job_namespace;
- $dbkey = $row->job_title;
- $title = Title::makeTitleSafe( $namespace, $dbkey );
-
- if ( is_null( $title ) ) {
- wfProfileOut( __METHOD__ );
- return false;
- }
-
- $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
-
- // Remove any duplicates it may have later in the queue
- $job->removeDuplicates();
-
- wfProfileOut( __METHOD__ );
- return $job;
- }
-
- /**
* Create the appropriate object to handle a specific job
*
- * @param $command String: Job command
+ * @param string $command Job command
* @param $title Title: Associated title
- * @param $params Array|bool: Job parameters
- * @param $id Int: Job identifier
+ * @param array|bool $params Job parameters
+ * @param int $id Job identifier
* @throws MWException
* @return Job
*/
- static function factory( $command, Title $title, $params = false, $id = 0 ) {
+ public static function factory( $command, Title $title, $params = false, $id = 0 ) {
global $wgJobClasses;
if( isset( $wgJobClasses[$command] ) ) {
$class = $wgJobClasses[$command];
@@ -226,64 +76,18 @@ abstract class Job {
}
/**
- * @param $params
- * @return string
- */
- static function makeBlob( $params ) {
- if ( $params !== false ) {
- return serialize( $params );
- } else {
- return '';
- }
- }
-
- /**
- * @param $blob
- * @return bool|mixed
- */
- static function extractBlob( $blob ) {
- if ( (string)$blob !== '' ) {
- return unserialize( $blob );
- } else {
- return false;
- }
- }
-
- /**
* Batch-insert a group of jobs into the queue.
* This will be wrapped in a transaction with a forced commit.
*
* This may add duplicate at insert time, but they will be
* removed later on, when the first one is popped.
*
- * @param $jobs array of Job objects
+ * @param array $jobs of Job objects
+ * @return bool
+ * @deprecated 1.21
*/
- static function batchInsert( $jobs ) {
- if ( !count( $jobs ) ) {
- return;
- }
- $dbw = wfGetDB( DB_MASTER );
- $rows = array();
-
- /**
- * @var $job Job
- */
- foreach ( $jobs as $job ) {
- $rows[] = $job->insertFields();
- if ( count( $rows ) >= 50 ) {
- # Do a small transaction to avoid slave lag
- $dbw->begin( __METHOD__ );
- $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
- $dbw->commit( __METHOD__ );
- $rows = array();
- }
- }
- if ( $rows ) { // last chunk
- $dbw->begin( __METHOD__ );
- $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
- $dbw->commit( __METHOD__ );
- }
- wfIncrStats( 'job-insert', count( $jobs ) );
+ public static function batchInsert( $jobs ) {
+ return JobQueueGroup::singleton()->push( $jobs );
}
/**
@@ -293,46 +97,36 @@ abstract class Job {
* be rolled-back as part of a larger transaction. However,
* large batches of jobs can cause slave lag.
*
- * @param $jobs array of Job objects
+ * @param array $jobs of Job objects
+ * @return bool
+ * @deprecated 1.21
*/
- static function safeBatchInsert( $jobs ) {
- if ( !count( $jobs ) ) {
- return;
- }
- $dbw = wfGetDB( DB_MASTER );
- $rows = array();
- foreach ( $jobs as $job ) {
- $rows[] = $job->insertFields();
- if ( count( $rows ) >= 500 ) {
- $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
- $rows = array();
- }
- }
- if ( $rows ) { // last chunk
- $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
- }
- wfIncrStats( 'job-insert', count( $jobs ) );
+ public static function safeBatchInsert( $jobs ) {
+ return JobQueueGroup::singleton()->push( $jobs, JobQueue::QoS_Atomic );
}
-
/**
- * SQL conditions to apply on most JobQueue queries
+ * Pop a job of a certain type. This tries less hard than pop() to
+ * actually find a job; it may be adversely affected by concurrent job
+ * runners.
*
- * Whenever we exclude jobs types from the default queue, we want to make
- * sure that queries to the job queue actually ignore them.
+ * @param $type string
+ * @return Job|bool Returns false if there are no jobs
+ * @deprecated 1.21
+ */
+ public static function pop_type( $type ) {
+ return JobQueueGroup::singleton()->get( $type )->pop();
+ }
+
+ /**
+ * Pop a job off the front of the queue.
+ * This is subject to $wgJobTypesExcludedFromDefaultQueue.
*
- * @return array SQL conditions suitable for Database:: methods
+ * @return Job or false if there's no jobs
+ * @deprecated 1.21
*/
- static function defaultQueueConditions( ) {
- global $wgJobTypesExcludedFromDefaultQueue;
- $conditions = array();
- if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) {
- $dbr = wfGetDB( DB_SLAVE );
- foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) {
- $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType );
- }
- }
- return $conditions;
+ public static function pop() {
+ return JobQueueGroup::singleton()->pop();
}
/*-------------------------------------------------------------------------
@@ -345,83 +139,131 @@ abstract class Job {
* @param $params array|bool
* @param $id int
*/
- function __construct( $command, $title, $params = false, $id = 0 ) {
+ public function __construct( $command, $title, $params = false, $id = 0 ) {
$this->command = $command;
$this->title = $title;
$this->params = $params;
$this->id = $id;
- // A bit of premature generalisation
- // Oh well, the whole class is premature generalisation really
- $this->removeDuplicates = true;
+ $this->removeDuplicates = false; // expensive jobs may set this to true
}
/**
- * Insert a single job into the queue.
- * @return bool true on success
+ * @return integer May be 0 for jobs stored outside the DB
*/
- function insert() {
- $fields = $this->insertFields();
+ public function getId() {
+ return $this->id;
+ }
- $dbw = wfGetDB( DB_MASTER );
+ /**
+ * @return string
+ */
+ public function getType() {
+ return $this->command;
+ }
- if ( $this->removeDuplicates ) {
- $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ );
- if ( $dbw->numRows( $res ) ) {
- return true;
- }
- }
- wfIncrStats( 'job-insert' );
- return $dbw->insert( 'job', $fields, __METHOD__ );
+ /**
+ * @return Title
+ */
+ public function getTitle() {
+ return $this->title;
}
/**
* @return array
*/
- protected function insertFields() {
- $dbw = wfGetDB( DB_MASTER );
+ public function getParams() {
+ return $this->params;
+ }
+
+ /**
+ * @return bool Whether only one of each identical set of jobs should be run
+ */
+ public function ignoreDuplicates() {
+ return $this->removeDuplicates;
+ }
+
+ /**
+ * @return bool Whether this job can be retried on failure by job runners
+ */
+ public function allowRetries() {
+ return true;
+ }
+
+ /**
+ * Subclasses may need to override this to make duplication detection work
+ *
+ * @return Array Map of key/values
+ */
+ public function getDeduplicationInfo() {
+ $info = array(
+ 'type' => $this->getType(),
+ 'namespace' => $this->getTitle()->getNamespace(),
+ 'title' => $this->getTitle()->getDBkey(),
+ 'params' => $this->getParams()
+ );
+ // Identical jobs with different "root" jobs should count as duplicates
+ if ( is_array( $info['params'] ) ) {
+ unset( $info['params']['rootJobSignature'] );
+ unset( $info['params']['rootJobTimestamp'] );
+ }
+ return $info;
+ }
+
+ /**
+ * @param string $key A key that identifies the task
+ * @return Array
+ */
+ public static function newRootJobParams( $key ) {
return array(
- 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
- 'job_cmd' => $this->command,
- 'job_namespace' => $this->title->getNamespace(),
- 'job_title' => $this->title->getDBkey(),
- 'job_timestamp' => $dbw->timestamp(),
- 'job_params' => Job::makeBlob( $this->params )
+ 'rootJobSignature' => sha1( $key ),
+ 'rootJobTimestamp' => wfTimestampNow()
);
}
/**
- * Remove jobs in the job queue which are duplicates of this job.
- * This is deadlock-prone and so starts its own transaction.
+ * @return Array
*/
- function removeDuplicates() {
- if ( !$this->removeDuplicates ) {
- return;
- }
+ public function getRootJobParams() {
+ return array(
+ 'rootJobSignature' => isset( $this->params['rootJobSignature'] )
+ ? $this->params['rootJobSignature']
+ : null,
+ 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] )
+ ? $this->params['rootJobTimestamp']
+ : null
+ );
+ }
- $fields = $this->insertFields();
- unset( $fields['job_id'] );
- unset( $fields['job_timestamp'] );
- $dbw = wfGetDB( DB_MASTER );
- $dbw->begin( __METHOD__ );
- $dbw->delete( 'job', $fields, __METHOD__ );
- $affected = $dbw->affectedRows();
- $dbw->commit( __METHOD__ );
- if ( $affected ) {
- wfIncrStats( 'job-dup-delete', $affected );
- }
+ /**
+ * Insert a single job into the queue.
+ * @return bool true on success
+ * @deprecated 1.21
+ */
+ public function insert() {
+ return JobQueueGroup::singleton()->push( $this );
}
/**
* @return string
*/
- function toString() {
+ public function toString() {
$paramString = '';
if ( $this->params ) {
foreach ( $this->params as $key => $value ) {
if ( $paramString != '' ) {
$paramString .= ' ';
}
+ if ( is_array( $value ) ) {
+ $value = "array(" . count( $value ) . ")";
+ } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) {
+ $value = "object(" . get_class( $value ) . ")";
+ }
+ $value = (string)$value;
+ if ( mb_strlen( $value ) > 1024 ) {
+ $value = "string(" . mb_strlen( $value ) . ")";
+ }
+
$paramString .= "$key=$value";
}
}
@@ -441,7 +283,7 @@ abstract class Job {
$this->error = $error;
}
- function getLastError() {
+ public function getLastError() {
return $this->error;
}
}
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
new file mode 100644
index 00000000..b0dd9258
--- /dev/null
+++ b/includes/job/JobQueue.php
@@ -0,0 +1,435 @@
+<?php
+/**
+ * Job queue base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @defgroup JobQueue JobQueue
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle enqueueing and running of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+abstract class JobQueue {
+ protected $wiki; // string; wiki ID
+ protected $type; // string; job type
+ protected $order; // string; job priority for pop()
+ protected $claimTTL; // integer; seconds
+ protected $maxTries; // integer; maximum number of times to try a job
+
+ const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
+
+ /**
+ * @param $params array
+ */
+ protected function __construct( array $params ) {
+ $this->wiki = $params['wiki'];
+ $this->type = $params['type'];
+ $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
+ $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
+ if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
+ $this->order = $params['order'];
+ } else {
+ $this->order = $this->optimalOrder();
+ }
+ if ( !in_array( $this->order, $this->supportedOrders() ) ) {
+ throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
+ }
+ }
+
+ /**
+ * Get a job queue object of the specified type.
+ * $params includes:
+ * - class : What job class to use (determines job type)
+ * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
+ * - type : The name of the job types this queue handles
+ * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
+ * If "fifo" is used, the queue will effectively be FIFO. Note that
+ * job completion will not appear to be exactly FIFO if there are multiple
+ * job runners since jobs can take different times to finish once popped.
+ * If "timestamp" is used, the queue will at least be loosely ordered
+ * by timestamp, allowing for some jobs to be popped off out of order.
+ * If "random" is used, pop() will pick jobs in random order.
+ * Note that it may only be weakly random (e.g. a lottery of the oldest X).
+ * If "any" is choosen, the queue will use whatever order is the fastest.
+ * This might be useful for improving concurrency for job acquisition.
+ * - claimTTL : If supported, the queue will recycle jobs that have been popped
+ * but not acknowledged as completed after this many seconds. Recycling
+ * of jobs simple means re-inserting them into the queue. Jobs can be
+ * attempted up to three times before being discarded.
+ *
+ * Queue classes should throw an exception if they do not support the options given.
+ *
+ * @param $params array
+ * @return JobQueue
+ * @throws MWException
+ */
+ final public static function factory( array $params ) {
+ $class = $params['class'];
+ if ( !MWInit::classExists( $class ) ) {
+ throw new MWException( "Invalid job queue class '$class'." );
+ }
+ $obj = new $class( $params );
+ if ( !( $obj instanceof self ) ) {
+ throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
+ }
+ return $obj;
+ }
+
+ /**
+ * @return string Wiki ID
+ */
+ final public function getWiki() {
+ return $this->wiki;
+ }
+
+ /**
+ * @return string Job type that this queue handles
+ */
+ final public function getType() {
+ return $this->type;
+ }
+
+ /**
+ * @return string One of (random, timestamp, fifo)
+ */
+ final public function getOrder() {
+ return $this->order;
+ }
+
+ /**
+ * @return Array Subset of (random, timestamp, fifo)
+ */
+ abstract protected function supportedOrders();
+
+ /**
+ * @return string One of (random, timestamp, fifo)
+ */
+ abstract protected function optimalOrder();
+
+ /**
+ * Quickly check if the queue is empty (has no available jobs).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this might return false when there are actually no jobs.
+ * If pop() is called and returns false then it should correct the cache. Also,
+ * calling flushCaches() first prevents this. However, this affect is typically
+ * not distinguishable from the race condition between isEmpty() and pop().
+ *
+ * @return bool
+ * @throws MWException
+ */
+ final public function isEmpty() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doIsEmpty();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::isEmpty()
+ * @return bool
+ */
+ abstract protected function doIsEmpty();
+
+ /**
+ * Get the number of available (unacquired) jobs in the queue.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return integer
+ * @throws MWException
+ */
+ final public function getSize() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetSize();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getSize()
+ * @return integer
+ */
+ abstract protected function doGetSize();
+
+ /**
+ * Get the number of acquired jobs (these are temporarily out of the queue).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return integer
+ * @throws MWException
+ */
+ final public function getAcquiredCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetAcquiredCount();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getAcquiredCount()
+ * @return integer
+ */
+ abstract protected function doGetAcquiredCount();
+
+ /**
+ * Push a single jobs into the queue.
+ * This does not require $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::push() instead of this function.
+ *
+ * @param $jobs Job|Array
+ * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
+ * @return bool Returns false on failure
+ * @throws MWException
+ */
+ final public function push( $jobs, $flags = 0 ) {
+ return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+ }
+
+ /**
+ * Push a batch of jobs into the queue.
+ * This does not require $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::push() instead of this function.
+ *
+ * @param array $jobs List of Jobs
+ * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
+ * @return bool Returns false on failure
+ * @throws MWException
+ */
+ final public function batchPush( array $jobs, $flags = 0 ) {
+ if ( !count( $jobs ) ) {
+ return true; // nothing to do
+ }
+
+ foreach ( $jobs as $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ }
+
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doBatchPush( $jobs, $flags );
+ wfProfileOut( __METHOD__ );
+ return $ok;
+ }
+
+ /**
+ * @see JobQueue::batchPush()
+ * @return bool
+ */
+ abstract protected function doBatchPush( array $jobs, $flags );
+
+ /**
+ * Pop a job off of the queue.
+ * This requires $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::pop() instead of this function.
+ *
+ * @return Job|bool Returns false if there are no jobs
+ * @throws MWException
+ */
+ final public function pop() {
+ global $wgJobClasses;
+
+ if ( $this->wiki !== wfWikiID() ) {
+ throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
+ } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
+ // Do not pop jobs if there is no class for the queue type
+ throw new MWException( "Unrecognized job type '{$this->type}'." );
+ }
+
+ wfProfileIn( __METHOD__ );
+ $job = $this->doPop();
+ wfProfileOut( __METHOD__ );
+ return $job;
+ }
+
+ /**
+ * @see JobQueue::pop()
+ * @return Job
+ */
+ abstract protected function doPop();
+
+ /**
+ * Acknowledge that a job was completed.
+ *
+ * This does nothing for certain queue classes or if "claimTTL" is not set.
+ * Outside callers should use JobQueueGroup::ack() instead of this function.
+ *
+ * @param $job Job
+ * @return bool
+ * @throws MWException
+ */
+ final public function ack( Job $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doAck( $job );
+ wfProfileOut( __METHOD__ );
+ return $ok;
+ }
+
+ /**
+ * @see JobQueue::ack()
+ * @return bool
+ */
+ abstract protected function doAck( Job $job );
+
+ /**
+ * Register the "root job" of a given job into the queue for de-duplication.
+ * This should only be called right *after* all the new jobs have been inserted.
+ * This is used to turn older, duplicate, job entries into no-ops. The root job
+ * information will remain in the registry until it simply falls out of cache.
+ *
+ * This requires that $job has two special fields in the "params" array:
+ * - rootJobSignature : hash (e.g. SHA1) that identifies the task
+ * - rootJobTimestamp : TS_MW timestamp of this instance of the task
+ *
+ * A "root job" is a conceptual job that consist of potentially many smaller jobs
+ * that are actually inserted into the queue. For example, "refreshLinks" jobs are
+ * spawned when a template is edited. One can think of the task as "update links
+ * of pages that use template X" and an instance of that task as a "root job".
+ * However, what actually goes into the queue are potentially many refreshLinks2 jobs.
+ * Since these jobs include things like page ID ranges and DB master positions, and morph
+ * into smaller refreshLinks2 jobs recursively, simple duplicate detection (like job_sha1)
+ * for individual jobs being identical is not useful.
+ *
+ * In the case of "refreshLinks", if these jobs are still in the queue when the template
+ * is edited again, we want all of these old refreshLinks jobs for that template to become
+ * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing.
+ * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
+ * previous "root job" for the same task of "update links of pages that use template X".
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @param $job Job
+ * @return bool
+ * @throws MWException
+ */
+ final public function deduplicateRootJob( Job $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doDeduplicateRootJob( $job );
+ wfProfileOut( __METHOD__ );
+ return $ok;
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @param $job Job
+ * @return bool
+ */
+ protected function doDeduplicateRootJob( Job $job ) {
+ return true;
+ }
+
+ /**
+ * Wait for any slaves or backup servers to catch up.
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @return void
+ * @throws MWException
+ */
+ final public function waitForBackups() {
+ wfProfileIn( __METHOD__ );
+ $this->doWaitForBackups();
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::waitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {}
+
+ /**
+ * Return a map of task names to task definition maps.
+ * A "task" is a fast periodic queue maintenance action.
+ * Mutually exclusive tasks must implement their own locking in the callback.
+ *
+ * Each task value is an associative array with:
+ * - name : the name of the task
+ * - callback : a PHP callable that performs the task
+ * - period : the period in seconds corresponding to the task frequency
+ *
+ * @return Array
+ */
+ final public function getPeriodicTasks() {
+ $tasks = $this->doGetPeriodicTasks();
+ foreach ( $tasks as $name => &$def ) {
+ $def['name'] = $name;
+ }
+ return $tasks;
+ }
+
+ /**
+ * @see JobQueue::getPeriodicTasks()
+ * @return Array
+ */
+ protected function doGetPeriodicTasks() {
+ return array();
+ }
+
+ /**
+ * Clear any process and persistent caches
+ *
+ * @return void
+ */
+ final public function flushCaches() {
+ wfProfileIn( __METHOD__ );
+ $this->doFlushCaches();
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::flushCaches()
+ * @return void
+ */
+ protected function doFlushCaches() {}
+
+ /**
+ * Get an iterator to traverse over all of the jobs in this queue.
+ * This does not include jobs that are current acquired. In general,
+ * this should only be called on a queue that is no longer being popped.
+ *
+ * @return Iterator|Traversable|Array
+ * @throws MWException
+ */
+ abstract public function getAllQueuedJobs();
+
+ /**
+ * Namespace the queue with a key to isolate it for testing
+ *
+ * @param $key string
+ * @return void
+ * @throws MWException
+ */
+ public function setTestingPrefix( $key ) {
+ throw new MWException( "Queue namespacing not supported for this queue type." );
+ }
+}
diff --git a/includes/job/JobQueueAggregator.php b/includes/job/JobQueueAggregator.php
new file mode 100644
index 00000000..3dba3c53
--- /dev/null
+++ b/includes/job/JobQueueAggregator.php
@@ -0,0 +1,139 @@
+<?php
+/**
+ * Job queue aggregator code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+abstract class JobQueueAggregator {
+ /** @var JobQueueAggregator */
+ protected static $instance = null;
+
+ /**
+ * @param array $params
+ */
+ protected function __construct( array $params ) {}
+
+ /**
+ * @return JobQueueAggregator
+ */
+ final public static function singleton() {
+ global $wgJobQueueAggregator;
+
+ if ( !isset( self::$instance ) ) {
+ $class = $wgJobQueueAggregator['class'];
+ $obj = new $class( $wgJobQueueAggregator );
+ if ( !( $obj instanceof JobQueueAggregator ) ) {
+ throw new MWException( "Class '$class' is not a JobQueueAggregator class." );
+ }
+ self::$instance = $obj;
+ }
+
+ return self::$instance;
+ }
+
+ /**
+ * Destroy the singleton instance
+ *
+ * @return void
+ */
+ final public static function destroySingleton() {
+ self::$instance = null;
+ }
+
+ /**
+ * Mark a queue as being empty
+ *
+ * @param string $wiki
+ * @param string $type
+ * @return bool Success
+ */
+ final public function notifyQueueEmpty( $wiki, $type ) {
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doNotifyQueueEmpty( $wiki, $type );
+ wfProfileOut( __METHOD__ );
+ return $ok;
+ }
+
+ /**
+ * @see JobQueueAggregator::notifyQueueEmpty()
+ */
+ abstract protected function doNotifyQueueEmpty( $wiki, $type );
+
+ /**
+ * Mark a queue as being non-empty
+ *
+ * @param string $wiki
+ * @param string $type
+ * @return bool Success
+ */
+ final public function notifyQueueNonEmpty( $wiki, $type ) {
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doNotifyQueueNonEmpty( $wiki, $type );
+ wfProfileOut( __METHOD__ );
+ return $ok;
+ }
+
+ /**
+ * @see JobQueueAggregator::notifyQueueNonEmpty()
+ */
+ abstract protected function doNotifyQueueNonEmpty( $wiki, $type );
+
+ /**
+ * Get the list of all of the queues with jobs
+ *
+ * @return Array (job type => (list of wiki IDs))
+ */
+ final public function getAllReadyWikiQueues() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetAllReadyWikiQueues();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueueAggregator::getAllReadyWikiQueues()
+ */
+ abstract protected function doGetAllReadyWikiQueues();
+
+ /**
+ * Get all databases that have a pending job.
+ * This poll all the queues and is this expensive.
+ *
+ * @return Array (job type => (list of wiki IDs))
+ */
+ protected function findPendingWikiQueues() {
+ global $wgLocalDatabases;
+
+ $pendingDBs = array(); // (job type => (db list))
+ foreach ( $wgLocalDatabases as $db ) {
+ foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) {
+ $pendingDBs[$type][] = $db;
+ }
+ }
+
+ return $pendingDBs;
+ }
+}
diff --git a/includes/job/JobQueueAggregatorMemc.php b/includes/job/JobQueueAggregatorMemc.php
new file mode 100644
index 00000000..4b82cf92
--- /dev/null
+++ b/includes/job/JobQueueAggregatorMemc.php
@@ -0,0 +1,117 @@
+<?php
+/**
+ * Job queue aggregator code that uses BagOStuff.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues using BagOStuff
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueAggregatorMemc extends JobQueueAggregator {
+ /** @var BagOStuff */
+ protected $cache;
+
+ protected $cacheTTL; // integer; seconds
+
+ /**
+ * @params include:
+ * - objectCache : Name of an object cache registered in $wgObjectCaches.
+ * This defaults to the one specified by $wgMainCacheType.
+ * - cacheTTL : Seconds to cache the aggregate data before regenerating.
+ * @param array $params
+ */
+ protected function __construct( array $params ) {
+ parent::__construct( $params );
+ $this->cache = isset( $params['objectCache'] )
+ ? wfGetCache( $params['objectCache'] )
+ : wfGetMainCache();
+ $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min
+ }
+
+ /**
+ * @see JobQueueAggregator::doNotifyQueueEmpty()
+ */
+ protected function doNotifyQueueEmpty( $wiki, $type ) {
+ $key = $this->getReadyQueueCacheKey();
+ // Delist the queue from the "ready queue" list
+ if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
+ $curInfo = $this->cache->get( $key );
+ if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) {
+ if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) {
+ $curInfo['pendingDBs'][$type] = array_diff(
+ $curInfo['pendingDBs'][$type], array( $wiki ) );
+ $this->cache->set( $key, $curInfo );
+ }
+ }
+ $this->cache->delete( "$key:lock" ); // unlock
+ }
+ return true;
+ }
+
+ /**
+ * @see JobQueueAggregator::doNotifyQueueNonEmpty()
+ */
+ protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+ return true; // updated periodically
+ }
+
+ /**
+ * @see JobQueueAggregator::doAllGetReadyWikiQueues()
+ */
+ protected function doGetAllReadyWikiQueues() {
+ $key = $this->getReadyQueueCacheKey();
+ // If the cache entry wasn't present, is stale, or in .1% of cases otherwise,
+ // regenerate the cache. Use any available stale cache if another process is
+ // currently regenerating the pending DB information.
+ $pendingDbInfo = $this->cache->get( $key );
+ if ( !is_array( $pendingDbInfo )
+ || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL
+ || mt_rand( 0, 999 ) == 0
+ ) {
+ if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock
+ $pendingDbInfo = array(
+ 'pendingDBs' => $this->findPendingWikiQueues(),
+ 'timestamp' => time()
+ );
+ for ( $attempts=1; $attempts <= 25; ++$attempts ) {
+ if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
+ $this->cache->set( $key, $pendingDbInfo );
+ $this->cache->delete( "$key:lock" ); // unlock
+ break;
+ }
+ }
+ $this->cache->delete( "$key:rebuild" ); // unlock
+ }
+ }
+ return is_array( $pendingDbInfo )
+ ? $pendingDbInfo['pendingDBs']
+ : array(); // cache is both empty and locked
+ }
+
+ /**
+ * @return string
+ */
+ private function getReadyQueueCacheKey() {
+ return "jobqueue:aggregator:ready-queues:v1"; // global
+ }
+}
diff --git a/includes/job/JobQueueAggregatorRedis.php b/includes/job/JobQueueAggregatorRedis.php
new file mode 100644
index 00000000..74e9171c
--- /dev/null
+++ b/includes/job/JobQueueAggregatorRedis.php
@@ -0,0 +1,165 @@
+<?php
+/**
+ * Job queue aggregator code that uses PhpRedis.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues using PhpRedis
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueAggregatorRedis extends JobQueueAggregator {
+ /** @var RedisConnectionPool */
+ protected $redisPool;
+
+ /**
+ * @params include:
+ * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+ * - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
+ * If a hostname is specified but no port, the standard port number
+ * 6379 will be used. Required.
+ * @param array $params
+ */
+ protected function __construct( array $params ) {
+ parent::__construct( $params );
+ $this->server = $params['redisServer'];
+ $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
+ }
+
+ /**
+ * @see JobQueueAggregator::doNotifyQueueEmpty()
+ */
+ protected function doNotifyQueueEmpty( $wiki, $type ) {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ return false;
+ }
+ try {
+ $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
+ return true;
+ } catch ( RedisException $e ) {
+ $this->handleException( $conn, $e );
+ return false;
+ }
+ }
+
+ /**
+ * @see JobQueueAggregator::doNotifyQueueNonEmpty()
+ */
+ protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ return false;
+ }
+ try {
+ $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
+ return true;
+ } catch ( RedisException $e ) {
+ $this->handleException( $conn, $e );
+ return false;
+ }
+ }
+
+ /**
+ * @see JobQueueAggregator::doAllGetReadyWikiQueues()
+ */
+ protected function doGetAllReadyWikiQueues() {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ return array();
+ }
+ try {
+ $conn->multi( Redis::PIPELINE );
+ $conn->exists( $this->getReadyQueueKey() );
+ $conn->hGetAll( $this->getReadyQueueKey() );
+ list( $exists, $map ) = $conn->exec();
+
+ if ( $exists ) { // cache hit
+ $pendingDBs = array(); // (type => list of wikis)
+ foreach ( $map as $key => $time ) {
+ list( $type, $wiki ) = $this->dencQueueName( $key );
+ $pendingDBs[$type][] = $wiki;
+ }
+ } else { // cache miss
+ $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
+
+ $now = time();
+ $map = array();
+ foreach ( $pendingDBs as $type => $wikis ) {
+ foreach ( $wikis as $wiki ) {
+ $map[$this->encQueueName( $type, $wiki )] = $now;
+ }
+ }
+ $conn->hMSet( $this->getReadyQueueKey(), $map );
+ }
+
+ return $pendingDBs;
+ } catch ( RedisException $e ) {
+ $this->handleException( $conn, $e );
+ return array();
+ }
+ }
+
+ /**
+ * Get a connection to the server that handles all sub-queues for this queue
+ *
+ * @return Array (server name, Redis instance)
+ * @throws MWException
+ */
+ protected function getConnection() {
+ return $this->redisPool->getConnection( $this->server );
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param RedisException $e
+ * @return void
+ */
+ protected function handleException( RedisConnRef $conn, $e ) {
+ $this->redisPool->handleException( $this->server, $conn, $e );
+ }
+
+ /**
+ * @return string
+ */
+ private function getReadyQueueKey() {
+ return "jobqueue:aggregator:h-ready-queues:v1"; // global
+ }
+
+ /**
+ * @param string $type
+ * @param string $wiki
+ * @return string
+ */
+ private function encQueueName( $type, $wiki ) {
+ return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
+ }
+
+ /**
+ * @param string $name
+ * @return string
+ */
+ private function dencQueueName( $name ) {
+ list( $type, $wiki ) = explode( '/', $name, 2 );
+ return array( rawurldecode( $type ), rawurldecode( $wiki ) );
+ }
+}
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
new file mode 100644
index 00000000..ff7f7abc
--- /dev/null
+++ b/includes/job/JobQueueDB.php
@@ -0,0 +1,716 @@
+<?php
+/**
+ * Database-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle job queues stored in the DB
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueDB extends JobQueue {
+ const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
+ const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
+ const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
+ const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
+ const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
+ const MAX_OFFSET = 255; // integer; maximum number of rows to skip
+
+ protected $cluster = false; // string; name of an external DB cluster
+
+ /**
+ * Additional parameters include:
+ * - cluster : The name of an external cluster registered via LBFactory.
+ * If not specified, the primary DB cluster for the wiki will be used.
+ * This can be overridden with a custom cluster so that DB handles will
+ * be retrieved via LBFactory::getExternalLB() and getConnection().
+ * @param $params array
+ */
+ protected function __construct( array $params ) {
+ parent::__construct( $params );
+ $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
+ }
+
+ protected function supportedOrders() {
+ return array( 'random', 'timestamp', 'fifo' );
+ }
+
+ protected function optimalOrder() {
+ return 'random';
+ }
+
+ /**
+ * @see JobQueue::doIsEmpty()
+ * @return bool
+ */
+ protected function doIsEmpty() {
+ global $wgMemc;
+
+ $key = $this->getCacheKey( 'empty' );
+
+ $isEmpty = $wgMemc->get( $key );
+ if ( $isEmpty === 'true' ) {
+ return true;
+ } elseif ( $isEmpty === 'false' ) {
+ return false;
+ }
+
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ $found = $dbr->selectField( // unclaimed job
+ 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
+ );
+ $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
+
+ return !$found;
+ }
+
+ /**
+ * @see JobQueue::doGetSize()
+ * @return integer
+ */
+ protected function doGetSize() {
+ global $wgMemc;
+
+ $key = $this->getCacheKey( 'size' );
+
+ $size = $wgMemc->get( $key );
+ if ( is_int( $size ) ) {
+ return $size;
+ }
+
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array( 'job_cmd' => $this->type, 'job_token' => '' ),
+ __METHOD__
+ );
+ $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT );
+
+ return $size;
+ }
+
+ /**
+ * @see JobQueue::doGetAcquiredCount()
+ * @return integer
+ */
+ protected function doGetAcquiredCount() {
+ global $wgMemc;
+
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+
+ $key = $this->getCacheKey( 'acquiredcount' );
+
+ $count = $wgMemc->get( $key );
+ if ( is_int( $count ) ) {
+ return $count;
+ }
+
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
+ __METHOD__
+ );
+ $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
+
+ return $count;
+ }
+
+ /**
+ * @see JobQueue::doBatchPush()
+ * @param array $jobs
+ * @param $flags
+ * @throws DBError|Exception
+ * @return bool
+ */
+ protected function doBatchPush( array $jobs, $flags ) {
+ if ( count( $jobs ) ) {
+ list( $dbw, $scope ) = $this->getMasterDB();
+
+ $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
+ $rowList = array(); // list of jobs for jobs that are are not de-duplicated
+
+ foreach ( $jobs as $job ) {
+ $row = $this->insertFields( $job );
+ if ( $job->ignoreDuplicates() ) {
+ $rowSet[$row['job_sha1']] = $row;
+ } else {
+ $rowList[] = $row;
+ }
+ }
+
+ $key = $this->getCacheKey( 'empty' );
+ $atomic = ( $flags & self::QoS_Atomic );
+
+ $dbw->onTransactionIdle(
+ function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $scope
+ ) {
+ global $wgMemc;
+
+ if ( $atomic ) {
+ $dbw->begin( __METHOD__ ); // wrap all the job additions in one transaction
+ }
+ try {
+ // Strip out any duplicate jobs that are already in the queue...
+ if ( count( $rowSet ) ) {
+ $res = $dbw->select( 'job', 'job_sha1',
+ array(
+ // No job_type condition since it's part of the job_sha1 hash
+ 'job_sha1' => array_keys( $rowSet ),
+ 'job_token' => '' // unclaimed
+ ),
+ __METHOD__
+ );
+ foreach ( $res as $row ) {
+ wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
+ unset( $rowSet[$row->job_sha1] ); // already enqueued
+ }
+ }
+ // Build the full list of job rows to insert
+ $rows = array_merge( $rowList, array_values( $rowSet ) );
+ // Insert the job rows in chunks to avoid slave lag...
+ foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
+ $dbw->insert( 'job', $rowBatch, __METHOD__ );
+ }
+ wfIncrStats( 'job-insert', count( $rows ) );
+ wfIncrStats( 'job-insert-duplicate',
+ count( $rowSet ) + count( $rowList ) - count( $rows ) );
+ } catch ( DBError $e ) {
+ if ( $atomic ) {
+ $dbw->rollback( __METHOD__ );
+ }
+ throw $e;
+ }
+ if ( $atomic ) {
+ $dbw->commit( __METHOD__ );
+ }
+
+ $wgMemc->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
+ } );
+ }
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doPop()
+ * @return Job|bool
+ */
+ protected function doPop() {
+ global $wgMemc;
+
+ if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
+ return false; // queue is empty
+ }
+
+ list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+ } );
+
+ $uuid = wfRandomString( 32 ); // pop attempt
+ $job = false; // job popped off
+ do { // retry when our row is invalid or deleted as a duplicate
+ // Try to reserve a row in the DB...
+ if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
+ $row = $this->claimOldest( $uuid );
+ } else { // random first
+ $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+ $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+ $row = $this->claimRandom( $uuid, $rand, $gte );
+ }
+ // Check if we found a row to reserve...
+ if ( !$row ) {
+ $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
+ break; // nothing to do
+ }
+ wfIncrStats( 'job-pop' );
+ // Get the job object from the row...
+ $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+ if ( !$title ) {
+ $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+ wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
+ continue; // try again
+ }
+ $job = Job::factory( $row->job_cmd, $title,
+ self::extractBlob( $row->job_params ), $row->job_id );
+ $job->id = $row->job_id; // XXX: work around broken subclasses
+ // Flag this job as an old duplicate based on its "root" job...
+ if ( $this->isRootJobOldDuplicate( $job ) ) {
+ wfIncrStats( 'job-pop-duplicate' );
+ $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
+ }
+ break; // done
+ } while( true );
+
+ return $job;
+ }
+
+ /**
+ * Reserve a row with a single UPDATE without holding row locks over RTTs...
+ *
+ * @param string $uuid 32 char hex string
+ * @param $rand integer Random unsigned integer (31 bits)
+ * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
+ * @return Row|false
+ */
+ protected function claimRandom( $uuid, $rand, $gte ) {
+ global $wgMemc;
+
+ list( $dbw, $scope ) = $this->getMasterDB();
+ // Check cache to see if the queue has <= OFFSET items
+ $tinyQueue = $wgMemc->get( $this->getCacheKey( 'small' ) );
+
+ $row = false; // the row acquired
+ $invertedDirection = false; // whether one job_random direction was already scanned
+ // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
+ // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
+ // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
+ // be used here with MySQL.
+ do {
+ if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
+ // For small queues, using OFFSET will overshoot and return no rows more often.
+ // Instead, this uses job_random to pick a row (possibly checking both directions).
+ $ineq = $gte ? '>=' : '<=';
+ $dir = $gte ? 'ASC' : 'DESC';
+ $row = $dbw->selectRow( 'job', '*', // find a random job
+ array(
+ 'job_cmd' => $this->type,
+ 'job_token' => '', // unclaimed
+ "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
+ __METHOD__,
+ array( 'ORDER BY' => "job_random {$dir}" )
+ );
+ if ( !$row && !$invertedDirection ) {
+ $gte = !$gte;
+ $invertedDirection = true;
+ continue; // try the other direction
+ }
+ } else { // table *may* have >= MAX_OFFSET rows
+ // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
+ // in MySQL if there are many rows for some reason. This uses a small OFFSET
+ // instead of job_random for reducing excess claim retries.
+ $row = $dbw->selectRow( 'job', '*', // find a random job
+ array(
+ 'job_cmd' => $this->type,
+ 'job_token' => '', // unclaimed
+ ),
+ __METHOD__,
+ array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) )
+ );
+ if ( !$row ) {
+ $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
+ $wgMemc->set( $this->getCacheKey( 'small' ), 1, 30 );
+ continue; // use job_random
+ }
+ }
+ if ( $row ) { // claim the job
+ $dbw->update( 'job', // update by PK
+ array(
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ),
+ array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
+ __METHOD__
+ );
+ // This might get raced out by another runner when claiming the previously
+ // selected row. The use of job_random should minimize this problem, however.
+ if ( !$dbw->affectedRows() ) {
+ $row = false; // raced out
+ }
+ } else {
+ break; // nothing to do
+ }
+ } while ( !$row );
+
+ return $row;
+ }
+
+ /**
+ * Reserve a row with a single UPDATE without holding row locks over RTTs...
+ *
+ * @param string $uuid 32 char hex string
+ * @return Row|false
+ */
+ protected function claimOldest( $uuid ) {
+ list( $dbw, $scope ) = $this->getMasterDB();
+
+ $row = false; // the row acquired
+ do {
+ if ( $dbw->getType() === 'mysql' ) {
+ // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
+ // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
+ // Oracle and Postgre have no such limitation. However, MySQL offers an
+ // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
+ $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
+ "SET " .
+ "job_token = {$dbw->addQuotes( $uuid ) }, " .
+ "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
+ "job_attempts = job_attempts+1 " .
+ "WHERE ( " .
+ "job_cmd = {$dbw->addQuotes( $this->type )} " .
+ "AND job_token = {$dbw->addQuotes( '' )} " .
+ ") ORDER BY job_id ASC LIMIT 1",
+ __METHOD__
+ );
+ } else {
+ // Use a subquery to find the job, within an UPDATE to claim it.
+ // This uses as much of the DB wrapper functions as possible.
+ $dbw->update( 'job',
+ array(
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ),
+ array( 'job_id = (' .
+ $dbw->selectSQLText( 'job', 'job_id',
+ array( 'job_cmd' => $this->type, 'job_token' => '' ),
+ __METHOD__,
+ array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
+ ')'
+ ),
+ __METHOD__
+ );
+ }
+ // Fetch any row that we just reserved...
+ if ( $dbw->affectedRows() ) {
+ $row = $dbw->selectRow( 'job', '*',
+ array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
+ );
+ if ( !$row ) { // raced out by duplicate job removal
+ wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+ }
+ } else {
+ break; // nothing to do
+ }
+ } while ( !$row );
+
+ return $row;
+ }
+
+ /**
+ * Recycle or destroy any jobs that have been claimed for too long
+ *
+ * @return integer Number of jobs recycled/deleted
+ */
+ public function recycleAndDeleteStaleJobs() {
+ global $wgMemc;
+
+ $now = time();
+ list( $dbw, $scope ) = $this->getMasterDB();
+ $count = 0; // affected rows
+
+ if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
+ return $count; // already in progress
+ }
+
+ // Remove claims on jobs acquired for too long if enabled...
+ if ( $this->claimTTL > 0 ) {
+ $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+ // Get the IDs of jobs that have be claimed but not finished after too long.
+ // These jobs can be recycled into the queue by expiring the claim. Selecting
+ // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id',
+ array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+ "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
+ __METHOD__
+ );
+ $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) );
+ if ( count( $ids ) ) {
+ // Reset job_token for these jobs so that other runners will pick them up.
+ // Set the timestamp to the current time, as it is useful to now that the job
+ // was already tried before (the timestamp becomes the "released" time).
+ $dbw->update( 'job',
+ array(
+ 'job_token' => '',
+ 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+ array(
+ 'job_id' => $ids ),
+ __METHOD__
+ );
+ $count += $dbw->affectedRows();
+ wfIncrStats( 'job-recycle', $dbw->affectedRows() );
+ $wgMemc->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+ }
+ }
+
+ // Just destroy any stale jobs...
+ $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+ $conds = array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
+ );
+ if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+ $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
+ }
+ // Get the IDs of jobs that are considered stale and should be removed. Selecting
+ // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
+ $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) );
+ if ( count( $ids ) ) {
+ $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
+ $count += $dbw->affectedRows();
+ }
+
+ $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
+
+ return $count;
+ }
+
+ /**
+ * @see JobQueue::doAck()
+ * @param Job $job
+ * @throws MWException
+ * @return Job|bool
+ */
+ protected function doAck( Job $job ) {
+ if ( !$job->getId() ) {
+ throw new MWException( "Job of type '{$job->getType()}' has no ID." );
+ }
+
+ list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+ } );
+
+ // Delete a row with a single DELETE without holding row locks over RTTs...
+ $dbw->delete( 'job',
+ array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ), __METHOD__ );
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doDeduplicateRootJob()
+ * @param Job $job
+ * @throws MWException
+ * @return bool
+ */
+ protected function doDeduplicateRootJob( Job $job ) {
+ $params = $job->getParams();
+ if ( !isset( $params['rootJobSignature'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+ } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+ }
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Callers should call batchInsert() and then this function so that if the insert
+ // fails, the de-duplication registration will be aborted. Since the insert is
+ // deferred till "transaction idle", do the same here, so that the ordering is
+ // maintained. Having only the de-duplication registration succeed would cause
+ // jobs to become no-ops without any actual jobs that made them redundant.
+ list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw->onTransactionIdle( function() use ( $params, $key, $scope ) {
+ global $wgMemc;
+
+ $timestamp = $wgMemc->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location...
+ return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+ } );
+
+ return true;
+ }
+
+ /**
+ * Check if the "root" job of a given job has been superseded by a newer one
+ *
+ * @param $job Job
+ * @return bool
+ */
+ protected function isRootJobOldDuplicate( Job $job ) {
+ global $wgMemc;
+
+ $params = $job->getParams();
+ if ( !isset( $params['rootJobSignature'] ) ) {
+ return false; // job has no de-deplication info
+ } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+ trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
+ return false;
+ }
+
+ // Get the last time this root job was enqueued
+ $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
+
+ // Check if a new root job was started at the location after this one's...
+ return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+ }
+
+ /**
+ * @see JobQueue::doWaitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {
+ wfWaitForSlaves();
+ }
+
+ /**
+ * @return Array
+ */
+ protected function doGetPeriodicTasks() {
+ return array(
+ 'recycleAndDeleteStaleJobs' => array(
+ 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+ 'period' => ceil( $this->claimTTL / 2 )
+ )
+ );
+ }
+
+ /**
+ * @return void
+ */
+ protected function doFlushCaches() {
+ global $wgMemc;
+
+ foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+ $wgMemc->delete( $this->getCacheKey( $type ) );
+ }
+ }
+
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ */
+ public function getAllQueuedJobs() {
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ return new MappedIterator(
+ $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
+ function( $row ) use ( $scope ) {
+ $job = Job::factory(
+ $row->job_cmd,
+ Title::makeTitle( $row->job_namespace, $row->job_title ),
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
+ $row->job_id
+ );
+ $job->id = $row->job_id; // XXX: work around broken subclasses
+ return $job;
+ }
+ );
+ }
+
+ /**
+ * @return Array (DatabaseBase, ScopedCallback)
+ */
+ protected function getSlaveDB() {
+ return $this->getDB( DB_SLAVE );
+ }
+
+ /**
+ * @return Array (DatabaseBase, ScopedCallback)
+ */
+ protected function getMasterDB() {
+ return $this->getDB( DB_MASTER );
+ }
+
+ /**
+ * @param $index integer (DB_SLAVE/DB_MASTER)
+ * @return Array (DatabaseBase, ScopedCallback)
+ */
+ protected function getDB( $index ) {
+ $lb = ( $this->cluster !== false )
+ ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
+ : wfGetLB( $this->wiki );
+ $conn = $lb->getConnection( $index, array(), $this->wiki );
+ return array(
+ $conn,
+ new ScopedCallback( function() use ( $lb, $conn ) {
+ $lb->reuseConnection( $conn );
+ } )
+ );
+ }
+
+ /**
+ * @param $job Job
+ * @return array
+ */
+ protected function insertFields( Job $job ) {
+ list( $dbw, $scope ) = $this->getMasterDB();
+ return array(
+ // Fields that describe the nature of the job
+ 'job_cmd' => $job->getType(),
+ 'job_namespace' => $job->getTitle()->getNamespace(),
+ 'job_title' => $job->getTitle()->getDBkey(),
+ 'job_params' => self::makeBlob( $job->getParams() ),
+ // Additional job metadata
+ 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+ 'job_timestamp' => $dbw->timestamp(),
+ 'job_sha1' => wfBaseConvert(
+ sha1( serialize( $job->getDeduplicationInfo() ) ),
+ 16, 36, 31
+ ),
+ 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
+ );
+ }
+
+ /**
+ * @return string
+ */
+ private function getCacheKey( $property ) {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
+ }
+
+ /**
+ * @param string $signature Hash identifier of the root job
+ * @return string
+ */
+ private function getRootJobCacheKey( $signature ) {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
+ }
+
+ /**
+ * @param $params
+ * @return string
+ */
+ protected static function makeBlob( $params ) {
+ if ( $params !== false ) {
+ return serialize( $params );
+ } else {
+ return '';
+ }
+ }
+
+ /**
+ * @param $blob
+ * @return bool|mixed
+ */
+ protected static function extractBlob( $blob ) {
+ if ( (string)$blob !== '' ) {
+ return unserialize( $blob );
+ } else {
+ return false;
+ }
+ }
+}
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
new file mode 100644
index 00000000..351c71a3
--- /dev/null
+++ b/includes/job/JobQueueGroup.php
@@ -0,0 +1,351 @@
+<?php
+/**
+ * Job queue base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle enqueueing of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueGroup {
+ /** @var Array */
+ protected static $instances = array();
+
+ /** @var ProcessCacheLRU */
+ protected $cache;
+
+ protected $wiki; // string; wiki ID
+
+ const TYPE_DEFAULT = 1; // integer; jobs popped by default
+ const TYPE_ANY = 2; // integer; any job
+
+ const USE_CACHE = 1; // integer; use process or persistent cache
+
+ const PROC_CACHE_TTL = 15; // integer; seconds
+
+ const CACHE_VERSION = 1; // integer; cache version
+
+ /**
+ * @param string $wiki Wiki ID
+ */
+ protected function __construct( $wiki ) {
+ $this->wiki = $wiki;
+ $this->cache = new ProcessCacheLRU( 10 );
+ }
+
+ /**
+ * @param string $wiki Wiki ID
+ * @return JobQueueGroup
+ */
+ public static function singleton( $wiki = false ) {
+ $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+ if ( !isset( self::$instances[$wiki] ) ) {
+ self::$instances[$wiki] = new self( $wiki );
+ }
+ return self::$instances[$wiki];
+ }
+
+ /**
+ * Destroy the singleton instances
+ *
+ * @return void
+ */
+ public static function destroySingletons() {
+ self::$instances = array();
+ }
+
+ /**
+ * Get the job queue object for a given queue type
+ *
+ * @param $type string
+ * @return JobQueue
+ */
+ public function get( $type ) {
+ global $wgJobTypeConf;
+
+ $conf = array( 'wiki' => $this->wiki, 'type' => $type );
+ if ( isset( $wgJobTypeConf[$type] ) ) {
+ $conf = $conf + $wgJobTypeConf[$type];
+ } else {
+ $conf = $conf + $wgJobTypeConf['default'];
+ }
+
+ return JobQueue::factory( $conf );
+ }
+
+ /**
+ * Insert jobs into the respective queues of with the belong.
+ *
+ * This inserts the jobs into the queue specified by $wgJobTypeConf
+ * and updates the aggregate job queue information cache as needed.
+ *
+ * @param $jobs Job|array A single Job or a list of Jobs
+ * @throws MWException
+ * @return bool
+ */
+ public function push( $jobs ) {
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+ $jobsByType = array(); // (job type => list of jobs)
+ foreach ( $jobs as $job ) {
+ if ( $job instanceof Job ) {
+ $jobsByType[$job->getType()][] = $job;
+ } else {
+ throw new MWException( "Attempted to push a non-Job object into a queue." );
+ }
+ }
+
+ $ok = true;
+ foreach ( $jobsByType as $type => $jobs ) {
+ if ( $this->get( $type )->push( $jobs ) ) {
+ JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
+ } else {
+ $ok = false;
+ }
+ }
+
+ if ( $this->cache->has( 'queues-ready', 'list' ) ) {
+ $list = $this->cache->get( 'queues-ready', 'list' );
+ if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
+ $this->cache->clear( 'queues-ready' );
+ }
+ }
+
+ return $ok;
+ }
+
+ /**
+ * Pop a job off one of the job queues
+ *
+ * This pops a job off a queue as specified by $wgJobTypeConf and
+ * updates the aggregate job queue information cache as needed.
+ *
+ * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type string
+ * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
+ * @return Job|bool Returns false on failure
+ */
+ public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
+ if ( is_string( $qtype ) ) { // specific job type
+ $job = $this->get( $qtype )->pop();
+ if ( !$job ) {
+ JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
+ }
+ return $job;
+ } else { // any job in the "default" jobs types
+ if ( $flags & self::USE_CACHE ) {
+ if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
+ $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+ }
+ $types = $this->cache->get( 'queues-ready', 'list' );
+ } else {
+ $types = $this->getQueuesWithJobs();
+ }
+
+ if ( $qtype == self::TYPE_DEFAULT ) {
+ $types = array_intersect( $types, $this->getDefaultQueueTypes() );
+ }
+ shuffle( $types ); // avoid starvation
+
+ foreach ( $types as $type ) { // for each queue...
+ $job = $this->get( $type )->pop();
+ if ( $job ) { // found
+ return $job;
+ } else { // not found
+ JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
+ $this->cache->clear( 'queues-ready' );
+ }
+ }
+
+ return false; // no jobs found
+ }
+ }
+
+ /**
+ * Acknowledge that a job was completed
+ *
+ * @param $job Job
+ * @return bool
+ */
+ public function ack( Job $job ) {
+ return $this->get( $job->getType() )->ack( $job );
+ }
+
+ /**
+ * Register the "root job" of a given job into the queue for de-duplication.
+ * This should only be called right *after* all the new jobs have been inserted.
+ *
+ * @param $job Job
+ * @return bool
+ */
+ public function deduplicateRootJob( Job $job ) {
+ return $this->get( $job->getType() )->deduplicateRootJob( $job );
+ }
+
+ /**
+ * Wait for any slaves or backup queue servers to catch up.
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @return void
+ * @throws MWException
+ */
+ public function waitForBackups() {
+ global $wgJobTypeConf;
+
+ wfProfileIn( __METHOD__ );
+ // Try to avoid doing this more than once per queue storage medium
+ foreach ( $wgJobTypeConf as $type => $conf ) {
+ $this->get( $type )->waitForBackups();
+ }
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * Get the list of queue types
+ *
+ * @return array List of strings
+ */
+ public function getQueueTypes() {
+ return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
+ }
+
+ /**
+ * Get the list of default queue types
+ *
+ * @return array List of strings
+ */
+ public function getDefaultQueueTypes() {
+ global $wgJobTypesExcludedFromDefaultQueue;
+
+ return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
+ }
+
+ /**
+ * Get the list of job types that have non-empty queues
+ *
+ * @return Array List of job types that have non-empty queues
+ */
+ public function getQueuesWithJobs() {
+ $types = array();
+ foreach ( $this->getQueueTypes() as $type ) {
+ if ( !$this->get( $type )->isEmpty() ) {
+ $types[] = $type;
+ }
+ }
+ return $types;
+ }
+
+ /**
+ * Check if jobs should not be popped of a queue right now.
+ * This is only used for performance, such as to avoid spamming
+ * the queue with many sub-jobs before they actually get run.
+ *
+ * @param $type string
+ * @return bool
+ */
+ public function isQueueDeprioritized( $type ) {
+ if ( $type === 'refreshLinks2' ) {
+ // Don't keep converting refreshLinks2 => refreshLinks jobs if the
+ // later jobs have not been done yet. This helps throttle queue spam.
+ return !$this->get( 'refreshLinks' )->isEmpty();
+ }
+ return false;
+ }
+
+ /**
+ * Execute any due periodic queue maintenance tasks for all queues.
+ *
+ * A task is "due" if the time ellapsed since the last run is greater than
+ * the defined run period. Concurrent calls to this function will cause tasks
+ * to be attempted twice, so they may need their own methods of mutual exclusion.
+ *
+ * @return integer Number of tasks run
+ */
+ public function executeReadyPeriodicTasks() {
+ global $wgMemc;
+
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
+ $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
+
+ $count = 0;
+ $tasksRun = array(); // (queue => task => UNIX timestamp)
+ foreach ( $this->getQueueTypes() as $type ) {
+ $queue = $this->get( $type );
+ foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
+ if ( $definition['period'] <= 0 ) {
+ continue; // disabled
+ } elseif ( !isset( $lastRuns[$type][$task] )
+ || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
+ {
+ if ( call_user_func( $definition['callback'] ) !== null ) {
+ $tasksRun[$type][$task] = time();
+ ++$count;
+ }
+ }
+ }
+ }
+
+ $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
+ if ( is_array( $lastRuns ) ) {
+ foreach ( $tasksRun as $type => $tasks ) {
+ foreach ( $tasks as $task => $timestamp ) {
+ if ( !isset( $lastRuns[$type][$task] )
+ || $timestamp > $lastRuns[$type][$task] )
+ {
+ $lastRuns[$type][$task] = $timestamp;
+ }
+ }
+ }
+ } else {
+ $lastRuns = $tasksRun;
+ }
+ return $lastRuns;
+ } );
+
+ return $count;
+ }
+
+ /**
+ * @param $name string
+ * @return mixed
+ */
+ private function getCachedConfigVar( $name ) {
+ global $wgConf, $wgMemc;
+
+ if ( $this->wiki === wfWikiID() ) {
+ return $GLOBALS[$name]; // common case
+ } else {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
+ $value = $wgMemc->get( $key ); // ('v' => ...) or false
+ if ( is_array( $value ) ) {
+ return $value['v'];
+ } else {
+ $value = $wgConf->getConfig( $this->wiki, $name );
+ $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
+ return $value;
+ }
+ }
+ }
+}
diff --git a/includes/job/README b/includes/job/README
new file mode 100644
index 00000000..c11d5a78
--- /dev/null
+++ b/includes/job/README
@@ -0,0 +1,81 @@
+/*!
+\ingroup JobQueue
+\page jobqueue_design Job queue design
+
+Notes on the Job queuing system architecture.
+
+\section intro Introduction
+
+The data model consist of the following main components:
+* The Job object represents a particular deferred task that happens in the
+ background. All jobs subclass the Job object and put the main logic in the
+ function called run().
+* The JobQueue object represents a particular queue of jobs of a certain type.
+ For example there may be a queue for email jobs and a queue for squid purge
+ jobs.
+
+\section jobqueue Job queues
+
+Each job type has its own queue and is associated to a storage medium. One
+queue might save its jobs in redis while another one uses would use a database.
+
+Storage medium are defined in a queue class. Before using it, you must
+define in $wgJobTypeConf a mapping of the job type to a queue class.
+
+The factory class JobQueueGroup provides helper functions:
+- getting the queue for a given job
+- route new job insertions to the proper queue
+
+The following queue classes are available:
+* JobQueueDB (stores jobs in the `job` table in a database)
+* JobQueueRedis (stores jobs in a redis server)
+
+All queue classes support some basic operations (though some may be no-ops):
+* enqueueing a batch of jobs
+* dequeueing a single job
+* acknowledging a job is completed
+* checking if the queue is empty
+
+Some queue classes (like JobQueueDB) may dequeue jobs in random order while other
+queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs
+are executed in FIFO order.
+
+Also note that not all queue classes will have the same reliability guarantees.
+In-memory queues may lose data when restarted depending on snapshot and journal
+settings (including journal fsync() frequency). Some queue types may totally remove
+jobs when dequeued while leaving the ack() function as a no-op; if a job is
+dequeued by a job runner, which crashes before completion, the job will be
+lost. Some jobs, like purging squid caches after a template change, may not
+require durable queues, whereas other jobs might be more important.
+
+\section aggregator Job queue aggregator
+
+The aggregators are used by nextJobDB.php, which is a script that will return a
+random ready queue (on any wiki in the farm) that can be used with runJobs.php.
+This can be used in conjunction with any scripts that handle wiki farm job queues.
+Note that $wgLocalDatabases defines what wikis are in the wiki farm.
+
+Since each job type has its own queue, and wiki-farms may have many wikis,
+there might be a large number of queues to keep track of. To avoid wasting
+large amounts of time polling empty queues, aggregators exists to keep track
+of which queues are ready.
+
+The following queue aggregator classes are available:
+* JobQueueAggregatorMemc (uses $wgMemc to track ready queues)
+* JobQueueAggregatorRedis (uses a redis server to track ready queues)
+
+Some aggregators cache data for a few minutes while others may be always up to date.
+This can be an important factor for jobs that need a low pickup time (or latency).
+
+\section jobs Jobs
+
+Callers should also try to make jobs maintain correctness when executed twice.
+This is useful for queues that actually implement ack(), since they may recycle
+dequeued but un-acknowledged jobs back into the queue to be attempted again. If
+a runner dequeues a job, runs it, but then crashes before calling ack(), the
+job may be returned to the queue and run a second time. Jobs like cache purging can
+happen several times without any correctness problems. However, a pathological case
+would be if a bug causes the problem to systematically keep repeating. For example,
+a job may always throw a DB error at the end of run(). This problem is trickier to
+solve and more obnoxious for things like email jobs, for example. For such jobs,
+it might be useful to use a queue that does not retry jobs.
diff --git a/includes/job/RefreshLinksJob.php b/includes/job/RefreshLinksJob.php
deleted file mode 100644
index b23951c6..00000000
--- a/includes/job/RefreshLinksJob.php
+++ /dev/null
@@ -1,202 +0,0 @@
-<?php
-/**
- * Job to update links for a given title.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup JobQueue
- */
-
-/**
- * Background job to update links for a given title.
- *
- * @ingroup JobQueue
- */
-class RefreshLinksJob extends Job {
-
- function __construct( $title, $params = '', $id = 0 ) {
- parent::__construct( 'refreshLinks', $title, $params, $id );
- }
-
- /**
- * Run a refreshLinks job
- * @return boolean success
- */
- function run() {
- wfProfileIn( __METHOD__ );
-
- $linkCache = LinkCache::singleton();
- $linkCache->clear();
-
- if ( is_null( $this->title ) ) {
- $this->error = "refreshLinks: Invalid title";
- wfProfileOut( __METHOD__ );
- return false;
- }
-
- # Wait for the DB of the current/next slave DB handle to catch up to the master.
- # This way, we get the correct page_latest for templates or files that just changed
- # milliseconds ago, having triggered this job to begin with.
- if ( isset( $this->params['masterPos'] ) ) {
- wfGetLB()->waitFor( $this->params['masterPos'] );
- }
-
- $revision = Revision::newFromTitle( $this->title, false, Revision::READ_NORMAL );
- if ( !$revision ) {
- $this->error = 'refreshLinks: Article not found "' .
- $this->title->getPrefixedDBkey() . '"';
- wfProfileOut( __METHOD__ );
- return false; // XXX: what if it was just deleted?
- }
-
- self::runForTitleInternal( $this->title, $revision, __METHOD__ );
-
- wfProfileOut( __METHOD__ );
- return true;
- }
-
- public static function runForTitleInternal( Title $title, Revision $revision, $fname ) {
- global $wgParser, $wgContLang;
-
- wfProfileIn( $fname . '-parse' );
- $options = ParserOptions::newFromUserAndLang( new User, $wgContLang );
- $parserOutput = $wgParser->parse(
- $revision->getText(), $title, $options, true, true, $revision->getId() );
- wfProfileOut( $fname . '-parse' );
-
- wfProfileIn( $fname . '-update' );
- $updates = $parserOutput->getSecondaryDataUpdates( $title, false );
- DataUpdate::runUpdates( $updates );
- wfProfileOut( $fname . '-update' );
- }
-}
-
-/**
- * Background job to update links for a given title.
- * Newer version for high use templates.
- *
- * @ingroup JobQueue
- */
-class RefreshLinksJob2 extends Job {
- const MAX_TITLES_RUN = 10;
-
- function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'refreshLinks2', $title, $params, $id );
- }
-
- /**
- * Run a refreshLinks2 job
- * @return boolean success
- */
- function run() {
- wfProfileIn( __METHOD__ );
-
- $linkCache = LinkCache::singleton();
- $linkCache->clear();
-
- if ( is_null( $this->title ) ) {
- $this->error = "refreshLinks2: Invalid title";
- wfProfileOut( __METHOD__ );
- return false;
- } elseif ( !isset( $this->params['start'] ) || !isset( $this->params['end'] ) ) {
- $this->error = "refreshLinks2: Invalid params";
- wfProfileOut( __METHOD__ );
- return false;
- }
-
- // Back compat for pre-r94435 jobs
- $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks';
-
- // Avoid slave lag when fetching templates
- if ( isset( $this->params['masterPos'] ) ) {
- $masterPos = $this->params['masterPos'];
- } elseif ( wfGetLB()->getServerCount() > 1 ) {
- $masterPos = wfGetLB()->getMasterPos();
- } else {
- $masterPos = false;
- }
-
- $titles = $this->title->getBacklinkCache()->getLinks(
- $table, $this->params['start'], $this->params['end'] );
-
- if ( $titles->count() > self::MAX_TITLES_RUN ) {
- # We don't want to parse too many pages per job as it can starve other jobs.
- # If there are too many pages to parse, break this up into smaller jobs. By passing
- # in the master position here we can cut down on the time spent waiting for slaves to
- # catch up by the runners handling these jobs since time will have passed between now
- # and when they pop these jobs off the queue.
- $start = 0; // batch start
- $end = 0; // batch end
- $bsize = 0; // batch size
- $first = true; // first of batch
- $jobs = array();
- foreach ( $titles as $title ) {
- $start = $first ? $title->getArticleId() : $start;
- $end = $title->getArticleId();
- $first = false;
- if ( ++$bsize >= self::MAX_TITLES_RUN ) {
- $jobs[] = new RefreshLinksJob2( $this->title, array(
- 'table' => $table,
- 'start' => $start,
- 'end' => $end,
- 'masterPos' => $masterPos
- ) );
- $first = true;
- $start = $end = $bsize = 0;
- }
- }
- if ( $bsize > 0 ) { // group remaining pages into a job
- $jobs[] = new RefreshLinksJob2( $this->title, array(
- 'table' => $table,
- 'start' => $start,
- 'end' => $end,
- 'masterPos' => $masterPos
- ) );
- }
- Job::batchInsert( $jobs );
- } elseif ( php_sapi_name() != 'cli' ) {
- # Not suitable for page load triggered job running!
- # Gracefully switch to refreshLinks jobs if this happens.
- $jobs = array();
- foreach ( $titles as $title ) {
- $jobs[] = new RefreshLinksJob( $title, array( 'masterPos' => $masterPos ) );
- }
- Job::batchInsert( $jobs );
- } else {
- # Wait for the DB of the current/next slave DB handle to catch up to the master.
- # This way, we get the correct page_latest for templates or files that just changed
- # milliseconds ago, having triggered this job to begin with.
- if ( $masterPos ) {
- wfGetLB()->waitFor( $masterPos );
- }
- # Re-parse each page that transcludes this page and update their tracking links...
- foreach ( $titles as $title ) {
- $revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL );
- if ( !$revision ) {
- $this->error = 'refreshLinks: Article not found "' .
- $title->getPrefixedDBkey() . '"';
- continue; // skip this page
- }
- RefreshLinksJob::runForTitleInternal( $title, $revision, __METHOD__ );
- wfWaitForSlaves();
- }
- }
-
- wfProfileOut( __METHOD__ );
- return true;
- }
-}
diff --git a/includes/job/jobs/AssembleUploadChunksJob.php b/includes/job/jobs/AssembleUploadChunksJob.php
new file mode 100644
index 00000000..c5dd9eaa
--- /dev/null
+++ b/includes/job/jobs/AssembleUploadChunksJob.php
@@ -0,0 +1,118 @@
+<?php
+/**
+ * Assemble the segments of a chunked upload.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Upload
+ */
+
+/**
+ * Assemble the segments of a chunked upload.
+ *
+ * @ingroup Upload
+ */
+class AssembleUploadChunksJob extends Job {
+ public function __construct( $title, $params, $id = 0 ) {
+ parent::__construct( 'AssembleUploadChunks', $title, $params, $id );
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ $scope = RequestContext::importScopedSession( $this->params['session'] );
+ $context = RequestContext::getMain();
+ try {
+ $user = $context->getUser();
+ if ( !$user->isLoggedIn() ) {
+ $this->setLastError( "Could not load the author user from session." );
+ return false;
+ }
+
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() )
+ );
+
+ $upload = new UploadFromChunks( $user );
+ $upload->continueChunks(
+ $this->params['filename'],
+ $this->params['filekey'],
+ $context->getRequest()
+ );
+
+ // Combine all of the chunks into a local file and upload that to a new stash file
+ $status = $upload->concatenateChunks();
+ if ( !$status->isGood() ) {
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status )
+ );
+ $this->setLastError( $status->getWikiText() );
+ return false;
+ }
+
+ // We have a new filekey for the fully concatenated file
+ $newFileKey = $upload->getLocalFile()->getFileKey();
+
+ // Remove the old stash file row and first chunk file
+ $upload->stash->removeFileNoAuth( $this->params['filekey'] );
+
+ // Build the image info array while we have the local reference handy
+ $apiMain = new ApiMain(); // dummy object (XXX)
+ $imageInfo = $upload->getImageInfo( $apiMain->getResult() );
+
+ // Cleanup any temporary local file
+ $upload->cleanupTempFile();
+
+ // Cache the info so the user doesn't have to wait forever to get the final info
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array(
+ 'result' => 'Success',
+ 'stage' => 'assembling',
+ 'filekey' => $newFileKey,
+ 'imageinfo' => $imageInfo,
+ 'status' => Status::newGood()
+ )
+ );
+ } catch ( MWException $e ) {
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array(
+ 'result' => 'Failure',
+ 'stage' => 'assembling',
+ 'status' => Status::newFatal( 'api-error-stashfailed' )
+ )
+ );
+ $this->setLastError( get_class( $e ) . ": " . $e->getText() );
+ return false;
+ }
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ if ( is_array( $info['params'] ) ) {
+ $info['params'] = array( 'filekey' => $info['params']['filekey'] );
+ }
+ return $info;
+ }
+
+ public function allowRetries() {
+ return false;
+ }
+}
diff --git a/includes/job/DoubleRedirectJob.php b/includes/job/jobs/DoubleRedirectJob.php
index 08af9975..05abeeef 100644
--- a/includes/job/DoubleRedirectJob.php
+++ b/includes/job/jobs/DoubleRedirectJob.php
@@ -27,7 +27,7 @@
* @ingroup JobQueue
*/
class DoubleRedirectJob extends Job {
- var $reason, $redirTitle, $destTitleText;
+ var $reason, $redirTitle;
/**
* @var User
@@ -36,9 +36,9 @@ class DoubleRedirectJob extends Job {
/**
* Insert jobs into the job queue to fix redirects to the given title
- * @param $reason String: the reason for the fix, see message "double-redirect-fixed-<reason>"
+ * @param string $reason the reason for the fix, see message "double-redirect-fixed-<reason>"
* @param $redirTitle Title: the title which has changed, redirects pointing to this title are fixed
- * @param $destTitle bool Not used
+ * @param bool $destTitle Not used
*/
public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) {
# Need to use the master to get the redirect table updated in the same transaction
@@ -66,18 +66,17 @@ class DoubleRedirectJob extends Job {
'redirTitle' => $redirTitle->getPrefixedDBkey() ) );
# Avoid excessive memory usage
if ( count( $jobs ) > 10000 ) {
- Job::batchInsert( $jobs );
+ JobQueueGroup::singleton()->push( $jobs );
$jobs = array();
}
}
- Job::batchInsert( $jobs );
+ JobQueueGroup::singleton()->push( $jobs );
}
function __construct( $title, $params = false, $id = 0 ) {
parent::__construct( 'fixDoubleRedirect', $title, $params, $id );
$this->reason = $params['reason'];
$this->redirTitle = Title::newFromText( $params['redirTitle'] );
- $this->destTitleText = !empty( $params['destTitle'] ) ? $params['destTitle'] : '';
}
/**
@@ -94,8 +93,8 @@ class DoubleRedirectJob extends Job {
wfDebug( __METHOD__.": target redirect already deleted, ignoring\n" );
return true;
}
- $text = $targetRev->getText();
- $currentDest = Title::newFromRedirect( $text );
+ $content = $targetRev->getContent();
+ $currentDest = $content ? $content->getRedirectTarget() : null;
if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) {
wfDebug( __METHOD__.": Redirect has changed since the job was queued\n" );
return true;
@@ -103,7 +102,7 @@ class DoubleRedirectJob extends Job {
# Check for a suppression tag (used e.g. in periodically archived discussions)
$mw = MagicWord::get( 'staticredirect' );
- if ( $mw->match( $text ) ) {
+ if ( $content->matchMagicWord( $mw ) ) {
wfDebug( __METHOD__.": skipping: suppressed with __STATICREDIRECT__\n" );
return true;
}
@@ -122,29 +121,31 @@ class DoubleRedirectJob extends Job {
# Preserve fragment (bug 14904)
$newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(),
- $currentDest->getFragment() );
+ $currentDest->getFragment(), $newTitle->getInterwiki() );
# Fix the text
- # Remember that redirect pages can have categories, templates, etc.,
- # so the regex has to be fairly general
- $newText = preg_replace( '/ \[ \[ [^\]]* \] \] /x',
- '[[' . $newTitle->getFullText() . ']]',
- $text, 1 );
-
- if ( $newText === $text ) {
- $this->setLastError( 'Text unchanged???' );
+ $newContent = $content->updateRedirect( $newTitle );
+
+ if ( $newContent->equals( $content ) ) {
+ $this->setLastError( 'Content unchanged???' );
+ return false;
+ }
+
+ $user = $this->getUser();
+ if ( !$user ) {
+ $this->setLastError( 'Invalid user' );
return false;
}
# Save it
global $wgUser;
$oldUser = $wgUser;
- $wgUser = $this->getUser();
+ $wgUser = $user;
$article = WikiPage::factory( $this->title );
$reason = wfMessage( 'double-redirect-fixed-' . $this->reason,
$this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText()
)->inContentLanguage()->text();
- $article->doEdit( $newText, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $this->getUser() );
+ $article->doEditContent( $newContent, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $user );
$wgUser = $oldUser;
return true;
@@ -171,9 +172,17 @@ class DoubleRedirectJob extends Job {
}
$seenTitles[$titleText] = true;
+ if ( $title->getInterwiki() ) {
+ // If the target is interwiki, we have to break early (bug 40352).
+ // Otherwise it will look up a row in the local page table
+ // with the namespace/page of the interwiki target which can cause
+ // unexpected results (e.g. X -> foo:Bar -> Bar -> .. )
+ break;
+ }
+
$row = $dbw->selectRow(
array( 'redirect', 'page' ),
- array( 'rd_namespace', 'rd_title' ),
+ array( 'rd_namespace', 'rd_title', 'rd_interwiki' ),
array(
'rd_from=page_id',
'page_namespace' => $title->getNamespace(),
@@ -183,7 +192,7 @@ class DoubleRedirectJob extends Job {
# No redirect from here, chain terminates
break;
} else {
- $dest = $title = Title::makeTitle( $row->rd_namespace, $row->rd_title );
+ $dest = $title = Title::makeTitle( $row->rd_namespace, $row->rd_title, '', $row->rd_interwiki );
}
}
return $dest;
@@ -191,17 +200,19 @@ class DoubleRedirectJob extends Job {
/**
* Get a user object for doing edits, from a request-lifetime cache
- * @return User
+ * False will be returned if the user name specified in the
+ * 'double-redirect-fixer' message is invalid.
+ *
+ * @return User|bool
*/
function getUser() {
if ( !self::$user ) {
- self::$user = User::newFromName( wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text(), false );
- # FIXME: newFromName could return false on a badly configured wiki.
- if ( !self::$user->isLoggedIn() ) {
+ self::$user = User::newFromName( wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text() );
+ # User::newFromName() can return false on a badly configured wiki.
+ if ( self::$user && !self::$user->isLoggedIn() ) {
self::$user->addToDatabase();
}
}
return self::$user;
}
}
-
diff --git a/includes/job/jobs/DuplicateJob.php b/includes/job/jobs/DuplicateJob.php
new file mode 100644
index 00000000..524983b8
--- /dev/null
+++ b/includes/job/jobs/DuplicateJob.php
@@ -0,0 +1,59 @@
+<?php
+/**
+ * No-op job that does nothing.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ */
+
+/**
+ * No-op job that does nothing. Used to represent duplicates.
+ *
+ * @ingroup JobQueue
+ */
+final class DuplicateJob extends Job {
+ /**
+ * Callers should use DuplicateJob::newFromJob() instead
+ *
+ * @param $title Title
+ * @param array $params job parameters
+ * @param $id Integer: job id
+ */
+ function __construct( $title, $params, $id = 0 ) {
+ parent::__construct( 'duplicate', $title, $params, $id );
+ }
+
+ /**
+ * Get a duplicate no-op version of a job
+ *
+ * @param Job $job
+ * @return Job
+ */
+ public static function newFromJob( Job $job ) {
+ $djob = new self( $job->getTitle(), $job->getParams(), $job->getId() );
+ $djob->command = $job->getType();
+ $djob->params = is_array( $djob->params ) ? $djob->params : array();
+ $djob->params = array( 'isDuplicate' => true ) + $djob->params;
+ $djob->metadata = $job->metadata;
+ return $djob;
+ }
+
+ public function run() {
+ return true;
+ }
+}
diff --git a/includes/job/EmaillingJob.php b/includes/job/jobs/EmaillingJob.php
index d3599882..9fbf3124 100644
--- a/includes/job/EmaillingJob.php
+++ b/includes/job/jobs/EmaillingJob.php
@@ -33,14 +33,15 @@ class EmaillingJob extends Job {
}
function run() {
- UserMailer::send(
+ $status = UserMailer::send(
$this->params['to'],
$this->params['from'],
$this->params['subj'],
$this->params['body'],
$this->params['replyto']
);
- return true;
+
+ return $status->isOK();
}
}
diff --git a/includes/job/EnotifNotifyJob.php b/includes/job/jobs/EnotifNotifyJob.php
index b4c925e9..2be05b63 100644
--- a/includes/job/EnotifNotifyJob.php
+++ b/includes/job/jobs/EnotifNotifyJob.php
@@ -49,7 +49,8 @@ class EnotifNotifyJob extends Job {
$this->params['summary'],
$this->params['minorEdit'],
$this->params['oldid'],
- $this->params['watchers']
+ $this->params['watchers'],
+ $this->params['pageStatus']
);
return true;
}
diff --git a/includes/job/jobs/HTMLCacheUpdateJob.php b/includes/job/jobs/HTMLCacheUpdateJob.php
new file mode 100644
index 00000000..818c6abf
--- /dev/null
+++ b/includes/job/jobs/HTMLCacheUpdateJob.php
@@ -0,0 +1,254 @@
+<?php
+/**
+ * HTML cache invalidation of all pages linking to a given title.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ */
+
+/**
+ * Job wrapper for HTMLCacheUpdate. Gets run whenever a related
+ * job gets called from the queue.
+ *
+ * This class is designed to work efficiently with small numbers of links, and
+ * to work reasonably well with up to ~10^5 links. Above ~10^6 links, the memory
+ * and time requirements of loading all backlinked IDs in doUpdate() might become
+ * prohibitive. The requirements measured at Wikimedia are approximately:
+ *
+ * memory: 48 bytes per row
+ * time: 16us per row for the query plus processing
+ *
+ * The reason this query is done is to support partitioning of the job
+ * by backlinked ID. The memory issue could be allieviated by doing this query in
+ * batches, but of course LIMIT with an offset is inefficient on the DB side.
+ *
+ * The class is nevertheless a vast improvement on the previous method of using
+ * File::getLinksTo() and Title::touchArray(), which uses about 2KB of memory per
+ * link.
+ *
+ * @ingroup JobQueue
+ */
+class HTMLCacheUpdateJob extends Job {
+ /** @var BacklinkCache */
+ protected $blCache;
+
+ protected $rowsPerJob, $rowsPerQuery;
+
+ /**
+ * Construct a job
+ * @param $title Title: the title linked to
+ * @param array $params job parameters (table, start and end page_ids)
+ * @param $id Integer: job id
+ */
+ function __construct( $title, $params, $id = 0 ) {
+ global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
+
+ parent::__construct( 'htmlCacheUpdate', $title, $params, $id );
+
+ $this->rowsPerJob = $wgUpdateRowsPerJob;
+ $this->rowsPerQuery = $wgUpdateRowsPerQuery;
+ $this->blCache = $title->getBacklinkCache();
+ }
+
+ public function run() {
+ if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
+ # This is hit when a job is actually performed
+ return $this->doPartialUpdate();
+ } else {
+ # This is hit when the jobs have to be inserted
+ return $this->doFullUpdate();
+ }
+ }
+
+ /**
+ * Update all of the backlinks
+ */
+ protected function doFullUpdate() {
+ # Get an estimate of the number of rows from the BacklinkCache
+ $numRows = $this->blCache->getNumLinks( $this->params['table'] );
+ if ( $numRows > $this->rowsPerJob * 2 ) {
+ # Do fast cached partition
+ $this->insertPartitionJobs();
+ } else {
+ # Get the links from the DB
+ $titleArray = $this->blCache->getLinks( $this->params['table'] );
+ # Check if the row count estimate was correct
+ if ( $titleArray->count() > $this->rowsPerJob * 2 ) {
+ # Not correct, do accurate partition
+ wfDebug( __METHOD__.": row count estimate was incorrect, repartitioning\n" );
+ $this->insertJobsFromTitles( $titleArray );
+ } else {
+ $this->invalidateTitles( $titleArray ); // just do the query
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Update some of the backlinks, defined by a page ID range
+ */
+ protected function doPartialUpdate() {
+ $titleArray = $this->blCache->getLinks(
+ $this->params['table'], $this->params['start'], $this->params['end'] );
+ if ( $titleArray->count() <= $this->rowsPerJob * 2 ) {
+ # This partition is small enough, do the update
+ $this->invalidateTitles( $titleArray );
+ } else {
+ # Partitioning was excessively inaccurate. Divide the job further.
+ # This can occur when a large number of links are added in a short
+ # period of time, say by updating a heavily-used template.
+ $this->insertJobsFromTitles( $titleArray );
+ }
+ return true;
+ }
+
+ /**
+ * Partition the current range given by $this->params['start'] and $this->params['end'],
+ * using a pre-calculated title array which gives the links in that range.
+ * Queue the resulting jobs.
+ *
+ * @param $titleArray array
+ * @param $rootJobParams array
+ * @return void
+ */
+ protected function insertJobsFromTitles( $titleArray, $rootJobParams = array() ) {
+ // Carry over any "root job" information
+ $rootJobParams = $this->getRootJobParams();
+ # We make subpartitions in the sense that the start of the first job
+ # will be the start of the parent partition, and the end of the last
+ # job will be the end of the parent partition.
+ $jobs = array();
+ $start = $this->params['start']; # start of the current job
+ $numTitles = 0;
+ foreach ( $titleArray as $title ) {
+ $id = $title->getArticleID();
+ # $numTitles is now the number of titles in the current job not
+ # including the current ID
+ if ( $numTitles >= $this->rowsPerJob ) {
+ # Add a job up to but not including the current ID
+ $jobs[] = new HTMLCacheUpdateJob( $this->title,
+ array(
+ 'table' => $this->params['table'],
+ 'start' => $start,
+ 'end' => $id - 1
+ ) + $rootJobParams // carry over information for de-duplication
+ );
+ $start = $id;
+ $numTitles = 0;
+ }
+ $numTitles++;
+ }
+ # Last job
+ $jobs[] = new HTMLCacheUpdateJob( $this->title,
+ array(
+ 'table' => $this->params['table'],
+ 'start' => $start,
+ 'end' => $this->params['end']
+ ) + $rootJobParams // carry over information for de-duplication
+ );
+ wfDebug( __METHOD__.": repartitioning into " . count( $jobs ) . " jobs\n" );
+
+ if ( count( $jobs ) < 2 ) {
+ # I don't think this is possible at present, but handling this case
+ # makes the code a bit more robust against future code updates and
+ # avoids a potential infinite loop of repartitioning
+ wfDebug( __METHOD__.": repartitioning failed!\n" );
+ $this->invalidateTitles( $titleArray );
+ } else {
+ JobQueueGroup::singleton()->push( $jobs );
+ }
+ }
+
+ /**
+ * @param $rootJobParams array
+ * @return void
+ */
+ protected function insertPartitionJobs( $rootJobParams = array() ) {
+ // Carry over any "root job" information
+ $rootJobParams = $this->getRootJobParams();
+
+ $batches = $this->blCache->partition( $this->params['table'], $this->rowsPerJob );
+ if ( !count( $batches ) ) {
+ return; // no jobs to insert
+ }
+
+ $jobs = array();
+ foreach ( $batches as $batch ) {
+ list( $start, $end ) = $batch;
+ $jobs[] = new HTMLCacheUpdateJob( $this->title,
+ array(
+ 'table' => $this->params['table'],
+ 'start' => $start,
+ 'end' => $end,
+ ) + $rootJobParams // carry over information for de-duplication
+ );
+ }
+
+ JobQueueGroup::singleton()->push( $jobs );
+ }
+
+ /**
+ * Invalidate an array (or iterator) of Title objects, right now
+ * @param $titleArray array
+ */
+ protected function invalidateTitles( $titleArray ) {
+ global $wgUseFileCache, $wgUseSquid;
+
+ $dbw = wfGetDB( DB_MASTER );
+ $timestamp = $dbw->timestamp();
+
+ # Get all IDs in this query into an array
+ $ids = array();
+ foreach ( $titleArray as $title ) {
+ $ids[] = $title->getArticleID();
+ }
+
+ if ( !$ids ) {
+ return;
+ }
+
+ # Don't invalidated pages that were already invalidated
+ $touchedCond = isset( $this->params['rootJobTimestamp'] )
+ ? array( "page_touched < " .
+ $dbw->addQuotes( $dbw->timestamp( $this->params['rootJobTimestamp'] ) ) )
+ : array();
+
+ # Update page_touched
+ $batches = array_chunk( $ids, $this->rowsPerQuery );
+ foreach ( $batches as $batch ) {
+ $dbw->update( 'page',
+ array( 'page_touched' => $timestamp ),
+ array( 'page_id' => $batch ) + $touchedCond,
+ __METHOD__
+ );
+ }
+
+ # Update squid
+ if ( $wgUseSquid ) {
+ $u = SquidUpdate::newFromTitles( $titleArray );
+ $u->doUpdate();
+ }
+
+ # Update file cache
+ if ( $wgUseFileCache ) {
+ foreach ( $titleArray as $title ) {
+ HTMLFileCache::clearFileCache( $title );
+ }
+ }
+ }
+}
diff --git a/includes/job/jobs/NullJob.php b/includes/job/jobs/NullJob.php
new file mode 100644
index 00000000..d282a8e6
--- /dev/null
+++ b/includes/job/jobs/NullJob.php
@@ -0,0 +1,60 @@
+<?php
+/**
+ * Degenerate job that does nothing.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ */
+
+/**
+ * Degenerate job that does nothing, but can optionally replace itself
+ * in the queue and/or sleep for a brief time period. These can be used
+ * to represent "no-op" jobs or test lock contention and performance.
+ *
+ * @ingroup JobQueue
+ */
+class NullJob extends Job {
+ /**
+ * @param $title Title (can be anything)
+ * @param array $params job parameters (lives, usleep)
+ * @param $id Integer: job id
+ */
+ function __construct( $title, $params, $id = 0 ) {
+ parent::__construct( 'null', $title, $params, $id );
+ if ( !isset( $this->params['lives'] ) ) {
+ $this->params['lives'] = 1;
+ }
+ if ( !isset( $this->params['usleep'] ) ) {
+ $this->params['usleep'] = 0;
+ }
+ $this->removeDuplicates = !empty( $this->params['removeDuplicates'] );
+ }
+
+ public function run() {
+ if ( $this->params['usleep'] > 0 ) {
+ usleep( $this->params['usleep'] );
+ }
+ if ( $this->params['lives'] > 1 ) {
+ $params = $this->params;
+ $params['lives']--;
+ $job = new self( $this->title, $params );
+ JobQueueGroup::singleton()->push( $job );
+ }
+ return true;
+ }
+}
diff --git a/includes/job/jobs/PublishStashedFileJob.php b/includes/job/jobs/PublishStashedFileJob.php
new file mode 100644
index 00000000..d3feda28
--- /dev/null
+++ b/includes/job/jobs/PublishStashedFileJob.php
@@ -0,0 +1,130 @@
+<?php
+/**
+ * Upload a file from the upload stash into the local file repo.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Upload
+ */
+
+/**
+ * Upload a file from the upload stash into the local file repo.
+ *
+ * @ingroup Upload
+ */
+class PublishStashedFileJob extends Job {
+ public function __construct( $title, $params, $id = 0 ) {
+ parent::__construct( 'PublishStashedFile', $title, $params, $id );
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ $scope = RequestContext::importScopedSession( $this->params['session'] );
+ $context = RequestContext::getMain();
+ try {
+ $user = $context->getUser();
+ if ( !$user->isLoggedIn() ) {
+ $this->setLastError( "Could not load the author user from session." );
+ return false;
+ }
+
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() )
+ );
+
+ $upload = new UploadFromStash( $user );
+ // @TODO: initialize() causes a GET, ideally we could frontload the antivirus
+ // checks and anything else to the stash stage (which includes concatenation and
+ // the local file is thus already there). That way, instead of GET+PUT, there could
+ // just be a COPY operation from the stash to the public zone.
+ $upload->initialize( $this->params['filekey'], $this->params['filename'] );
+
+ // Check if the local file checks out (this is generally a no-op)
+ $verification = $upload->verifyUpload();
+ if ( $verification['status'] !== UploadBase::OK ) {
+ $status = Status::newFatal( 'verification-error' );
+ $status->value = array( 'verification' => $verification );
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
+ );
+ $this->setLastError( "Could not verify upload." );
+ return false;
+ }
+
+ // Upload the stashed file to a permanent location
+ $status = $upload->performUpload(
+ $this->params['comment'],
+ $this->params['text'],
+ $this->params['watch'],
+ $user
+ );
+ if ( !$status->isGood() ) {
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
+ );
+ $this->setLastError( $status->getWikiText() );
+ return false;
+ }
+
+ // Build the image info array while we have the local reference handy
+ $apiMain = new ApiMain(); // dummy object (XXX)
+ $imageInfo = $upload->getImageInfo( $apiMain->getResult() );
+
+ // Cleanup any temporary local file
+ $upload->cleanupTempFile();
+
+ // Cache the info so the user doesn't have to wait forever to get the final info
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array(
+ 'result' => 'Success',
+ 'stage' => 'publish',
+ 'filename' => $upload->getLocalFile()->getName(),
+ 'imageinfo' => $imageInfo,
+ 'status' => Status::newGood()
+ )
+ );
+ } catch ( MWException $e ) {
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array(
+ 'result' => 'Failure',
+ 'stage' => 'publish',
+ 'status' => Status::newFatal( 'api-error-publishfailed' )
+ )
+ );
+ $this->setLastError( get_class( $e ) . ": " . $e->getText() );
+ return false;
+ }
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ if ( is_array( $info['params'] ) ) {
+ $info['params'] = array( 'filekey' => $info['params']['filekey'] );
+ }
+ return $info;
+ }
+
+ public function allowRetries() {
+ return false;
+ }
+}
diff --git a/includes/job/jobs/RefreshLinksJob.php b/includes/job/jobs/RefreshLinksJob.php
new file mode 100644
index 00000000..9dbe8278
--- /dev/null
+++ b/includes/job/jobs/RefreshLinksJob.php
@@ -0,0 +1,226 @@
+<?php
+/**
+ * Job to update links for a given title.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Background job to update links for a given title.
+ *
+ * @ingroup JobQueue
+ */
+class RefreshLinksJob extends Job {
+ function __construct( $title, $params = '', $id = 0 ) {
+ parent::__construct( 'refreshLinks', $title, $params, $id );
+ $this->removeDuplicates = true; // job is expensive
+ }
+
+ /**
+ * Run a refreshLinks job
+ * @return boolean success
+ */
+ function run() {
+ wfProfileIn( __METHOD__ );
+
+ $linkCache = LinkCache::singleton();
+ $linkCache->clear();
+
+ if ( is_null( $this->title ) ) {
+ $this->error = "refreshLinks: Invalid title";
+ wfProfileOut( __METHOD__ );
+ return false;
+ }
+
+ # Wait for the DB of the current/next slave DB handle to catch up to the master.
+ # This way, we get the correct page_latest for templates or files that just changed
+ # milliseconds ago, having triggered this job to begin with.
+ if ( isset( $this->params['masterPos'] ) ) {
+ wfGetLB()->waitFor( $this->params['masterPos'] );
+ }
+
+ $revision = Revision::newFromTitle( $this->title, false, Revision::READ_NORMAL );
+ if ( !$revision ) {
+ $this->error = 'refreshLinks: Article not found "' .
+ $this->title->getPrefixedDBkey() . '"';
+ wfProfileOut( __METHOD__ );
+ return false; // XXX: what if it was just deleted?
+ }
+
+ self::runForTitleInternal( $this->title, $revision, __METHOD__ );
+
+ wfProfileOut( __METHOD__ );
+ return true;
+ }
+
+ /**
+ * @return Array
+ */
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ // Don't let highly unique "masterPos" values ruin duplicate detection
+ if ( is_array( $info['params'] ) ) {
+ unset( $info['params']['masterPos'] );
+ }
+ return $info;
+ }
+
+ /**
+ * @param $title Title
+ * @param $revision Revision
+ * @param $fname string
+ * @return void
+ */
+ public static function runForTitleInternal( Title $title, Revision $revision, $fname ) {
+ wfProfileIn( $fname );
+ $content = $revision->getContent( Revision::RAW );
+
+ if ( !$content ) {
+ // if there is no content, pretend the content is empty
+ $content = $revision->getContentHandler()->makeEmptyContent();
+ }
+
+ // Revision ID must be passed to the parser output to get revision variables correct
+ $parserOutput = $content->getParserOutput( $title, $revision->getId(), null, false );
+
+ $updates = $content->getSecondaryDataUpdates( $title, null, false, $parserOutput );
+ DataUpdate::runUpdates( $updates );
+ wfProfileOut( $fname );
+ }
+}
+
+/**
+ * Background job to update links for a given title.
+ * Newer version for high use templates.
+ *
+ * @ingroup JobQueue
+ */
+class RefreshLinksJob2 extends Job {
+ function __construct( $title, $params, $id = 0 ) {
+ parent::__construct( 'refreshLinks2', $title, $params, $id );
+ }
+
+ /**
+ * Run a refreshLinks2 job
+ * @return boolean success
+ */
+ function run() {
+ global $wgUpdateRowsPerJob;
+
+ wfProfileIn( __METHOD__ );
+
+ $linkCache = LinkCache::singleton();
+ $linkCache->clear();
+
+ if ( is_null( $this->title ) ) {
+ $this->error = "refreshLinks2: Invalid title";
+ wfProfileOut( __METHOD__ );
+ return false;
+ }
+
+ // Back compat for pre-r94435 jobs
+ $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks';
+
+ // Avoid slave lag when fetching templates.
+ // When the outermost job is run, we know that the caller that enqueued it must have
+ // committed the relevant changes to the DB by now. At that point, record the master
+ // position and pass it along as the job recursively breaks into smaller range jobs.
+ // Hopefully, when leaf jobs are popped, the slaves will have reached that position.
+ if ( isset( $this->params['masterPos'] ) ) {
+ $masterPos = $this->params['masterPos'];
+ } elseif ( wfGetLB()->getServerCount() > 1 ) {
+ $masterPos = wfGetLB()->getMasterPos();
+ } else {
+ $masterPos = false;
+ }
+
+ $tbc = $this->title->getBacklinkCache();
+
+ $jobs = array(); // jobs to insert
+ if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
+ # This is a partition job to trigger the insertion of leaf jobs...
+ $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
+ } else {
+ # This is a base job to trigger the insertion of partitioned jobs...
+ if ( $tbc->getNumLinks( $table ) <= $wgUpdateRowsPerJob ) {
+ # Just directly insert the single per-title jobs
+ $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
+ } else {
+ # Insert the partition jobs to make per-title jobs
+ foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) {
+ list( $start, $end ) = $batch;
+ $jobs[] = new RefreshLinksJob2( $this->title,
+ array(
+ 'table' => $table,
+ 'start' => $start,
+ 'end' => $end,
+ 'masterPos' => $masterPos,
+ ) + $this->getRootJobParams() // carry over information for de-duplication
+ );
+ }
+ }
+ }
+
+ if ( count( $jobs ) ) {
+ JobQueueGroup::singleton()->push( $jobs );
+ }
+
+ wfProfileOut( __METHOD__ );
+ return true;
+ }
+
+ /**
+ * @param $table string
+ * @param $masterPos mixed
+ * @return Array
+ */
+ protected function getSingleTitleJobs( $table, $masterPos ) {
+ # The "start"/"end" fields are not set for the base jobs
+ $start = isset( $this->params['start'] ) ? $this->params['start'] : false;
+ $end = isset( $this->params['end'] ) ? $this->params['end'] : false;
+ $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end );
+ # Convert into single page refresh links jobs.
+ # This handles well when in sapi mode and is useful in any case for job
+ # de-duplication. If many pages use template A, and that template itself
+ # uses template B, then an edit to both will create many duplicate jobs.
+ # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will
+ # get run first, and when it does, it will remove the duplicates. Of course,
+ # one page could have its job popped when the other page's job is still
+ # buried within the logic of a refreshLinks2 job.
+ $jobs = array();
+ foreach ( $titles as $title ) {
+ $jobs[] = new RefreshLinksJob( $title,
+ array( 'masterPos' => $masterPos ) + $this->getRootJobParams()
+ ); // carry over information for de-duplication
+ }
+ return $jobs;
+ }
+
+ /**
+ * @return Array
+ */
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ // Don't let highly unique "masterPos" values ruin duplicate detection
+ if ( is_array( $info['params'] ) ) {
+ unset( $info['params']['masterPos'] );
+ }
+ return $info;
+ }
+}
diff --git a/includes/job/UploadFromUrlJob.php b/includes/job/jobs/UploadFromUrlJob.php
index e06f68e4..87549140 100644
--- a/includes/job/UploadFromUrlJob.php
+++ b/includes/job/jobs/UploadFromUrlJob.php
@@ -148,8 +148,8 @@ class UploadFromUrlJob extends Job {
* Store a result in the session data. Note that the caller is responsible
* for appropriate session_start and session_write_close calls.
*
- * @param $result String: the result (Success|Warning|Failure)
- * @param $dataKey String: the key of the extra data
+ * @param string $result the result (Success|Warning|Failure)
+ * @param string $dataKey the key of the extra data
* @param $dataValue Mixed: the extra data itself
*/
protected function storeResultInSession( $result, $dataKey, $dataValue ) {