summaryrefslogtreecommitdiff
path: root/includes/jobqueue
diff options
context:
space:
mode:
authorPierre Schmitz <pierre@archlinux.de>2014-12-27 15:41:37 +0100
committerPierre Schmitz <pierre@archlinux.de>2014-12-31 11:43:28 +0100
commitc1f9b1f7b1b77776192048005dcc66dcf3df2bfb (patch)
tree2b38796e738dd74cb42ecd9bfd151803108386bc /includes/jobqueue
parentb88ab0086858470dd1f644e64cb4e4f62bb2be9b (diff)
Update to MediaWiki 1.24.1
Diffstat (limited to 'includes/jobqueue')
-rw-r--r--includes/jobqueue/Job.php350
-rw-r--r--includes/jobqueue/JobQueue.php740
-rw-r--r--includes/jobqueue/JobQueueDB.php849
-rw-r--r--includes/jobqueue/JobQueueFederated.php559
-rw-r--r--includes/jobqueue/JobQueueGroup.php440
-rw-r--r--includes/jobqueue/JobQueueRedis.php865
-rw-r--r--includes/jobqueue/JobRunner.php350
-rw-r--r--includes/jobqueue/JobSpecification.php189
-rw-r--r--includes/jobqueue/README81
-rw-r--r--includes/jobqueue/aggregator/JobQueueAggregator.php162
-rw-r--r--includes/jobqueue/aggregator/JobQueueAggregatorMemc.php125
-rw-r--r--includes/jobqueue/aggregator/JobQueueAggregatorRedis.php218
-rw-r--r--includes/jobqueue/jobs/AssembleUploadChunksJob.php136
-rw-r--r--includes/jobqueue/jobs/DoubleRedirectJob.php250
-rw-r--r--includes/jobqueue/jobs/DuplicateJob.php59
-rw-r--r--includes/jobqueue/jobs/EmaillingJob.php46
-rw-r--r--includes/jobqueue/jobs/EnotifNotifyJob.php57
-rw-r--r--includes/jobqueue/jobs/HTMLCacheUpdateJob.php162
-rw-r--r--includes/jobqueue/jobs/NullJob.php76
-rw-r--r--includes/jobqueue/jobs/PublishStashedFileJob.php150
-rw-r--r--includes/jobqueue/jobs/RefreshLinksJob.php199
-rw-r--r--includes/jobqueue/jobs/RefreshLinksJob2.php141
-rw-r--r--includes/jobqueue/jobs/UploadFromUrlJob.php187
-rw-r--r--includes/jobqueue/utils/BacklinkJobUtils.php122
24 files changed, 6513 insertions, 0 deletions
diff --git a/includes/jobqueue/Job.php b/includes/jobqueue/Job.php
new file mode 100644
index 00000000..ee3f2c2b
--- /dev/null
+++ b/includes/jobqueue/Job.php
@@ -0,0 +1,350 @@
+<?php
+/**
+ * Job queue task base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @defgroup JobQueue JobQueue
+ */
+
+/**
+ * Class to both describe a background job and handle jobs.
+ * The queue aspects of this class are now deprecated.
+ * Using the class to push jobs onto queues is deprecated (use JobSpecification).
+ *
+ * @ingroup JobQueue
+ */
+abstract class Job implements IJobSpecification {
+ /** @var string */
+ public $command;
+
+ /** @var array|bool Array of job parameters or false if none */
+ public $params;
+
+ /** @var array Additional queue metadata */
+ public $metadata = array();
+
+ /** @var Title */
+ protected $title;
+
+ /** @var bool Expensive jobs may set this to true */
+ protected $removeDuplicates;
+
+ /** @var string Text for error that occurred last */
+ protected $error;
+
+ /*-------------------------------------------------------------------------
+ * Abstract functions
+ *------------------------------------------------------------------------*/
+
+ /**
+ * Run the job
+ * @return bool Success
+ */
+ abstract public function run();
+
+ /*-------------------------------------------------------------------------
+ * Static functions
+ *------------------------------------------------------------------------*/
+
+ /**
+ * Create the appropriate object to handle a specific job
+ *
+ * @param string $command Job command
+ * @param Title $title Associated title
+ * @param array|bool $params Job parameters
+ * @throws MWException
+ * @return Job
+ */
+ public static function factory( $command, Title $title, $params = false ) {
+ global $wgJobClasses;
+ if ( isset( $wgJobClasses[$command] ) ) {
+ $class = $wgJobClasses[$command];
+
+ return new $class( $title, $params );
+ }
+ throw new MWException( "Invalid job command `{$command}`" );
+ }
+
+ /**
+ * Batch-insert a group of jobs into the queue.
+ * This will be wrapped in a transaction with a forced commit.
+ *
+ * This may add duplicate at insert time, but they will be
+ * removed later on, when the first one is popped.
+ *
+ * @param array $jobs Array of Job objects
+ * @return bool
+ * @deprecated since 1.21
+ */
+ public static function batchInsert( $jobs ) {
+ JobQueueGroup::singleton()->push( $jobs );
+ return true;
+ }
+
+ /**
+ * Insert a group of jobs into the queue.
+ *
+ * Same as batchInsert() but does not commit and can thus
+ * be rolled-back as part of a larger transaction. However,
+ * large batches of jobs can cause slave lag.
+ *
+ * @param array $jobs Array of Job objects
+ * @return bool
+ * @deprecated since 1.21
+ */
+ public static function safeBatchInsert( $jobs ) {
+ JobQueueGroup::singleton()->push( $jobs, JobQueue::QOS_ATOMIC );
+ return true;
+ }
+
+ /**
+ * Pop a job of a certain type. This tries less hard than pop() to
+ * actually find a job; it may be adversely affected by concurrent job
+ * runners.
+ *
+ * @param string $type
+ * @return Job|bool Returns false if there are no jobs
+ * @deprecated since 1.21
+ */
+ public static function pop_type( $type ) {
+ return JobQueueGroup::singleton()->get( $type )->pop();
+ }
+
+ /**
+ * Pop a job off the front of the queue.
+ * This is subject to $wgJobTypesExcludedFromDefaultQueue.
+ *
+ * @return Job|bool False if there are no jobs
+ * @deprecated since 1.21
+ */
+ public static function pop() {
+ return JobQueueGroup::singleton()->pop();
+ }
+
+ /*-------------------------------------------------------------------------
+ * Non-static functions
+ *------------------------------------------------------------------------*/
+
+ /**
+ * @param string $command
+ * @param Title $title
+ * @param array|bool $params
+ */
+ public function __construct( $command, $title, $params = false ) {
+ $this->command = $command;
+ $this->title = $title;
+ $this->params = $params;
+
+ // expensive jobs may set this to true
+ $this->removeDuplicates = false;
+ }
+
+ /**
+ * @return string
+ */
+ public function getType() {
+ return $this->command;
+ }
+
+ /**
+ * @return Title
+ */
+ public function getTitle() {
+ return $this->title;
+ }
+
+ /**
+ * @return array
+ */
+ public function getParams() {
+ return $this->params;
+ }
+
+ /**
+ * @return int|null UNIX timestamp to delay running this job until, otherwise null
+ * @since 1.22
+ */
+ public function getReleaseTimestamp() {
+ return isset( $this->params['jobReleaseTimestamp'] )
+ ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
+ : null;
+ }
+
+ /**
+ * @return bool Whether only one of each identical set of jobs should be run
+ */
+ public function ignoreDuplicates() {
+ return $this->removeDuplicates;
+ }
+
+ /**
+ * @return bool Whether this job can be retried on failure by job runners
+ * @since 1.21
+ */
+ public function allowRetries() {
+ return true;
+ }
+
+ /**
+ * @return int Number of actually "work items" handled in this job
+ * @see $wgJobBackoffThrottling
+ * @since 1.23
+ */
+ public function workItemCount() {
+ return 1;
+ }
+
+ /**
+ * Subclasses may need to override this to make duplication detection work.
+ * The resulting map conveys everything that makes the job unique. This is
+ * only checked if ignoreDuplicates() returns true, meaning that duplicate
+ * jobs are supposed to be ignored.
+ *
+ * @return array Map of key/values
+ * @since 1.21
+ */
+ public function getDeduplicationInfo() {
+ $info = array(
+ 'type' => $this->getType(),
+ 'namespace' => $this->getTitle()->getNamespace(),
+ 'title' => $this->getTitle()->getDBkey(),
+ 'params' => $this->getParams()
+ );
+ if ( is_array( $info['params'] ) ) {
+ // Identical jobs with different "root" jobs should count as duplicates
+ unset( $info['params']['rootJobSignature'] );
+ unset( $info['params']['rootJobTimestamp'] );
+ // Likewise for jobs with different delay times
+ unset( $info['params']['jobReleaseTimestamp'] );
+ }
+
+ return $info;
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @param string $key A key that identifies the task
+ * @return array Map of:
+ * - rootJobSignature : hash (e.g. SHA1) that identifies the task
+ * - rootJobTimestamp : TS_MW timestamp of this instance of the task
+ * @since 1.21
+ */
+ public static function newRootJobParams( $key ) {
+ return array(
+ 'rootJobSignature' => sha1( $key ),
+ 'rootJobTimestamp' => wfTimestampNow()
+ );
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return array
+ * @since 1.21
+ */
+ public function getRootJobParams() {
+ return array(
+ 'rootJobSignature' => isset( $this->params['rootJobSignature'] )
+ ? $this->params['rootJobSignature']
+ : null,
+ 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] )
+ ? $this->params['rootJobTimestamp']
+ : null
+ );
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return bool
+ * @since 1.22
+ */
+ public function hasRootJobParams() {
+ return isset( $this->params['rootJobSignature'] )
+ && isset( $this->params['rootJobTimestamp'] );
+ }
+
+ /**
+ * Insert a single job into the queue.
+ * @return bool True on success
+ * @deprecated since 1.21
+ */
+ public function insert() {
+ JobQueueGroup::singleton()->push( $this );
+ return true;
+ }
+
+ /**
+ * @return string
+ */
+ public function toString() {
+ $truncFunc = function ( $value ) {
+ $value = (string)$value;
+ if ( mb_strlen( $value ) > 1024 ) {
+ $value = "string(" . mb_strlen( $value ) . ")";
+ }
+ return $value;
+ };
+
+ $paramString = '';
+ if ( $this->params ) {
+ foreach ( $this->params as $key => $value ) {
+ if ( $paramString != '' ) {
+ $paramString .= ' ';
+ }
+ if ( is_array( $value ) ) {
+ $filteredValue = array();
+ foreach ( $value as $k => $v ) {
+ if ( is_scalar( $v ) ) {
+ $filteredValue[$k] = $truncFunc( $v );
+ } else {
+ $filteredValue = null;
+ break;
+ }
+ }
+ if ( $filteredValue ) {
+ $value = FormatJson::encode( $filteredValue );
+ } else {
+ $value = "array(" . count( $value ) . ")";
+ }
+ } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) {
+ $value = "object(" . get_class( $value ) . ")";
+ }
+
+ $paramString .= "$key={$truncFunc( $value )}";
+ }
+ }
+
+ if ( is_object( $this->title ) ) {
+ $s = "{$this->command} " . $this->title->getPrefixedDBkey();
+ if ( $paramString !== '' ) {
+ $s .= ' ' . $paramString;
+ }
+
+ return $s;
+ } else {
+ return "{$this->command} $paramString";
+ }
+ }
+
+ protected function setLastError( $error ) {
+ $this->error = $error;
+ }
+
+ public function getLastError() {
+ return $this->error;
+ }
+}
diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php
new file mode 100644
index 00000000..c00d22e9
--- /dev/null
+++ b/includes/jobqueue/JobQueue.php
@@ -0,0 +1,740 @@
+<?php
+/**
+ * Job queue base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @defgroup JobQueue JobQueue
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle enqueueing and running of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+abstract class JobQueue {
+ /** @var string Wiki ID */
+ protected $wiki;
+
+ /** @var string Job type */
+ protected $type;
+
+ /** @var string Job priority for pop() */
+ protected $order;
+
+ /** @var int Time to live in seconds */
+ protected $claimTTL;
+
+ /** @var int Maximum number of times to try a job */
+ protected $maxTries;
+
+ /** @var bool Allow delayed jobs */
+ protected $checkDelay;
+
+ /** @var BagOStuff */
+ protected $dupCache;
+
+ const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
+
+ const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
+
+ /**
+ * @param array $params
+ * @throws MWException
+ */
+ protected function __construct( array $params ) {
+ $this->wiki = $params['wiki'];
+ $this->type = $params['type'];
+ $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
+ $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
+ if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
+ $this->order = $params['order'];
+ } else {
+ $this->order = $this->optimalOrder();
+ }
+ if ( !in_array( $this->order, $this->supportedOrders() ) ) {
+ throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
+ }
+ $this->checkDelay = !empty( $params['checkDelay'] );
+ if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
+ throw new MWException( __CLASS__ . " does not support delayed jobs." );
+ }
+ $this->dupCache = wfGetCache( CACHE_ANYTHING );
+ }
+
+ /**
+ * Get a job queue object of the specified type.
+ * $params includes:
+ * - class : What job class to use (determines job type)
+ * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
+ * - type : The name of the job types this queue handles
+ * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
+ * If "fifo" is used, the queue will effectively be FIFO. Note that job
+ * completion will not appear to be exactly FIFO if there are multiple
+ * job runners since jobs can take different times to finish once popped.
+ * If "timestamp" is used, the queue will at least be loosely ordered
+ * by timestamp, allowing for some jobs to be popped off out of order.
+ * If "random" is used, pop() will pick jobs in random order.
+ * Note that it may only be weakly random (e.g. a lottery of the oldest X).
+ * If "any" is choosen, the queue will use whatever order is the fastest.
+ * This might be useful for improving concurrency for job acquisition.
+ * - claimTTL : If supported, the queue will recycle jobs that have been popped
+ * but not acknowledged as completed after this many seconds. Recycling
+ * of jobs simple means re-inserting them into the queue. Jobs can be
+ * attempted up to three times before being discarded.
+ * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
+ * This lets delayed jobs wait in a staging area until a given timestamp is
+ * reached, at which point they will enter the queue. If this is not enabled
+ * or not supported, an exception will be thrown on delayed job insertion.
+ *
+ * Queue classes should throw an exception if they do not support the options given.
+ *
+ * @param array $params
+ * @return JobQueue
+ * @throws MWException
+ */
+ final public static function factory( array $params ) {
+ $class = $params['class'];
+ if ( !class_exists( $class ) ) {
+ throw new MWException( "Invalid job queue class '$class'." );
+ }
+ $obj = new $class( $params );
+ if ( !( $obj instanceof self ) ) {
+ throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
+ }
+
+ return $obj;
+ }
+
+ /**
+ * @return string Wiki ID
+ */
+ final public function getWiki() {
+ return $this->wiki;
+ }
+
+ /**
+ * @return string Job type that this queue handles
+ */
+ final public function getType() {
+ return $this->type;
+ }
+
+ /**
+ * @return string One of (random, timestamp, fifo, undefined)
+ */
+ final public function getOrder() {
+ return $this->order;
+ }
+
+ /**
+ * @return bool Whether delayed jobs are enabled
+ * @since 1.22
+ */
+ final public function delayedJobsEnabled() {
+ return $this->checkDelay;
+ }
+
+ /**
+ * Get the allowed queue orders for configuration validation
+ *
+ * @return array Subset of (random, timestamp, fifo, undefined)
+ */
+ abstract protected function supportedOrders();
+
+ /**
+ * Get the default queue order to use if configuration does not specify one
+ *
+ * @return string One of (random, timestamp, fifo, undefined)
+ */
+ abstract protected function optimalOrder();
+
+ /**
+ * Find out if delayed jobs are supported for configuration validation
+ *
+ * @return bool Whether delayed jobs are supported
+ */
+ protected function supportsDelayedJobs() {
+ return false; // not implemented
+ }
+
+ /**
+ * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this might return false when there are actually no jobs.
+ * If pop() is called and returns false then it should correct the cache. Also,
+ * calling flushCaches() first prevents this. However, this affect is typically
+ * not distinguishable from the race condition between isEmpty() and pop().
+ *
+ * @return bool
+ * @throws JobQueueError
+ */
+ final public function isEmpty() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doIsEmpty();
+ wfProfileOut( __METHOD__ );
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::isEmpty()
+ * @return bool
+ */
+ abstract protected function doIsEmpty();
+
+ /**
+ * Get the number of available (unacquired, non-delayed) jobs in the queue.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return int
+ * @throws JobQueueError
+ */
+ final public function getSize() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetSize();
+ wfProfileOut( __METHOD__ );
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getSize()
+ * @return int
+ */
+ abstract protected function doGetSize();
+
+ /**
+ * Get the number of acquired jobs (these are temporarily out of the queue).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return int
+ * @throws JobQueueError
+ */
+ final public function getAcquiredCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetAcquiredCount();
+ wfProfileOut( __METHOD__ );
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getAcquiredCount()
+ * @return int
+ */
+ abstract protected function doGetAcquiredCount();
+
+ /**
+ * Get the number of delayed jobs (these are temporarily out of the queue).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return int
+ * @throws JobQueueError
+ * @since 1.22
+ */
+ final public function getDelayedCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetDelayedCount();
+ wfProfileOut( __METHOD__ );
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getDelayedCount()
+ * @return int
+ */
+ protected function doGetDelayedCount() {
+ return 0; // not implemented
+ }
+
+ /**
+ * Get the number of acquired jobs that can no longer be attempted.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return int
+ * @throws JobQueueError
+ */
+ final public function getAbandonedCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetAbandonedCount();
+ wfProfileOut( __METHOD__ );
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getAbandonedCount()
+ * @return int
+ */
+ protected function doGetAbandonedCount() {
+ return 0; // not implemented
+ }
+
+ /**
+ * Push one or more jobs into the queue.
+ * This does not require $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::push() instead of this function.
+ *
+ * @param Job|array $jobs A single job or an array of Jobs
+ * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
+ * @return void
+ * @throws JobQueueError
+ */
+ final public function push( $jobs, $flags = 0 ) {
+ $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+ }
+
+ /**
+ * Push a batch of jobs into the queue.
+ * This does not require $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::push() instead of this function.
+ *
+ * @param array $jobs List of Jobs
+ * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
+ * @return void
+ * @throws MWException
+ */
+ final public function batchPush( array $jobs, $flags = 0 ) {
+ if ( !count( $jobs ) ) {
+ return; // nothing to do
+ }
+
+ foreach ( $jobs as $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException(
+ "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
+ } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+ throw new MWException(
+ "Got delayed '{$job->getType()}' job; delays are not supported." );
+ }
+ }
+
+ wfProfileIn( __METHOD__ );
+ $this->doBatchPush( $jobs, $flags );
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::batchPush()
+ * @param array $jobs
+ * @param int $flags
+ */
+ abstract protected function doBatchPush( array $jobs, $flags );
+
+ /**
+ * Pop a job off of the queue.
+ * This requires $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::pop() instead of this function.
+ *
+ * @throws MWException
+ * @return Job|bool Returns false if there are no jobs
+ */
+ final public function pop() {
+ global $wgJobClasses;
+
+ if ( $this->wiki !== wfWikiID() ) {
+ throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
+ } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
+ // Do not pop jobs if there is no class for the queue type
+ throw new MWException( "Unrecognized job type '{$this->type}'." );
+ }
+
+ wfProfileIn( __METHOD__ );
+ $job = $this->doPop();
+ wfProfileOut( __METHOD__ );
+
+ // Flag this job as an old duplicate based on its "root" job...
+ try {
+ if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
+ JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
+ $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
+ }
+ } catch ( MWException $e ) {
+ // don't lose jobs over this
+ }
+
+ return $job;
+ }
+
+ /**
+ * @see JobQueue::pop()
+ * @return Job
+ */
+ abstract protected function doPop();
+
+ /**
+ * Acknowledge that a job was completed.
+ *
+ * This does nothing for certain queue classes or if "claimTTL" is not set.
+ * Outside callers should use JobQueueGroup::ack() instead of this function.
+ *
+ * @param Job $job
+ * @return void
+ * @throws MWException
+ */
+ final public function ack( Job $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ wfProfileIn( __METHOD__ );
+ $this->doAck( $job );
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::ack()
+ * @param Job $job
+ */
+ abstract protected function doAck( Job $job );
+
+ /**
+ * Register the "root job" of a given job into the queue for de-duplication.
+ * This should only be called right *after* all the new jobs have been inserted.
+ * This is used to turn older, duplicate, job entries into no-ops. The root job
+ * information will remain in the registry until it simply falls out of cache.
+ *
+ * This requires that $job has two special fields in the "params" array:
+ * - rootJobSignature : hash (e.g. SHA1) that identifies the task
+ * - rootJobTimestamp : TS_MW timestamp of this instance of the task
+ *
+ * A "root job" is a conceptual job that consist of potentially many smaller jobs
+ * that are actually inserted into the queue. For example, "refreshLinks" jobs are
+ * spawned when a template is edited. One can think of the task as "update links
+ * of pages that use template X" and an instance of that task as a "root job".
+ * However, what actually goes into the queue are range and leaf job subtypes.
+ * Since these jobs include things like page ID ranges and DB master positions,
+ * and can morph into smaller jobs recursively, simple duplicate detection
+ * for individual jobs being identical (like that of job_sha1) is not useful.
+ *
+ * In the case of "refreshLinks", if these jobs are still in the queue when the template
+ * is edited again, we want all of these old refreshLinks jobs for that template to become
+ * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing.
+ * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
+ * previous "root job" for the same task of "update links of pages that use template X".
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @param Job $job
+ * @throws MWException
+ * @return bool
+ */
+ final public function deduplicateRootJob( Job $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doDeduplicateRootJob( $job );
+ wfProfileOut( __METHOD__ );
+
+ return $ok;
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @param Job $job
+ * @throws MWException
+ * @return bool
+ */
+ protected function doDeduplicateRootJob( Job $job ) {
+ if ( !$job->hasRootJobParams() ) {
+ throw new MWException( "Cannot register root job; missing parameters." );
+ }
+ $params = $job->getRootJobParams();
+
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Callers should call batchInsert() and then this function so that if the insert
+ // fails, the de-duplication registration will be aborted. Since the insert is
+ // deferred till "transaction idle", do the same here, so that the ordering is
+ // maintained. Having only the de-duplication registration succeed would cause
+ // jobs to become no-ops without any actual jobs that made them redundant.
+ $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location...
+ return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+ }
+
+ /**
+ * Check if the "root" job of a given job has been superseded by a newer one
+ *
+ * @param Job $job
+ * @throws MWException
+ * @return bool
+ */
+ final protected function isRootJobOldDuplicate( Job $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ wfProfileIn( __METHOD__ );
+ $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
+ wfProfileOut( __METHOD__ );
+
+ return $isDuplicate;
+ }
+
+ /**
+ * @see JobQueue::isRootJobOldDuplicate()
+ * @param Job $job
+ * @return bool
+ */
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ if ( !$job->hasRootJobParams() ) {
+ return false; // job has no de-deplication info
+ }
+ $params = $job->getRootJobParams();
+
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Get the last time this root job was enqueued
+ $timestamp = $this->dupCache->get( $key );
+
+ // Check if a new root job was started at the location after this one's...
+ return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+ }
+
+ /**
+ * @param string $signature Hash identifier of the root job
+ * @return string
+ */
+ protected function getRootJobCacheKey( $signature ) {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
+ }
+
+ /**
+ * Deleted all unclaimed and delayed jobs from the queue
+ *
+ * @throws JobQueueError
+ * @since 1.22
+ * @return void
+ */
+ final public function delete() {
+ wfProfileIn( __METHOD__ );
+ $this->doDelete();
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::delete()
+ * @throws MWException
+ */
+ protected function doDelete() {
+ throw new MWException( "This method is not implemented." );
+ }
+
+ /**
+ * Wait for any slaves or backup servers to catch up.
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @return void
+ * @throws JobQueueError
+ */
+ final public function waitForBackups() {
+ wfProfileIn( __METHOD__ );
+ $this->doWaitForBackups();
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::waitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {
+ }
+
+ /**
+ * Return a map of task names to task definition maps.
+ * A "task" is a fast periodic queue maintenance action.
+ * Mutually exclusive tasks must implement their own locking in the callback.
+ *
+ * Each task value is an associative array with:
+ * - name : the name of the task
+ * - callback : a PHP callable that performs the task
+ * - period : the period in seconds corresponding to the task frequency
+ *
+ * @return array
+ */
+ final public function getPeriodicTasks() {
+ $tasks = $this->doGetPeriodicTasks();
+ foreach ( $tasks as $name => &$def ) {
+ $def['name'] = $name;
+ }
+
+ return $tasks;
+ }
+
+ /**
+ * @see JobQueue::getPeriodicTasks()
+ * @return array
+ */
+ protected function doGetPeriodicTasks() {
+ return array();
+ }
+
+ /**
+ * Clear any process and persistent caches
+ *
+ * @return void
+ */
+ final public function flushCaches() {
+ wfProfileIn( __METHOD__ );
+ $this->doFlushCaches();
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::flushCaches()
+ * @return void
+ */
+ protected function doFlushCaches() {
+ }
+
+ /**
+ * Get an iterator to traverse over all available jobs in this queue.
+ * This does not include jobs that are currently acquired or delayed.
+ * Note: results may be stale if the queue is concurrently modified.
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ abstract public function getAllQueuedJobs();
+
+ /**
+ * Get an iterator to traverse over all delayed jobs in this queue.
+ * Note: results may be stale if the queue is concurrently modified.
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.22
+ */
+ public function getAllDelayedJobs() {
+ return new ArrayIterator( array() ); // not implemented
+ }
+
+ /**
+ * Do not use this function outside of JobQueue/JobQueueGroup
+ *
+ * @return string
+ * @since 1.22
+ */
+ public function getCoalesceLocationInternal() {
+ return null;
+ }
+
+ /**
+ * Check whether each of the given queues are empty.
+ * This is used for batching checks for queues stored at the same place.
+ *
+ * @param array $types List of queues types
+ * @return array|null (list of non-empty queue types) or null if unsupported
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getSiblingQueuesWithJobs( array $types ) {
+ $section = new ProfileSection( __METHOD__ );
+
+ return $this->doGetSiblingQueuesWithJobs( $types );
+ }
+
+ /**
+ * @see JobQueue::getSiblingQueuesWithJobs()
+ * @param array $types List of queues types
+ * @return array|null (list of queue types) or null if unsupported
+ */
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ return null; // not supported
+ }
+
+ /**
+ * Check the size of each of the given queues.
+ * For queues not served by the same store as this one, 0 is returned.
+ * This is used for batching checks for queues stored at the same place.
+ *
+ * @param array $types List of queues types
+ * @return array|null (job type => whether queue is empty) or null if unsupported
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getSiblingQueueSizes( array $types ) {
+ $section = new ProfileSection( __METHOD__ );
+
+ return $this->doGetSiblingQueueSizes( $types );
+ }
+
+ /**
+ * @see JobQueue::getSiblingQueuesSize()
+ * @param array $types List of queues types
+ * @return array|null (list of queue types) or null if unsupported
+ */
+ protected function doGetSiblingQueueSizes( array $types ) {
+ return null; // not supported
+ }
+
+ /**
+ * Call wfIncrStats() for the queue overall and for the queue type
+ *
+ * @param string $key Event type
+ * @param string $type Job type
+ * @param int $delta
+ * @param string $wiki Wiki ID (added in 1.23)
+ * @since 1.22
+ */
+ public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
+ wfIncrStats( $key, $delta );
+ wfIncrStats( "{$key}-{$type}", $delta );
+ if ( $wiki !== null ) {
+ wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
+ }
+ }
+
+ /**
+ * Namespace the queue with a key to isolate it for testing
+ *
+ * @param string $key
+ * @return void
+ * @throws MWException
+ */
+ public function setTestingPrefix( $key ) {
+ throw new MWException( "Queue namespacing not supported for this queue type." );
+ }
+}
+
+/**
+ * @ingroup JobQueue
+ * @since 1.22
+ */
+class JobQueueError extends MWException {
+}
+
+class JobQueueConnectionError extends JobQueueError {
+}
diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php
new file mode 100644
index 00000000..08873cc1
--- /dev/null
+++ b/includes/jobqueue/JobQueueDB.php
@@ -0,0 +1,849 @@
+<?php
+/**
+ * Database-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle job queues stored in the DB
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueDB extends JobQueue {
+ const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
+ const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
+ const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
+ const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
+ const MAX_OFFSET = 255; // integer; maximum number of rows to skip
+
+ /** @var BagOStuff */
+ protected $cache;
+
+ /** @var bool|string Name of an external DB cluster. False if not set */
+ protected $cluster = false;
+
+ /**
+ * Additional parameters include:
+ * - cluster : The name of an external cluster registered via LBFactory.
+ * If not specified, the primary DB cluster for the wiki will be used.
+ * This can be overridden with a custom cluster so that DB handles will
+ * be retrieved via LBFactory::getExternalLB() and getConnection().
+ * @param array $params
+ */
+ protected function __construct( array $params ) {
+ global $wgMemc;
+
+ parent::__construct( $params );
+
+ $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
+ // Make sure that we don't use the SQL cache, which would be harmful
+ $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc;
+ }
+
+ protected function supportedOrders() {
+ return array( 'random', 'timestamp', 'fifo' );
+ }
+
+ protected function optimalOrder() {
+ return 'random';
+ }
+
+ /**
+ * @see JobQueue::doIsEmpty()
+ * @return bool
+ */
+ protected function doIsEmpty() {
+ $key = $this->getCacheKey( 'empty' );
+
+ $isEmpty = $this->cache->get( $key );
+ if ( $isEmpty === 'true' ) {
+ return true;
+ } elseif ( $isEmpty === 'false' ) {
+ return false;
+ }
+
+ $dbr = $this->getSlaveDB();
+ try {
+ $found = $dbr->selectField( // unclaimed job
+ 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
+
+ return !$found;
+ }
+
+ /**
+ * @see JobQueue::doGetSize()
+ * @return int
+ */
+ protected function doGetSize() {
+ $key = $this->getCacheKey( 'size' );
+
+ $size = $this->cache->get( $key );
+ if ( is_int( $size ) ) {
+ return $size;
+ }
+
+ try {
+ $dbr = $this->getSlaveDB();
+ $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array( 'job_cmd' => $this->type, 'job_token' => '' ),
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
+
+ return $size;
+ }
+
+ /**
+ * @see JobQueue::doGetAcquiredCount()
+ * @return int
+ */
+ protected function doGetAcquiredCount() {
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+
+ $key = $this->getCacheKey( 'acquiredcount' );
+
+ $count = $this->cache->get( $key );
+ if ( is_int( $count ) ) {
+ return $count;
+ }
+
+ $dbr = $this->getSlaveDB();
+ try {
+ $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+
+ return $count;
+ }
+
+ /**
+ * @see JobQueue::doGetAbandonedCount()
+ * @return int
+ * @throws MWException
+ */
+ protected function doGetAbandonedCount() {
+ global $wgMemc;
+
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+
+ $key = $this->getCacheKey( 'abandonedcount' );
+
+ $count = $wgMemc->get( $key );
+ if ( is_int( $count ) ) {
+ return $count;
+ }
+
+ $dbr = $this->getSlaveDB();
+ try {
+ $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbr->addQuotes( '' )}",
+ "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
+ ),
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
+
+ return $count;
+ }
+
+ /**
+ * @see JobQueue::doBatchPush()
+ * @param array $jobs
+ * @param int $flags
+ * @throws DBError|Exception
+ * @return void
+ */
+ protected function doBatchPush( array $jobs, $flags ) {
+ $dbw = $this->getMasterDB();
+
+ $that = $this;
+ $method = __METHOD__;
+ $dbw->onTransactionIdle(
+ function () use ( $dbw, $that, $jobs, $flags, $method ) {
+ $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
+ }
+ );
+ }
+
+ /**
+ * This function should *not* be called outside of JobQueueDB
+ *
+ * @param IDatabase $dbw
+ * @param array $jobs
+ * @param int $flags
+ * @param string $method
+ * @throws DBError
+ * @return void
+ */
+ public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
+ if ( !count( $jobs ) ) {
+ return;
+ }
+
+ $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
+ $rowList = array(); // list of jobs for jobs that are are not de-duplicated
+ foreach ( $jobs as $job ) {
+ $row = $this->insertFields( $job );
+ if ( $job->ignoreDuplicates() ) {
+ $rowSet[$row['job_sha1']] = $row;
+ } else {
+ $rowList[] = $row;
+ }
+ }
+
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->begin( $method ); // wrap all the job additions in one transaction
+ }
+ try {
+ // Strip out any duplicate jobs that are already in the queue...
+ if ( count( $rowSet ) ) {
+ $res = $dbw->select( 'job', 'job_sha1',
+ array(
+ // No job_type condition since it's part of the job_sha1 hash
+ 'job_sha1' => array_keys( $rowSet ),
+ 'job_token' => '' // unclaimed
+ ),
+ $method
+ );
+ foreach ( $res as $row ) {
+ wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
+ unset( $rowSet[$row->job_sha1] ); // already enqueued
+ }
+ }
+ // Build the full list of job rows to insert
+ $rows = array_merge( $rowList, array_values( $rowSet ) );
+ // Insert the job rows in chunks to avoid slave lag...
+ foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
+ $dbw->insert( 'job', $rowBatch, $method );
+ }
+ JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
+ JobQueue::incrStats(
+ 'job-insert-duplicate',
+ $this->type,
+ count( $rowSet ) + count( $rowList ) - count( $rows ),
+ $this->wiki
+ );
+ } catch ( DBError $e ) {
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->rollback( $method );
+ }
+ throw $e;
+ }
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->commit( $method );
+ }
+
+ $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
+
+ return;
+ }
+
+ /**
+ * @see JobQueue::doPop()
+ * @return Job|bool
+ */
+ protected function doPop() {
+ if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
+ return false; // queue is empty
+ }
+
+ $dbw = $this->getMasterDB();
+ try {
+ $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+ } );
+
+ $uuid = wfRandomString( 32 ); // pop attempt
+ $job = false; // job popped off
+ do { // retry when our row is invalid or deleted as a duplicate
+ // Try to reserve a row in the DB...
+ if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
+ $row = $this->claimOldest( $uuid );
+ } else { // random first
+ $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+ $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+ $row = $this->claimRandom( $uuid, $rand, $gte );
+ }
+ // Check if we found a row to reserve...
+ if ( !$row ) {
+ $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
+ break; // nothing to do
+ }
+ JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ // Get the job object from the row...
+ $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+ if ( !$title ) {
+ $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+ wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
+ continue; // try again
+ }
+ $job = Job::factory( $row->job_cmd, $title,
+ self::extractBlob( $row->job_params ), $row->job_id );
+ $job->metadata['id'] = $row->job_id;
+ break; // done
+ } while ( true );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ return $job;
+ }
+
+ /**
+ * Reserve a row with a single UPDATE without holding row locks over RTTs...
+ *
+ * @param string $uuid 32 char hex string
+ * @param int $rand Random unsigned integer (31 bits)
+ * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
+ * @return stdClass|bool Row|false
+ */
+ protected function claimRandom( $uuid, $rand, $gte ) {
+ $dbw = $this->getMasterDB();
+ // Check cache to see if the queue has <= OFFSET items
+ $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
+
+ $row = false; // the row acquired
+ $invertedDirection = false; // whether one job_random direction was already scanned
+ // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
+ // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
+ // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
+ // be used here with MySQL.
+ do {
+ if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
+ // For small queues, using OFFSET will overshoot and return no rows more often.
+ // Instead, this uses job_random to pick a row (possibly checking both directions).
+ $ineq = $gte ? '>=' : '<=';
+ $dir = $gte ? 'ASC' : 'DESC';
+ $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
+ array(
+ 'job_cmd' => $this->type,
+ 'job_token' => '', // unclaimed
+ "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
+ __METHOD__,
+ array( 'ORDER BY' => "job_random {$dir}" )
+ );
+ if ( !$row && !$invertedDirection ) {
+ $gte = !$gte;
+ $invertedDirection = true;
+ continue; // try the other direction
+ }
+ } else { // table *may* have >= MAX_OFFSET rows
+ // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
+ // in MySQL if there are many rows for some reason. This uses a small OFFSET
+ // instead of job_random for reducing excess claim retries.
+ $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
+ array(
+ 'job_cmd' => $this->type,
+ 'job_token' => '', // unclaimed
+ ),
+ __METHOD__,
+ array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) )
+ );
+ if ( !$row ) {
+ $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
+ $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
+ continue; // use job_random
+ }
+ }
+
+ if ( $row ) { // claim the job
+ $dbw->update( 'job', // update by PK
+ array(
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ),
+ array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
+ __METHOD__
+ );
+ // This might get raced out by another runner when claiming the previously
+ // selected row. The use of job_random should minimize this problem, however.
+ if ( !$dbw->affectedRows() ) {
+ $row = false; // raced out
+ }
+ } else {
+ break; // nothing to do
+ }
+ } while ( !$row );
+
+ return $row;
+ }
+
+ /**
+ * Reserve a row with a single UPDATE without holding row locks over RTTs...
+ *
+ * @param string $uuid 32 char hex string
+ * @return stdClass|bool Row|false
+ */
+ protected function claimOldest( $uuid ) {
+ $dbw = $this->getMasterDB();
+
+ $row = false; // the row acquired
+ do {
+ if ( $dbw->getType() === 'mysql' ) {
+ // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
+ // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
+ // Oracle and Postgre have no such limitation. However, MySQL offers an
+ // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
+ $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
+ "SET " .
+ "job_token = {$dbw->addQuotes( $uuid ) }, " .
+ "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
+ "job_attempts = job_attempts+1 " .
+ "WHERE ( " .
+ "job_cmd = {$dbw->addQuotes( $this->type )} " .
+ "AND job_token = {$dbw->addQuotes( '' )} " .
+ ") ORDER BY job_id ASC LIMIT 1",
+ __METHOD__
+ );
+ } else {
+ // Use a subquery to find the job, within an UPDATE to claim it.
+ // This uses as much of the DB wrapper functions as possible.
+ $dbw->update( 'job',
+ array(
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ),
+ array( 'job_id = (' .
+ $dbw->selectSQLText( 'job', 'job_id',
+ array( 'job_cmd' => $this->type, 'job_token' => '' ),
+ __METHOD__,
+ array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
+ ')'
+ ),
+ __METHOD__
+ );
+ }
+ // Fetch any row that we just reserved...
+ if ( $dbw->affectedRows() ) {
+ $row = $dbw->selectRow( 'job', self::selectFields(),
+ array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
+ );
+ if ( !$row ) { // raced out by duplicate job removal
+ wfDebug( "Row deleted as duplicate by another process.\n" );
+ }
+ } else {
+ break; // nothing to do
+ }
+ } while ( !$row );
+
+ return $row;
+ }
+
+ /**
+ * @see JobQueue::doAck()
+ * @param Job $job
+ * @throws MWException
+ * @return Job|bool
+ */
+ protected function doAck( Job $job ) {
+ if ( !isset( $job->metadata['id'] ) ) {
+ throw new MWException( "Job of type '{$job->getType()}' has no ID." );
+ }
+
+ $dbw = $this->getMasterDB();
+ try {
+ $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+ } );
+
+ // Delete a row with a single DELETE without holding row locks over RTTs...
+ $dbw->delete( 'job',
+ array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doDeduplicateRootJob()
+ * @param Job $job
+ * @throws MWException
+ * @return bool
+ */
+ protected function doDeduplicateRootJob( Job $job ) {
+ $params = $job->getParams();
+ if ( !isset( $params['rootJobSignature'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+ } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+ }
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Callers should call batchInsert() and then this function so that if the insert
+ // fails, the de-duplication registration will be aborted. Since the insert is
+ // deferred till "transaction idle", do the same here, so that the ordering is
+ // maintained. Having only the de-duplication registration succeed would cause
+ // jobs to become no-ops without any actual jobs that made them redundant.
+ $dbw = $this->getMasterDB();
+ $cache = $this->dupCache;
+ $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
+ $timestamp = $cache->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location...
+ return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+ } );
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doDelete()
+ * @return bool
+ */
+ protected function doDelete() {
+ $dbw = $this->getMasterDB();
+ try {
+ $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doWaitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {
+ wfWaitForSlaves();
+ }
+
+ /**
+ * @return array
+ */
+ protected function doGetPeriodicTasks() {
+ return array(
+ 'recycleAndDeleteStaleJobs' => array(
+ 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+ 'period' => ceil( $this->claimTTL / 2 )
+ )
+ );
+ }
+
+ /**
+ * @return void
+ */
+ protected function doFlushCaches() {
+ foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+ $this->cache->delete( $this->getCacheKey( $type ) );
+ }
+ }
+
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ */
+ public function getAllQueuedJobs() {
+ $dbr = $this->getSlaveDB();
+ try {
+ return new MappedIterator(
+ $dbr->select( 'job', self::selectFields(),
+ array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
+ function ( $row ) use ( $dbr ) {
+ $job = Job::factory(
+ $row->job_cmd,
+ Title::makeTitle( $row->job_namespace, $row->job_title ),
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
+ );
+ $job->metadata['id'] = $row->job_id;
+ return $job;
+ }
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ }
+
+ public function getCoalesceLocationInternal() {
+ return $this->cluster
+ ? "DBCluster:{$this->cluster}:{$this->wiki}"
+ : "LBFactory:{$this->wiki}";
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ $dbr = $this->getSlaveDB();
+ $res = $dbr->select( 'job', 'DISTINCT job_cmd',
+ array( 'job_cmd' => $types ), __METHOD__ );
+
+ $types = array();
+ foreach ( $res as $row ) {
+ $types[] = $row->job_cmd;
+ }
+
+ return $types;
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $dbr = $this->getSlaveDB();
+ $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
+ array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
+
+ $sizes = array();
+ foreach ( $res as $row ) {
+ $sizes[$row->job_cmd] = (int)$row->count;
+ }
+
+ return $sizes;
+ }
+
+ /**
+ * Recycle or destroy any jobs that have been claimed for too long
+ *
+ * @return int Number of jobs recycled/deleted
+ */
+ public function recycleAndDeleteStaleJobs() {
+ $now = time();
+ $count = 0; // affected rows
+ $dbw = $this->getMasterDB();
+
+ try {
+ if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
+ return $count; // already in progress
+ }
+
+ // Remove claims on jobs acquired for too long if enabled...
+ if ( $this->claimTTL > 0 ) {
+ $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+ // Get the IDs of jobs that have be claimed but not finished after too long.
+ // These jobs can be recycled into the queue by expiring the claim. Selecting
+ // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id',
+ array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+ "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
+ __METHOD__
+ );
+ $ids = array_map(
+ function ( $o ) {
+ return $o->job_id;
+ }, iterator_to_array( $res )
+ );
+ if ( count( $ids ) ) {
+ // Reset job_token for these jobs so that other runners will pick them up.
+ // Set the timestamp to the current time, as it is useful to now that the job
+ // was already tried before (the timestamp becomes the "released" time).
+ $dbw->update( 'job',
+ array(
+ 'job_token' => '',
+ 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+ array(
+ 'job_id' => $ids ),
+ __METHOD__
+ );
+ $affected = $dbw->affectedRows();
+ $count += $affected;
+ JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
+ $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+ }
+ }
+
+ // Just destroy any stale jobs...
+ $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+ $conds = array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
+ );
+ if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+ $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
+ }
+ // Get the IDs of jobs that are considered stale and should be removed. Selecting
+ // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
+ $ids = array_map(
+ function ( $o ) {
+ return $o->job_id;
+ }, iterator_to_array( $res )
+ );
+ if ( count( $ids ) ) {
+ $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
+ $affected = $dbw->affectedRows();
+ $count += $affected;
+ JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
+ }
+
+ $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ return $count;
+ }
+
+ /**
+ * @param IJobSpecification $job
+ * @return array
+ */
+ protected function insertFields( IJobSpecification $job ) {
+ $dbw = $this->getMasterDB();
+
+ return array(
+ // Fields that describe the nature of the job
+ 'job_cmd' => $job->getType(),
+ 'job_namespace' => $job->getTitle()->getNamespace(),
+ 'job_title' => $job->getTitle()->getDBkey(),
+ 'job_params' => self::makeBlob( $job->getParams() ),
+ // Additional job metadata
+ 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+ 'job_timestamp' => $dbw->timestamp(),
+ 'job_sha1' => wfBaseConvert(
+ sha1( serialize( $job->getDeduplicationInfo() ) ),
+ 16, 36, 31
+ ),
+ 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
+ );
+ }
+
+ /**
+ * @throws JobQueueConnectionError
+ * @return DBConnRef
+ */
+ protected function getSlaveDB() {
+ try {
+ return $this->getDB( DB_SLAVE );
+ } catch ( DBConnectionError $e ) {
+ throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
+ }
+ }
+
+ /**
+ * @throws JobQueueConnectionError
+ * @return DBConnRef
+ */
+ protected function getMasterDB() {
+ try {
+ return $this->getDB( DB_MASTER );
+ } catch ( DBConnectionError $e ) {
+ throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
+ }
+ }
+
+ /**
+ * @param int $index (DB_SLAVE/DB_MASTER)
+ * @return DBConnRef
+ */
+ protected function getDB( $index ) {
+ $lb = ( $this->cluster !== false )
+ ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
+ : wfGetLB( $this->wiki );
+
+ return $lb->getConnectionRef( $index, array(), $this->wiki );
+ }
+
+ /**
+ * @param string $property
+ * @return string
+ */
+ private function getCacheKey( $property ) {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
+
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
+ }
+
+ /**
+ * @param array|bool $params
+ * @return string
+ */
+ protected static function makeBlob( $params ) {
+ if ( $params !== false ) {
+ return serialize( $params );
+ } else {
+ return '';
+ }
+ }
+
+ /**
+ * @param string $blob
+ * @return bool|mixed
+ */
+ protected static function extractBlob( $blob ) {
+ if ( (string)$blob !== '' ) {
+ return unserialize( $blob );
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * @param DBError $e
+ * @throws JobQueueError
+ */
+ protected function throwDBException( DBError $e ) {
+ throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
+ }
+
+ /**
+ * Return the list of job fields that should be selected.
+ * @since 1.23
+ * @return array
+ */
+ public static function selectFields() {
+ return array(
+ 'job_id',
+ 'job_cmd',
+ 'job_namespace',
+ 'job_title',
+ 'job_timestamp',
+ 'job_params',
+ 'job_random',
+ 'job_attempts',
+ 'job_token',
+ 'job_token_timestamp',
+ 'job_sha1',
+ );
+ }
+}
diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php
new file mode 100644
index 00000000..c4301eed
--- /dev/null
+++ b/includes/jobqueue/JobQueueFederated.php
@@ -0,0 +1,559 @@
+<?php
+/**
+ * Job queue code for federated queues.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle enqueueing and running of background jobs for federated queues
+ *
+ * This class allows for queues to be partitioned into smaller queues.
+ * A partition is defined by the configuration for a JobQueue instance.
+ * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a
+ * JobQueueFederated instance, which itself would consist of three JobQueueRedis
+ * instances, each using their own redis server. This would allow for the jobs
+ * to be split (evenly or based on weights) accross multiple servers if a single
+ * server becomes impractical or expensive. Different JobQueue classes can be mixed.
+ *
+ * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue
+ * is inherited by the partition queues. Additional configuration defines what
+ * section each wiki is in, what partition queues each section uses (and their weight),
+ * and the JobQueue configuration for each partition. Some sections might only need a
+ * single queue partition, like the sections for groups of small wikis.
+ *
+ * If used for performance, then $wgMainCacheType should be set to memcached/redis.
+ * Note that "fifo" cannot be used for the ordering, since the data is distributed.
+ * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also,
+ * queue classes used by this should ignore down servers (with TTL) to avoid slowness.
+ *
+ * @ingroup JobQueue
+ * @since 1.22
+ */
+class JobQueueFederated extends JobQueue {
+ /** @var HashRing */
+ protected $partitionRing;
+ /** @var HashRing */
+ protected $partitionPushRing;
+ /** @var array (partition name => JobQueue) reverse sorted by weight */
+ protected $partitionQueues = array();
+
+ /** @var BagOStuff */
+ protected $cache;
+
+ /** @var int Maximum number of partitions to try */
+ protected $maxPartitionsTry;
+
+ const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
+ const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
+
+ /**
+ * @param array $params Possible keys:
+ * - sectionsByWiki : A map of wiki IDs to section names.
+ * Wikis will default to using the section "default".
+ * - partitionsBySection : Map of section names to maps of (partition name => weight).
+ * A section called 'default' must be defined if not all wikis
+ * have explicitly defined sections.
+ * - configByPartition : Map of queue partition names to configuration arrays.
+ * These configuration arrays are passed to JobQueue::factory().
+ * The options set here are overriden by those passed to this
+ * the federated queue itself (e.g. 'order' and 'claimTTL').
+ * - partitionsNoPush : List of partition names that can handle pop() but not push().
+ * This can be used to migrate away from a certain partition.
+ * - maxPartitionsTry : Maximum number of times to attempt job insertion using
+ * different partition queues. This improves availability
+ * during failure, at the cost of added latency and somewhat
+ * less reliable job de-duplication mechanisms.
+ * @throws MWException
+ */
+ protected function __construct( array $params ) {
+ parent::__construct( $params );
+ $section = isset( $params['sectionsByWiki'][$this->wiki] )
+ ? $params['sectionsByWiki'][$this->wiki]
+ : 'default';
+ if ( !isset( $params['partitionsBySection'][$section] ) ) {
+ throw new MWException( "No configuration for section '$section'." );
+ }
+ $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
+ ? $params['maxPartitionsTry']
+ : 2;
+ // Get the full partition map
+ $partitionMap = $params['partitionsBySection'][$section];
+ arsort( $partitionMap, SORT_NUMERIC );
+ // Get the partitions jobs can actually be pushed to
+ $partitionPushMap = $partitionMap;
+ if ( isset( $params['partitionsNoPush'] ) ) {
+ foreach ( $params['partitionsNoPush'] as $partition ) {
+ unset( $partitionPushMap[$partition] );
+ }
+ }
+ // Get the config to pass to merge into each partition queue config
+ $baseConfig = $params;
+ foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
+ 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
+ ) {
+ unset( $baseConfig[$o] ); // partition queue doesn't care about this
+ }
+ // Get the partition queue objects
+ foreach ( $partitionMap as $partition => $w ) {
+ if ( !isset( $params['configByPartition'][$partition] ) ) {
+ throw new MWException( "No configuration for partition '$partition'." );
+ }
+ $this->partitionQueues[$partition] = JobQueue::factory(
+ $baseConfig + $params['configByPartition'][$partition] );
+ }
+ // Ring of all partitions
+ $this->partitionRing = new HashRing( $partitionMap );
+ // Get the ring of partitions to push jobs into
+ if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
+ $this->partitionPushRing = clone $this->partitionRing; // faster
+ } else {
+ $this->partitionPushRing = new HashRing( $partitionPushMap );
+ }
+ // Aggregate cache some per-queue values if there are multiple partition queues
+ $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
+ }
+
+ protected function supportedOrders() {
+ // No FIFO due to partitioning, though "rough timestamp order" is supported
+ return array( 'undefined', 'random', 'timestamp' );
+ }
+
+ protected function optimalOrder() {
+ return 'undefined'; // defer to the partitions
+ }
+
+ protected function supportsDelayedJobs() {
+ return true; // defer checks to the partitions
+ }
+
+ protected function doIsEmpty() {
+ $key = $this->getCacheKey( 'empty' );
+
+ $isEmpty = $this->cache->get( $key );
+ if ( $isEmpty === 'true' ) {
+ return true;
+ } elseif ( $isEmpty === 'false' ) {
+ return false;
+ }
+
+ $empty = true;
+ $failed = 0;
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $empty = $empty && $queue->doIsEmpty();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ MWExceptionHandler::logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
+ return $empty;
+ }
+
+ protected function doGetSize() {
+ return $this->getCrossPartitionSum( 'size', 'doGetSize' );
+ }
+
+ protected function doGetAcquiredCount() {
+ return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
+ }
+
+ protected function doGetDelayedCount() {
+ return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
+ }
+
+ protected function doGetAbandonedCount() {
+ return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
+ }
+
+ /**
+ * @param string $type
+ * @param string $method
+ * @return int
+ */
+ protected function getCrossPartitionSum( $type, $method ) {
+ $key = $this->getCacheKey( $type );
+
+ $count = $this->cache->get( $key );
+ if ( $count !== false ) {
+ return $count;
+ }
+
+ $failed = 0;
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $count += $queue->$method();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ MWExceptionHandler::logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+
+ return $count;
+ }
+
+ protected function doBatchPush( array $jobs, $flags ) {
+ // Local ring variable that may be changed to point to a new ring on failure
+ $partitionRing = $this->partitionPushRing;
+ // Try to insert the jobs and update $partitionsTry on any failures.
+ // Retry to insert any remaning jobs again, ignoring the bad partitions.
+ $jobsLeft = $jobs;
+ // @codingStandardsIgnoreStart Generic.CodeAnalysis.ForLoopWithTestFunctionCall.NotAllowed
+ for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
+ // @codingStandardsIgnoreEnd
+ try {
+ $partitionRing->getLiveRing();
+ } catch ( UnexpectedValueException $e ) {
+ break; // all servers down; nothing to insert to
+ }
+ $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
+ }
+ if ( count( $jobsLeft ) ) {
+ throw new JobQueueError(
+ "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
+ }
+ }
+
+ /**
+ * @param array $jobs
+ * @param HashRing $partitionRing
+ * @param int $flags
+ * @throws JobQueueError
+ * @return array List of Job object that could not be inserted
+ */
+ protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
+ $jobsLeft = array();
+
+ // Because jobs are spread across partitions, per-job de-duplication needs
+ // to use a consistent hash to avoid allowing duplicate jobs per partition.
+ // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
+ $uJobsByPartition = array(); // (partition name => job list)
+ /** @var Job $job */
+ foreach ( $jobs as $key => $job ) {
+ if ( $job->ignoreDuplicates() ) {
+ $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
+ $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
+ unset( $jobs[$key] );
+ }
+ }
+ // Get the batches of jobs that are not de-duplicated
+ if ( $flags & self::QOS_ATOMIC ) {
+ $nuJobBatches = array( $jobs ); // all or nothing
+ } else {
+ // Split the jobs into batches and spread them out over servers if there
+ // are many jobs. This helps keep the partitions even. Otherwise, send all
+ // the jobs to a single partition queue to avoids the extra connections.
+ $nuJobBatches = array_chunk( $jobs, 300 );
+ }
+
+ // Insert the de-duplicated jobs into the queues...
+ foreach ( $uJobsByPartition as $partition => $jobBatch ) {
+ /** @var JobQueue $queue */
+ $queue = $this->partitionQueues[$partition];
+ try {
+ $ok = true;
+ $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ MWExceptionHandler::logException( $e );
+ }
+ if ( $ok ) {
+ $key = $this->getCacheKey( 'empty' );
+ $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
+ } else {
+ if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
+ throw new JobQueueError( "Could not insert job(s), no partitions available." );
+ }
+ $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
+ }
+ }
+
+ // Insert the jobs that are not de-duplicated into the queues...
+ foreach ( $nuJobBatches as $jobBatch ) {
+ $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
+ $queue = $this->partitionQueues[$partition];
+ try {
+ $ok = true;
+ $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ MWExceptionHandler::logException( $e );
+ }
+ if ( $ok ) {
+ $key = $this->getCacheKey( 'empty' );
+ $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
+ } else {
+ if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
+ throw new JobQueueError( "Could not insert job(s), no partitions available." );
+ }
+ $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
+ }
+ }
+
+ return $jobsLeft;
+ }
+
+ protected function doPop() {
+ $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)
+
+ $failed = 0;
+ while ( count( $partitionsTry ) ) {
+ $partition = ArrayUtils::pickRandom( $partitionsTry );
+ if ( $partition === false ) {
+ break; // all partitions at 0 weight
+ }
+
+ /** @var JobQueue $queue */
+ $queue = $this->partitionQueues[$partition];
+ try {
+ $job = $queue->pop();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ MWExceptionHandler::logException( $e );
+ $job = false;
+ }
+ if ( $job ) {
+ $job->metadata['QueuePartition'] = $partition;
+
+ return $job;
+ } else {
+ unset( $partitionsTry[$partition] ); // blacklist partition
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ $key = $this->getCacheKey( 'empty' );
+ $this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
+
+ return false;
+ }
+
+ protected function doAck( Job $job ) {
+ if ( !isset( $job->metadata['QueuePartition'] ) ) {
+ throw new MWException( "The given job has no defined partition name." );
+ }
+
+ return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
+ }
+
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ $params = $job->getRootJobParams();
+ $sigature = $params['rootJobSignature'];
+ $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ try {
+ return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
+ } catch ( JobQueueError $e ) {
+ if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
+ }
+ }
+
+ return false;
+ }
+
+ protected function doDeduplicateRootJob( Job $job ) {
+ $params = $job->getRootJobParams();
+ $sigature = $params['rootJobSignature'];
+ $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ try {
+ return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
+ } catch ( JobQueueError $e ) {
+ if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
+ }
+ }
+
+ return false;
+ }
+
+ protected function doDelete() {
+ $failed = 0;
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $queue->doDelete();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ MWExceptionHandler::logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+ return true;
+ }
+
+ protected function doWaitForBackups() {
+ $failed = 0;
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $queue->waitForBackups();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ MWExceptionHandler::logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+ }
+
+ protected function doGetPeriodicTasks() {
+ $tasks = array();
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $partition => $queue ) {
+ foreach ( $queue->getPeriodicTasks() as $task => $def ) {
+ $tasks["{$partition}:{$task}"] = $def;
+ }
+ }
+
+ return $tasks;
+ }
+
+ protected function doFlushCaches() {
+ static $types = array(
+ 'empty',
+ 'size',
+ 'acquiredcount',
+ 'delayedcount',
+ 'abandonedcount'
+ );
+
+ foreach ( $types as $type ) {
+ $this->cache->delete( $this->getCacheKey( $type ) );
+ }
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $queue->doFlushCaches();
+ }
+ }
+
+ public function getAllQueuedJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllQueuedJobs() );
+ }
+
+ return $iterator;
+ }
+
+ public function getAllDelayedJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllDelayedJobs() );
+ }
+
+ return $iterator;
+ }
+
+ public function getCoalesceLocationInternal() {
+ return "JobQueueFederated:wiki:{$this->wiki}" .
+ sha1( serialize( array_keys( $this->partitionQueues ) ) );
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ $result = array();
+
+ $failed = 0;
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
+ if ( is_array( $nonEmpty ) ) {
+ $result = array_unique( array_merge( $result, $nonEmpty ) );
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ if ( count( $result ) == count( $types ) ) {
+ break; // short-circuit
+ }
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ MWExceptionHandler::logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ return array_values( $result );
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $result = array();
+ $failed = 0;
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $sizes = $queue->doGetSiblingQueueSizes( $types );
+ if ( is_array( $sizes ) ) {
+ foreach ( $sizes as $type => $size ) {
+ $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+ }
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ MWExceptionHandler::logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ return $result;
+ }
+
+ /**
+ * Throw an error if no partitions available
+ *
+ * @param int $down The number of up partitions down
+ * @return void
+ * @throws JobQueueError
+ */
+ protected function throwErrorIfAllPartitionsDown( $down ) {
+ if ( $down >= count( $this->partitionQueues ) ) {
+ throw new JobQueueError( 'No queue partitions available.' );
+ }
+ }
+
+ public function setTestingPrefix( $key ) {
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $queue->setTestingPrefix( $key );
+ }
+ }
+
+ /**
+ * @param string $property
+ * @return string
+ */
+ private function getCacheKey( $property ) {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
+ }
+}
diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php
new file mode 100644
index 00000000..98a78c5e
--- /dev/null
+++ b/includes/jobqueue/JobQueueGroup.php
@@ -0,0 +1,440 @@
+<?php
+/**
+ * Job queue base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle enqueueing of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueGroup {
+ /** @var array */
+ protected static $instances = array();
+
+ /** @var ProcessCacheLRU */
+ protected $cache;
+
+ /** @var string Wiki ID */
+ protected $wiki;
+
+ /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
+ protected $coalescedQueues;
+
+ const TYPE_DEFAULT = 1; // integer; jobs popped by default
+ const TYPE_ANY = 2; // integer; any job
+
+ const USE_CACHE = 1; // integer; use process or persistent cache
+
+ const PROC_CACHE_TTL = 15; // integer; seconds
+
+ const CACHE_VERSION = 1; // integer; cache version
+
+ /**
+ * @param string $wiki Wiki ID
+ */
+ protected function __construct( $wiki ) {
+ $this->wiki = $wiki;
+ $this->cache = new ProcessCacheLRU( 10 );
+ }
+
+ /**
+ * @param bool|string $wiki Wiki ID
+ * @return JobQueueGroup
+ */
+ public static function singleton( $wiki = false ) {
+ $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+ if ( !isset( self::$instances[$wiki] ) ) {
+ self::$instances[$wiki] = new self( $wiki );
+ }
+
+ return self::$instances[$wiki];
+ }
+
+ /**
+ * Destroy the singleton instances
+ *
+ * @return void
+ */
+ public static function destroySingletons() {
+ self::$instances = array();
+ }
+
+ /**
+ * Get the job queue object for a given queue type
+ *
+ * @param string $type
+ * @return JobQueue
+ */
+ public function get( $type ) {
+ global $wgJobTypeConf;
+
+ $conf = array( 'wiki' => $this->wiki, 'type' => $type );
+ if ( isset( $wgJobTypeConf[$type] ) ) {
+ $conf = $conf + $wgJobTypeConf[$type];
+ } else {
+ $conf = $conf + $wgJobTypeConf['default'];
+ }
+
+ return JobQueue::factory( $conf );
+ }
+
+ /**
+ * Insert jobs into the respective queues of with the belong.
+ *
+ * This inserts the jobs into the queue specified by $wgJobTypeConf
+ * and updates the aggregate job queue information cache as needed.
+ *
+ * @param Job|array $jobs A single Job or a list of Jobs
+ * @throws MWException
+ * @return void
+ */
+ public function push( $jobs ) {
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ if ( !count( $jobs ) ) {
+ return;
+ }
+
+ $jobsByType = array(); // (job type => list of jobs)
+ foreach ( $jobs as $job ) {
+ if ( $job instanceof IJobSpecification ) {
+ $jobsByType[$job->getType()][] = $job;
+ } else {
+ throw new MWException( "Attempted to push a non-Job object into a queue." );
+ }
+ }
+
+ foreach ( $jobsByType as $type => $jobs ) {
+ $this->get( $type )->push( $jobs );
+ JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
+ }
+
+ if ( $this->cache->has( 'queues-ready', 'list' ) ) {
+ $list = $this->cache->get( 'queues-ready', 'list' );
+ if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
+ $this->cache->clear( 'queues-ready' );
+ }
+ }
+ }
+
+ /**
+ * Pop a job off one of the job queues
+ *
+ * This pops a job off a queue as specified by $wgJobTypeConf and
+ * updates the aggregate job queue information cache as needed.
+ *
+ * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
+ * @param int $flags Bitfield of JobQueueGroup::USE_* constants
+ * @param array $blacklist List of job types to ignore
+ * @return Job|bool Returns false on failure
+ */
+ public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = array() ) {
+ $job = false;
+
+ if ( is_string( $qtype ) ) { // specific job type
+ if ( !in_array( $qtype, $blacklist ) ) {
+ $job = $this->get( $qtype )->pop();
+ if ( !$job ) {
+ JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
+ }
+ }
+ } else { // any job in the "default" jobs types
+ if ( $flags & self::USE_CACHE ) {
+ if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
+ $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+ }
+ $types = $this->cache->get( 'queues-ready', 'list' );
+ } else {
+ $types = $this->getQueuesWithJobs();
+ }
+
+ if ( $qtype == self::TYPE_DEFAULT ) {
+ $types = array_intersect( $types, $this->getDefaultQueueTypes() );
+ }
+
+ $types = array_diff( $types, $blacklist ); // avoid selected types
+ shuffle( $types ); // avoid starvation
+
+ foreach ( $types as $type ) { // for each queue...
+ $job = $this->get( $type )->pop();
+ if ( $job ) { // found
+ break;
+ } else { // not found
+ JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
+ $this->cache->clear( 'queues-ready' );
+ }
+ }
+ }
+
+ return $job;
+ }
+
+ /**
+ * Acknowledge that a job was completed
+ *
+ * @param Job $job
+ * @return bool
+ */
+ public function ack( Job $job ) {
+ return $this->get( $job->getType() )->ack( $job );
+ }
+
+ /**
+ * Register the "root job" of a given job into the queue for de-duplication.
+ * This should only be called right *after* all the new jobs have been inserted.
+ *
+ * @param Job $job
+ * @return bool
+ */
+ public function deduplicateRootJob( Job $job ) {
+ return $this->get( $job->getType() )->deduplicateRootJob( $job );
+ }
+
+ /**
+ * Wait for any slaves or backup queue servers to catch up.
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @return void
+ * @throws MWException
+ */
+ public function waitForBackups() {
+ global $wgJobTypeConf;
+
+ wfProfileIn( __METHOD__ );
+ // Try to avoid doing this more than once per queue storage medium
+ foreach ( $wgJobTypeConf as $type => $conf ) {
+ $this->get( $type )->waitForBackups();
+ }
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * Get the list of queue types
+ *
+ * @return array List of strings
+ */
+ public function getQueueTypes() {
+ return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
+ }
+
+ /**
+ * Get the list of default queue types
+ *
+ * @return array List of strings
+ */
+ public function getDefaultQueueTypes() {
+ global $wgJobTypesExcludedFromDefaultQueue;
+
+ return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
+ }
+
+ /**
+ * Check if there are any queues with jobs (this is cached)
+ *
+ * @param int $type JobQueueGroup::TYPE_* constant
+ * @return bool
+ * @since 1.23
+ */
+ public function queuesHaveJobs( $type = self::TYPE_ANY ) {
+ global $wgMemc;
+
+ $key = wfMemcKey( 'jobqueue', 'queueshavejobs', $type );
+
+ $value = $wgMemc->get( $key );
+ if ( $value === false ) {
+ $queues = $this->getQueuesWithJobs();
+ if ( $type == self::TYPE_DEFAULT ) {
+ $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
+ }
+ $value = count( $queues ) ? 'true' : 'false';
+ $wgMemc->add( $key, $value, 15 );
+ }
+
+ return ( $value === 'true' );
+ }
+
+ /**
+ * Get the list of job types that have non-empty queues
+ *
+ * @return array List of job types that have non-empty queues
+ */
+ public function getQueuesWithJobs() {
+ $types = array();
+ foreach ( $this->getCoalescedQueues() as $info ) {
+ $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+ if ( is_array( $nonEmpty ) ) { // batching features supported
+ $types = array_merge( $types, $nonEmpty );
+ } else { // we have to go through the queues in the bucket one-by-one
+ foreach ( $info['types'] as $type ) {
+ if ( !$this->get( $type )->isEmpty() ) {
+ $types[] = $type;
+ }
+ }
+ }
+ }
+
+ return $types;
+ }
+
+ /**
+ * Get the size of the queus for a list of job types
+ *
+ * @return array Map of (job type => size)
+ */
+ public function getQueueSizes() {
+ $sizeMap = array();
+ foreach ( $this->getCoalescedQueues() as $info ) {
+ $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+ if ( is_array( $sizes ) ) { // batching features supported
+ $sizeMap = $sizeMap + $sizes;
+ } else { // we have to go through the queues in the bucket one-by-one
+ foreach ( $info['types'] as $type ) {
+ $sizeMap[$type] = $this->get( $type )->getSize();
+ }
+ }
+ }
+
+ return $sizeMap;
+ }
+
+ /**
+ * @return array
+ */
+ protected function getCoalescedQueues() {
+ global $wgJobTypeConf;
+
+ if ( $this->coalescedQueues === null ) {
+ $this->coalescedQueues = array();
+ foreach ( $wgJobTypeConf as $type => $conf ) {
+ $queue = JobQueue::factory(
+ array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
+ $loc = $queue->getCoalesceLocationInternal();
+ if ( !isset( $this->coalescedQueues[$loc] ) ) {
+ $this->coalescedQueues[$loc]['queue'] = $queue;
+ $this->coalescedQueues[$loc]['types'] = array();
+ }
+ if ( $type === 'default' ) {
+ $this->coalescedQueues[$loc]['types'] = array_merge(
+ $this->coalescedQueues[$loc]['types'],
+ array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
+ );
+ } else {
+ $this->coalescedQueues[$loc]['types'][] = $type;
+ }
+ }
+ }
+
+ return $this->coalescedQueues;
+ }
+
+ /**
+ * Execute any due periodic queue maintenance tasks for all queues.
+ *
+ * A task is "due" if the time ellapsed since the last run is greater than
+ * the defined run period. Concurrent calls to this function will cause tasks
+ * to be attempted twice, so they may need their own methods of mutual exclusion.
+ *
+ * @return int Number of tasks run
+ */
+ public function executeReadyPeriodicTasks() {
+ global $wgMemc;
+
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
+ $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
+
+ $count = 0;
+ $tasksRun = array(); // (queue => task => UNIX timestamp)
+ foreach ( $this->getQueueTypes() as $type ) {
+ $queue = $this->get( $type );
+ foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
+ if ( $definition['period'] <= 0 ) {
+ continue; // disabled
+ } elseif ( !isset( $lastRuns[$type][$task] )
+ || $lastRuns[$type][$task] < ( time() - $definition['period'] )
+ ) {
+ try {
+ if ( call_user_func( $definition['callback'] ) !== null ) {
+ $tasksRun[$type][$task] = time();
+ ++$count;
+ }
+ } catch ( JobQueueError $e ) {
+ MWExceptionHandler::logException( $e );
+ }
+ }
+ }
+ // The tasks may have recycled jobs or release delayed jobs into the queue
+ if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
+ JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
+ }
+ }
+
+ if ( $count === 0 ) {
+ return $count; // nothing to update
+ }
+
+ $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
+ if ( is_array( $lastRuns ) ) {
+ foreach ( $tasksRun as $type => $tasks ) {
+ foreach ( $tasks as $task => $timestamp ) {
+ if ( !isset( $lastRuns[$type][$task] )
+ || $timestamp > $lastRuns[$type][$task]
+ ) {
+ $lastRuns[$type][$task] = $timestamp;
+ }
+ }
+ }
+ } else {
+ $lastRuns = $tasksRun;
+ }
+
+ return $lastRuns;
+ } );
+
+ return $count;
+ }
+
+ /**
+ * @param string $name
+ * @return mixed
+ */
+ private function getCachedConfigVar( $name ) {
+ global $wgConf, $wgMemc;
+
+ if ( $this->wiki === wfWikiID() ) {
+ return $GLOBALS[$name]; // common case
+ } else {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
+ $value = $wgMemc->get( $key ); // ('v' => ...) or false
+ if ( is_array( $value ) ) {
+ return $value['v'];
+ } else {
+ $value = $wgConf->getConfig( $this->wiki, $name );
+ $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
+
+ return $value;
+ }
+ }
+ }
+}
diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php
new file mode 100644
index 00000000..3519eac8
--- /dev/null
+++ b/includes/jobqueue/JobQueueRedis.php
@@ -0,0 +1,865 @@
+<?php
+/**
+ * Redis-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle job queues stored in Redis
+ *
+ * This is faster, less resource intensive, queue that JobQueueDB.
+ * All data for a queue using this class is placed into one redis server.
+ *
+ * There are eight main redis keys used to track jobs:
+ * - l-unclaimed : A list of job IDs used for ready unclaimed jobs
+ * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries
+ * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
+ * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
+ * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
+ * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
+ * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries
+ * - h-data : A hash of (job ID => serialized blobs) for job storage
+ * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
+ * If an ID appears in any of those lists, it should have a h-data entry for its ID.
+ * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
+ * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
+ * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
+ * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
+ *
+ * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
+ * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
+ * All the keys are prefixed with the relevant wiki ID information.
+ *
+ * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations.
+ * Additionally, it should be noted that redis has different persistence modes, such
+ * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be
+ * made on the servers based on what queues are using it and what tolerance they have.
+ *
+ * @ingroup JobQueue
+ * @ingroup Redis
+ * @since 1.22
+ */
+class JobQueueRedis extends JobQueue {
+ /** @var RedisConnectionPool */
+ protected $redisPool;
+
+ /** @var string Server address */
+ protected $server;
+ /** @var string Compression method to use */
+ protected $compression;
+ /** @var bool */
+ protected $daemonized;
+
+ const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
+
+ /** @var string Key to prefix the queue keys with (used for testing) */
+ protected $key;
+
+ /**
+ * @param array $params Possible keys:
+ * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+ * Note that the serializer option is ignored as "none" is always used.
+ * - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
+ * If a hostname is specified but no port, the standard port number
+ * 6379 will be used. Required.
+ * - compression : The type of compression to use; one of (none,gzip).
+ * - daemonized : Set to true if the redisJobRunnerService runs in the background.
+ * This will disable job recycling/undelaying from the MediaWiki side
+ * to avoid redundance and out-of-sync configuration.
+ */
+ public function __construct( array $params ) {
+ parent::__construct( $params );
+ $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
+ $this->server = $params['redisServer'];
+ $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
+ $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
+ $this->daemonized = !empty( $params['daemonized'] );
+ }
+
+ protected function supportedOrders() {
+ return array( 'timestamp', 'fifo' );
+ }
+
+ protected function optimalOrder() {
+ return 'fifo';
+ }
+
+ protected function supportsDelayedJobs() {
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doIsEmpty()
+ * @return bool
+ * @throws MWException
+ */
+ protected function doIsEmpty() {
+ return $this->doGetSize() == 0;
+ }
+
+ /**
+ * @see JobQueue::doGetSize()
+ * @return int
+ * @throws MWException
+ */
+ protected function doGetSize() {
+ $conn = $this->getConnection();
+ try {
+ return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doGetAcquiredCount()
+ * @return int
+ * @throws JobQueueError
+ */
+ protected function doGetAcquiredCount() {
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+ $conn = $this->getConnection();
+ try {
+ $conn->multi( Redis::PIPELINE );
+ $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
+ $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
+
+ return array_sum( $conn->exec() );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doGetDelayedCount()
+ * @return int
+ * @throws JobQueueError
+ */
+ protected function doGetDelayedCount() {
+ if ( !$this->checkDelay ) {
+ return 0; // no delayed jobs
+ }
+ $conn = $this->getConnection();
+ try {
+ return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doGetAbandonedCount()
+ * @return int
+ * @throws JobQueueError
+ */
+ protected function doGetAbandonedCount() {
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+ $conn = $this->getConnection();
+ try {
+ return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doBatchPush()
+ * @param array $jobs
+ * @param int $flags
+ * @return void
+ * @throws JobQueueError
+ */
+ protected function doBatchPush( array $jobs, $flags ) {
+ // Convert the jobs into field maps (de-duplicated against each other)
+ $items = array(); // (job ID => job fields map)
+ foreach ( $jobs as $job ) {
+ $item = $this->getNewJobFields( $job );
+ if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
+ $items[$item['sha1']] = $item;
+ } else {
+ $items[$item['uuid']] = $item;
+ }
+ }
+
+ if ( !count( $items ) ) {
+ return; // nothing to do
+ }
+
+ $conn = $this->getConnection();
+ try {
+ // Actually push the non-duplicate jobs into the queue...
+ if ( $flags & self::QOS_ATOMIC ) {
+ $batches = array( $items ); // all or nothing
+ } else {
+ $batches = array_chunk( $items, 500 ); // avoid tying up the server
+ }
+ $failed = 0;
+ $pushed = 0;
+ foreach ( $batches as $itemBatch ) {
+ $added = $this->pushBlobs( $conn, $itemBatch );
+ if ( is_int( $added ) ) {
+ $pushed += $added;
+ } else {
+ $failed += count( $itemBatch );
+ }
+ }
+ if ( $failed > 0 ) {
+ wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
+
+ throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
+ }
+ JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
+ JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+ count( $items ) - $failed - $pushed, $this->wiki );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param array $items List of results from JobQueueRedis::getNewJobFields()
+ * @return int Number of jobs inserted (duplicates are ignored)
+ * @throws RedisException
+ */
+ protected function pushBlobs( RedisConnRef $conn, array $items ) {
+ $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
+ foreach ( $items as $item ) {
+ $args[] = (string)$item['uuid'];
+ $args[] = (string)$item['sha1'];
+ $args[] = (string)$item['rtimestamp'];
+ $args[] = (string)$this->serialize( $item );
+ }
+ static $script =
+<<<LUA
+ local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
+ if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
+ local pushed = 0
+ for i = 1,#ARGV,4 do
+ local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
+ if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
+ if 1*rtimestamp > 0 then
+ -- Insert into delayed queue (release time as score)
+ redis.call('zAdd',kDelayed,rtimestamp,id)
+ else
+ -- Insert into unclaimed queue
+ redis.call('lPush',kUnclaimed,id)
+ end
+ if sha1 ~= '' then
+ redis.call('hSet',kSha1ById,id,sha1)
+ redis.call('hSet',kIdBySha1,sha1,id)
+ end
+ redis.call('hSet',kData,id,blob)
+ pushed = pushed + 1
+ end
+ end
+ return pushed
+LUA;
+ return $conn->luaEval( $script,
+ array_merge(
+ array(
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+ $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+ $this->getQueueKey( 'z-delayed' ), # KEYS[4]
+ $this->getQueueKey( 'h-data' ), # KEYS[5]
+ ),
+ $args
+ ),
+ 5 # number of first argument(s) that are keys
+ );
+ }
+
+ /**
+ * @see JobQueue::doPop()
+ * @return Job|bool
+ * @throws JobQueueError
+ */
+ protected function doPop() {
+ $job = false;
+
+ // Push ready delayed jobs into the queue every 10 jobs to spread the load.
+ // This is also done as a periodic task, but we don't want too much done at once.
+ if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
+ $this->recyclePruneAndUndelayJobs();
+ }
+
+ $conn = $this->getConnection();
+ try {
+ do {
+ if ( $this->claimTTL > 0 ) {
+ // Keep the claimed job list down for high-traffic queues
+ if ( mt_rand( 0, 99 ) == 0 ) {
+ $this->recyclePruneAndUndelayJobs();
+ }
+ $blob = $this->popAndAcquireBlob( $conn );
+ } else {
+ $blob = $this->popAndDeleteBlob( $conn );
+ }
+ if ( !is_string( $blob ) ) {
+ break; // no jobs; nothing to do
+ }
+
+ JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ $item = $this->unserialize( $blob );
+ if ( $item === false ) {
+ wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
+ continue;
+ }
+
+ // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
+ $job = $this->getJobFromFields( $item ); // may be false
+ } while ( !$job ); // job may be false if invalid
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $job;
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @return array Serialized string or false
+ * @throws RedisException
+ */
+ protected function popAndDeleteBlob( RedisConnRef $conn ) {
+ static $script =
+<<<LUA
+ local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
+ -- Pop an item off the queue
+ local id = redis.call('rpop',kUnclaimed)
+ if not id then return false end
+ -- Get the job data and remove it
+ local item = redis.call('hGet',kData,id)
+ redis.call('hDel',kData,id)
+ -- Allow new duplicates of this job
+ local sha1 = redis.call('hGet',kSha1ById,id)
+ if sha1 then redis.call('hDel',kIdBySha1,sha1) end
+ redis.call('hDel',kSha1ById,id)
+ -- Return the job data
+ return item
+LUA;
+ return $conn->luaEval( $script,
+ array(
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+ $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+ $this->getQueueKey( 'h-data' ), # KEYS[4]
+ ),
+ 4 # number of first argument(s) that are keys
+ );
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @return array Serialized string or false
+ * @throws RedisException
+ */
+ protected function popAndAcquireBlob( RedisConnRef $conn ) {
+ static $script =
+<<<LUA
+ local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
+ -- Pop an item off the queue
+ local id = redis.call('rPop',kUnclaimed)
+ if not id then return false end
+ -- Allow new duplicates of this job
+ local sha1 = redis.call('hGet',kSha1ById,id)
+ if sha1 then redis.call('hDel',kIdBySha1,sha1) end
+ redis.call('hDel',kSha1ById,id)
+ -- Mark the jobs as claimed and return it
+ redis.call('zAdd',kClaimed,ARGV[1],id)
+ redis.call('hIncrBy',kAttempts,id,1)
+ return redis.call('hGet',kData,id)
+LUA;
+ return $conn->luaEval( $script,
+ array(
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+ $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+ $this->getQueueKey( 'z-claimed' ), # KEYS[4]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[5]
+ $this->getQueueKey( 'h-data' ), # KEYS[6]
+ time(), # ARGV[1] (injected to be replication-safe)
+ ),
+ 6 # number of first argument(s) that are keys
+ );
+ }
+
+ /**
+ * @see JobQueue::doAck()
+ * @param Job $job
+ * @return Job|bool
+ * @throws MWException|JobQueueError
+ */
+ protected function doAck( Job $job ) {
+ if ( !isset( $job->metadata['uuid'] ) ) {
+ throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
+ }
+ if ( $this->claimTTL > 0 ) {
+ $conn = $this->getConnection();
+ try {
+ static $script =
+<<<LUA
+ local kClaimed, kAttempts, kData = unpack(KEYS)
+ -- Unmark the job as claimed
+ redis.call('zRem',kClaimed,ARGV[1])
+ redis.call('hDel',kAttempts,ARGV[1])
+ -- Delete the job data itself
+ return redis.call('hDel',kData,ARGV[1])
+LUA;
+ $res = $conn->luaEval( $script,
+ array(
+ $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+ $this->getQueueKey( 'h-data' ), # KEYS[3]
+ $job->metadata['uuid'] # ARGV[1]
+ ),
+ 3 # number of first argument(s) that are keys
+ );
+
+ if ( !$res ) {
+ wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+
+ return false;
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doDeduplicateRootJob()
+ * @param Job $job
+ * @return bool
+ * @throws MWException|JobQueueError
+ */
+ protected function doDeduplicateRootJob( Job $job ) {
+ if ( !$job->hasRootJobParams() ) {
+ throw new MWException( "Cannot register root job; missing parameters." );
+ }
+ $params = $job->getRootJobParams();
+
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+
+ $conn = $this->getConnection();
+ try {
+ $timestamp = $conn->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location...
+ return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doIsRootJobOldDuplicate()
+ * @param Job $job
+ * @return bool
+ * @throws JobQueueError
+ */
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ if ( !$job->hasRootJobParams() ) {
+ return false; // job has no de-deplication info
+ }
+ $params = $job->getRootJobParams();
+
+ $conn = $this->getConnection();
+ try {
+ // Get the last time this root job was enqueued
+ $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ // Check if a new root job was started at the location after this one's...
+ return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+ }
+
+ /**
+ * @see JobQueue::doDelete()
+ * @return bool
+ * @throws JobQueueError
+ */
+ protected function doDelete() {
+ static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
+ 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );
+
+ $conn = $this->getConnection();
+ try {
+ $keys = array();
+ foreach ( $props as $prop ) {
+ $keys[] = $this->getQueueKey( $prop );
+ }
+
+ return ( $conn->delete( $keys ) !== false );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ */
+ public function getAllQueuedJobs() {
+ $conn = $this->getConnection();
+ try {
+ $that = $this;
+
+ return new MappedIterator(
+ $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
+ function ( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn );
+ },
+ array( 'accept' => function ( $job ) {
+ return is_object( $job );
+ } )
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ */
+ public function getAllDelayedJobs() {
+ $conn = $this->getConnection();
+ try {
+ $that = $this;
+
+ return new MappedIterator( // delayed jobs
+ $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
+ function ( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn );
+ },
+ array( 'accept' => function ( $job ) {
+ return is_object( $job );
+ } )
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ public function getCoalesceLocationInternal() {
+ return "RedisServer:" . $this->server;
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $sizes = array(); // (type => size)
+ $types = array_values( $types ); // reindex
+ $conn = $this->getConnection();
+ try {
+ $conn->multi( Redis::PIPELINE );
+ foreach ( $types as $type ) {
+ $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
+ }
+ $res = $conn->exec();
+ if ( is_array( $res ) ) {
+ foreach ( $res as $i => $size ) {
+ $sizes[$types[$i]] = $size;
+ }
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $sizes;
+ }
+
+ /**
+ * This function should not be called outside JobQueueRedis
+ *
+ * @param string $uid
+ * @param RedisConnRef $conn
+ * @return Job|bool Returns false if the job does not exist
+ * @throws MWException|JobQueueError
+ */
+ public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
+ try {
+ $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
+ if ( $data === false ) {
+ return false; // not found
+ }
+ $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+ if ( !is_array( $item ) ) { // this shouldn't happen
+ throw new MWException( "Could not find job with ID '$uid'." );
+ }
+ $title = Title::makeTitle( $item['namespace'], $item['title'] );
+ $job = Job::factory( $item['type'], $title, $item['params'] );
+ $job->metadata['uuid'] = $item['uuid'];
+
+ return $job;
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * Recycle or destroy any jobs that have been claimed for too long
+ * and release any ready delayed jobs into the queue
+ *
+ * @return int Number of jobs recycled/deleted/undelayed
+ * @throws MWException|JobQueueError
+ */
+ public function recyclePruneAndUndelayJobs() {
+ $count = 0;
+ // For each job item that can be retried, we need to add it back to the
+ // main queue and remove it from the list of currenty claimed job items.
+ // For those that cannot, they are marked as dead and kept around for
+ // investigation and manual job restoration but are eventually deleted.
+ $conn = $this->getConnection();
+ try {
+ $now = time();
+ static $script =
+<<<LUA
+ local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
+ local released,abandoned,pruned,undelayed = 0,0,0,0
+ -- Get all non-dead jobs that have an expired claim on them.
+ -- The score for each item is the last claim timestamp (UNIX).
+ local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
+ for k,id in ipairs(staleClaims) do
+ local timestamp = redis.call('zScore',kClaimed,id)
+ local attempts = redis.call('hGet',kAttempts,id)
+ if attempts < ARGV[3] then
+ -- Claim expired and retries left: re-enqueue the job
+ redis.call('lPush',kUnclaimed,id)
+ redis.call('hIncrBy',kAttempts,id,1)
+ released = released + 1
+ else
+ -- Claim expired and no retries left: mark the job as dead
+ redis.call('zAdd',kAbandoned,timestamp,id)
+ abandoned = abandoned + 1
+ end
+ redis.call('zRem',kClaimed,id)
+ end
+ -- Get all of the dead jobs that have been marked as dead for too long.
+ -- The score for each item is the last claim timestamp (UNIX).
+ local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
+ for k,id in ipairs(deadClaims) do
+ -- Stale and out of retries: remove any traces of the job
+ redis.call('zRem',kAbandoned,id)
+ redis.call('hDel',kAttempts,id)
+ redis.call('hDel',kData,id)
+ pruned = pruned + 1
+ end
+ -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
+ local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
+ -- Migrate the jobs from the "delayed" set to the "unclaimed" list
+ for k,id in ipairs(ids) do
+ redis.call('lPush',kUnclaimed,id)
+ redis.call('zRem',kDelayed,id)
+ end
+ undelayed = #ids
+ return {released,abandoned,pruned,undelayed}
+LUA;
+ $res = $conn->luaEval( $script,
+ array(
+ $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
+ $this->getQueueKey( 'h-data' ), # KEYS[4]
+ $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
+ $this->getQueueKey( 'z-delayed' ), # KEYS[6]
+ $now - $this->claimTTL, # ARGV[1]
+ $now - self::MAX_AGE_PRUNE, # ARGV[2]
+ $this->maxTries, # ARGV[3]
+ $now # ARGV[4]
+ ),
+ 6 # number of first argument(s) that are keys
+ );
+ if ( $res ) {
+ list( $released, $abandoned, $pruned, $undelayed ) = $res;
+ $count += $released + $pruned + $undelayed;
+ JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
+ JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
+ JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki );
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $count;
+ }
+
+ /**
+ * @return array
+ */
+ protected function doGetPeriodicTasks() {
+ if ( $this->daemonized ) {
+ return array(); // managed in the runner loop
+ }
+ $periods = array( 3600 ); // standard cleanup (useful on config change)
+ if ( $this->claimTTL > 0 ) {
+ $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
+ }
+ if ( $this->checkDelay ) {
+ $periods[] = 300; // 5 minutes
+ }
+ $period = min( $periods );
+ $period = max( $period, 30 ); // sanity
+
+ return array(
+ 'recyclePruneAndUndelayJobs' => array(
+ 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
+ 'period' => $period,
+ )
+ );
+ }
+
+ /**
+ * @param IJobSpecification $job
+ * @return array
+ */
+ protected function getNewJobFields( IJobSpecification $job ) {
+ return array(
+ // Fields that describe the nature of the job
+ 'type' => $job->getType(),
+ 'namespace' => $job->getTitle()->getNamespace(),
+ 'title' => $job->getTitle()->getDBkey(),
+ 'params' => $job->getParams(),
+ // Some jobs cannot run until a "release timestamp"
+ 'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
+ // Additional job metadata
+ 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
+ 'sha1' => $job->ignoreDuplicates()
+ ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
+ : '',
+ 'timestamp' => time() // UNIX timestamp
+ );
+ }
+
+ /**
+ * @param array $fields
+ * @return Job|bool
+ */
+ protected function getJobFromFields( array $fields ) {
+ $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
+ if ( $title ) {
+ $job = Job::factory( $fields['type'], $title, $fields['params'] );
+ $job->metadata['uuid'] = $fields['uuid'];
+
+ return $job;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param array $fields
+ * @return string Serialized and possibly compressed version of $fields
+ */
+ protected function serialize( array $fields ) {
+ $blob = serialize( $fields );
+ if ( $this->compression === 'gzip'
+ && strlen( $blob ) >= 1024
+ && function_exists( 'gzdeflate' )
+ ) {
+ $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
+ $blobz = serialize( $object );
+
+ return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
+ } else {
+ return $blob;
+ }
+ }
+
+ /**
+ * @param string $blob
+ * @return array|bool Unserialized version of $blob or false
+ */
+ protected function unserialize( $blob ) {
+ $fields = unserialize( $blob );
+ if ( is_object( $fields ) ) {
+ if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) {
+ $fields = unserialize( gzinflate( $fields->blob ) );
+ } else {
+ $fields = false;
+ }
+ }
+
+ return is_array( $fields ) ? $fields : false;
+ }
+
+ /**
+ * Get a connection to the server that handles all sub-queues for this queue
+ *
+ * @return RedisConnRef
+ * @throws JobQueueConnectionError
+ */
+ protected function getConnection() {
+ $conn = $this->redisPool->getConnection( $this->server );
+ if ( !$conn ) {
+ throw new JobQueueConnectionError( "Unable to connect to redis server." );
+ }
+
+ return $conn;
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param RedisException $e
+ * @throws JobQueueError
+ */
+ protected function throwRedisException( RedisConnRef $conn, $e ) {
+ $this->redisPool->handleError( $conn, $e );
+ throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
+ }
+
+ /**
+ * @param string $prop
+ * @param string|null $type
+ * @return string
+ */
+ private function getQueueKey( $prop, $type = null ) {
+ $type = is_string( $type ) ? $type : $this->type;
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ if ( strlen( $this->key ) ) { // namespaced queue (for testing)
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
+ } else {
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
+ }
+ }
+
+ /**
+ * @param string $key
+ * @return void
+ */
+ public function setTestingPrefix( $key ) {
+ $this->key = $key;
+ }
+}
diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php
new file mode 100644
index 00000000..8cccedaf
--- /dev/null
+++ b/includes/jobqueue/JobRunner.php
@@ -0,0 +1,350 @@
+<?php
+/**
+ * Job queue runner utility methods
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job queue runner utility methods
+ *
+ * @ingroup JobQueue
+ * @since 1.24
+ */
+class JobRunner {
+ /** @var callable|null Debug output handler */
+ protected $debug;
+
+ /**
+ * @param callable $debug Optional debug output handler
+ */
+ public function setDebugHandler( $debug ) {
+ $this->debug = $debug;
+ }
+
+ /**
+ * Run jobs of the specified number/type for the specified time
+ *
+ * The response map has a 'job' field that lists status of each job, including:
+ * - type : the job type
+ * - status : ok/failed
+ * - error : any error message string
+ * - time : the job run time in ms
+ * The response map also has:
+ * - backoffs : the (job type => seconds) map of backoff times
+ * - elapsed : the total time spent running tasks in ms
+ * - reached : the reason the script finished, one of (none-ready, job-limit, time-limit)
+ *
+ * This method outputs status information only if a debug handler was set.
+ * Any exceptions are caught and logged, but are not reported as output.
+ *
+ * @param array $options Map of parameters:
+ * - type : the job type (or false for the default types)
+ * - maxJobs : maximum number of jobs to run
+ * - maxTime : maximum time in seconds before stopping
+ * - throttle : whether to respect job backoff configuration
+ * @return array Summary response that can easily be JSON serialized
+ */
+ public function run( array $options ) {
+ $response = array( 'jobs' => array(), 'reached' => 'none-ready' );
+
+ $type = isset( $options['type'] ) ? $options['type'] : false;
+ $maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false;
+ $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false;
+ $noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
+
+ $group = JobQueueGroup::singleton();
+ // Handle any required periodic queue maintenance
+ $count = $group->executeReadyPeriodicTasks();
+ if ( $count > 0 ) {
+ $this->runJobsLog( "Executed $count periodic queue task(s)." );
+ }
+
+ // Flush any pending DB writes for sanity
+ wfGetLBFactory()->commitMasterChanges();
+
+ // Some jobs types should not run until a certain timestamp
+ $backoffs = array(); // map of (type => UNIX expiry)
+ $backoffDeltas = array(); // map of (type => seconds)
+ $wait = 'wait'; // block to read backoffs the first time
+
+ $jobsRun = 0;
+ $timeMsTotal = 0;
+ $flags = JobQueueGroup::USE_CACHE;
+ $startTime = microtime( true ); // time since jobs started running
+ $lastTime = microtime( true ); // time since last slave check
+ do {
+ // Sync the persistent backoffs with concurrent runners
+ $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
+ $blacklist = $noThrottle ? array() : array_keys( $backoffs );
+ $wait = 'nowait'; // less important now
+
+ if ( $type === false ) {
+ $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist );
+ } elseif ( in_array( $type, $blacklist ) ) {
+ $job = false; // requested queue in backoff state
+ } else {
+ $job = $group->pop( $type ); // job from a single queue
+ }
+
+ if ( $job ) { // found a job
+ $jType = $job->getType();
+
+ // Back off of certain jobs for a while (for throttling and for errors)
+ $ttw = $this->getBackoffTimeToWait( $job );
+ if ( $ttw > 0 ) {
+ // Always add the delta for other runners in case the time running the
+ // job negated the backoff for each individually but not collectively.
+ $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+ ? $backoffDeltas[$jType] + $ttw
+ : $ttw;
+ $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
+ }
+
+ $this->runJobsLog( $job->toString() . " STARTING" );
+
+ // Run the job...
+ wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
+ $jobStartTime = microtime( true );
+ try {
+ ++$jobsRun;
+ $status = $job->run();
+ $error = $job->getLastError();
+ wfGetLBFactory()->commitMasterChanges();
+ } catch ( MWException $e ) {
+ MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+ $status = false;
+ $error = get_class( $e ) . ': ' . $e->getMessage();
+ MWExceptionHandler::logException( $e );
+ }
+ $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
+ wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
+ $timeMsTotal += $timeMs;
+
+ // Mark the job as done on success or when the job cannot be retried
+ if ( $status !== false || !$job->allowRetries() ) {
+ $group->ack( $job ); // done
+ }
+
+ // Back off of certain jobs for a while (for throttling and for errors)
+ if ( $status === false && mt_rand( 0, 49 ) == 0 ) {
+ $ttw = max( $ttw, 30 ); // too many errors
+ $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+ ? $backoffDeltas[$jType] + $ttw
+ : $ttw;
+ }
+
+ if ( $status === false ) {
+ $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
+ } else {
+ $this->runJobsLog( $job->toString() . " t=$timeMs good" );
+ }
+
+ $response['jobs'][] = array(
+ 'type' => $jType,
+ 'status' => ( $status === false ) ? 'failed' : 'ok',
+ 'error' => $error,
+ 'time' => $timeMs
+ );
+
+ // Break out if we hit the job count or wall time limits...
+ if ( $maxJobs && $jobsRun >= $maxJobs ) {
+ $response['reached'] = 'job-limit';
+ break;
+ } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
+ $response['reached'] = 'time-limit';
+ break;
+ }
+
+ // Don't let any of the main DB slaves get backed up
+ $timePassed = microtime( true ) - $lastTime;
+ if ( $timePassed >= 5 || $timePassed < 0 ) {
+ wfWaitForSlaves( $lastTime );
+ $lastTime = microtime( true );
+ }
+ // Don't let any queue slaves/backups fall behind
+ if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
+ $group->waitForBackups();
+ }
+
+ // Bail if near-OOM instead of in a job
+ $this->assertMemoryOK();
+ }
+ } while ( $job ); // stop when there are no jobs
+
+ // Sync the persistent backoffs for the next runJobs.php pass
+ if ( $backoffDeltas ) {
+ $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
+ }
+
+ $response['backoffs'] = $backoffs;
+ $response['elapsed'] = $timeMsTotal;
+
+ return $response;
+ }
+
+ /**
+ * @param Job $job
+ * @return int Seconds for this runner to avoid doing more jobs of this type
+ * @see $wgJobBackoffThrottling
+ */
+ private function getBackoffTimeToWait( Job $job ) {
+ global $wgJobBackoffThrottling;
+
+ if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
+ $job instanceof DuplicateJob // no work was done
+ ) {
+ return 0; // not throttled
+ }
+
+ $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
+ if ( $itemsPerSecond <= 0 ) {
+ return 0; // not throttled
+ }
+
+ $seconds = 0;
+ if ( $job->workItemCount() > 0 ) {
+ $exactSeconds = $job->workItemCount() / $itemsPerSecond;
+ // use randomized rounding
+ $seconds = floor( $exactSeconds );
+ $remainder = $exactSeconds - $seconds;
+ $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
+ }
+
+ return (int)$seconds;
+ }
+
+ /**
+ * Get the previous backoff expiries from persistent storage
+ * On I/O or lock acquisition failure this returns the original $backoffs.
+ *
+ * @param array $backoffs Map of (job type => UNIX timestamp)
+ * @param string $mode Lock wait mode - "wait" or "nowait"
+ * @return array Map of (job type => backoff expiry timestamp)
+ */
+ private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
+ $section = new ProfileSection( __METHOD__ );
+
+ $file = wfTempDir() . '/mw-runJobs-backoffs.json';
+ if ( is_file( $file ) ) {
+ $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
+ $handle = fopen( $file, 'rb' );
+ if ( !flock( $handle, LOCK_SH | $noblock ) ) {
+ fclose( $handle );
+ return $backoffs; // don't wait on lock
+ }
+ $content = stream_get_contents( $handle );
+ flock( $handle, LOCK_UN );
+ fclose( $handle );
+ $ctime = microtime( true );
+ $cBackoffs = json_decode( $content, true ) ?: array();
+ foreach ( $cBackoffs as $type => $timestamp ) {
+ if ( $timestamp < $ctime ) {
+ unset( $cBackoffs[$type] );
+ }
+ }
+ } else {
+ $cBackoffs = array();
+ }
+
+ return $cBackoffs;
+ }
+
+ /**
+ * Merge the current backoff expiries from persistent storage
+ *
+ * The $deltas map is set to an empty array on success.
+ * On I/O or lock acquisition failure this returns the original $backoffs.
+ *
+ * @param array $backoffs Map of (job type => UNIX timestamp)
+ * @param array $deltas Map of (job type => seconds)
+ * @param string $mode Lock wait mode - "wait" or "nowait"
+ * @return array The new backoffs account for $backoffs and the latest file data
+ */
+ private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
+ $section = new ProfileSection( __METHOD__ );
+
+ if ( !$deltas ) {
+ return $this->loadBackoffs( $backoffs, $mode );
+ }
+
+ $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
+ $file = wfTempDir() . '/mw-runJobs-backoffs.json';
+ $handle = fopen( $file, 'wb+' );
+ if ( !flock( $handle, LOCK_EX | $noblock ) ) {
+ fclose( $handle );
+ return $backoffs; // don't wait on lock
+ }
+ $ctime = microtime( true );
+ $content = stream_get_contents( $handle );
+ $cBackoffs = json_decode( $content, true ) ?: array();
+ foreach ( $deltas as $type => $seconds ) {
+ $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
+ ? $cBackoffs[$type] + $seconds
+ : $ctime + $seconds;
+ }
+ foreach ( $cBackoffs as $type => $timestamp ) {
+ if ( $timestamp < $ctime ) {
+ unset( $cBackoffs[$type] );
+ }
+ }
+ ftruncate( $handle, 0 );
+ fwrite( $handle, json_encode( $cBackoffs ) );
+ flock( $handle, LOCK_UN );
+ fclose( $handle );
+
+ $deltas = array();
+
+ return $cBackoffs;
+ }
+
+ /**
+ * Make sure that this script is not too close to the memory usage limit.
+ * It is better to die in between jobs than OOM right in the middle of one.
+ * @throws MWException
+ */
+ private function assertMemoryOK() {
+ static $maxBytes = null;
+ if ( $maxBytes === null ) {
+ $m = array();
+ if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
+ list( , $num, $unit ) = $m;
+ $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 );
+ $maxBytes = $num * $conv[strtolower( $unit )];
+ } else {
+ $maxBytes = 0;
+ }
+ }
+ $usedBytes = memory_get_usage();
+ if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
+ throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." );
+ }
+ }
+
+ /**
+ * Log the job message
+ * @param string $msg The message to log
+ */
+ private function runJobsLog( $msg ) {
+ if ( $this->debug ) {
+ call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
+ }
+ wfDebugLog( 'runJobs', $msg );
+ }
+}
diff --git a/includes/jobqueue/JobSpecification.php b/includes/jobqueue/JobSpecification.php
new file mode 100644
index 00000000..9fa7747f
--- /dev/null
+++ b/includes/jobqueue/JobSpecification.php
@@ -0,0 +1,189 @@
+<?php
+/**
+ * Job queue task description base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job queue task description interface
+ *
+ * @ingroup JobQueue
+ * @since 1.23
+ */
+interface IJobSpecification {
+ /**
+ * @return string Job type
+ */
+ public function getType();
+
+ /**
+ * @return array
+ */
+ public function getParams();
+
+ /**
+ * @return int|null UNIX timestamp to delay running this job until, otherwise null
+ */
+ public function getReleaseTimestamp();
+
+ /**
+ * @return bool Whether only one of each identical set of jobs should be run
+ */
+ public function ignoreDuplicates();
+
+ /**
+ * Subclasses may need to override this to make duplication detection work.
+ * The resulting map conveys everything that makes the job unique. This is
+ * only checked if ignoreDuplicates() returns true, meaning that duplicate
+ * jobs are supposed to be ignored.
+ *
+ * @return array Map of key/values
+ */
+ public function getDeduplicationInfo();
+
+ /**
+ * @return Title Descriptive title (this can simply be informative)
+ */
+ public function getTitle();
+}
+
+/**
+ * Job queue task description base code
+ *
+ * Example usage:
+ * <code>
+ * $job = new JobSpecification(
+ * 'null',
+ * array( 'lives' => 1, 'usleep' => 100, 'pi' => 3.141569 ),
+ * array( 'removeDuplicates' => 1 ),
+ * Title::makeTitle( NS_SPECIAL, 'nullity' )
+ * );
+ * JobQueueGroup::singleton()->push( $job )
+ * </code>
+ *
+ * @ingroup JobQueue
+ * @since 1.23
+ */
+class JobSpecification implements IJobSpecification {
+ /** @var string */
+ protected $type;
+
+ /** @var array Array of job parameters or false if none */
+ protected $params;
+
+ /** @var Title */
+ protected $title;
+
+ /** @var bool Expensive jobs may set this to true */
+ protected $ignoreDuplicates;
+
+ /**
+ * @param string $type
+ * @param array $params Map of key/values
+ * @param array $opts Map of key/values
+ * @param Title $title Optional descriptive title
+ */
+ public function __construct(
+ $type, array $params, array $opts = array(), Title $title = null
+ ) {
+ $this->validateParams( $params );
+
+ $this->type = $type;
+ $this->params = $params;
+ $this->title = $title ?: Title::newMainPage();
+ $this->ignoreDuplicates = !empty( $opts['removeDuplicates'] );
+ }
+
+ /**
+ * @param array $params
+ */
+ protected function validateParams( array $params ) {
+ foreach ( $params as $p => $v ) {
+ if ( is_array( $v ) ) {
+ $this->validateParams( $v );
+ } elseif ( !is_scalar( $v ) && $v !== null ) {
+ throw new UnexpectedValueException( "Job parameter $p is not JSON serializable." );
+ }
+ }
+ }
+
+ /**
+ * @return string
+ */
+ public function getType() {
+ return $this->type;
+ }
+
+ /**
+ * @return Title
+ */
+ public function getTitle() {
+ return $this->title;
+ }
+
+ /**
+ * @return array
+ */
+ public function getParams() {
+ return $this->params;
+ }
+
+ /**
+ * @return int|null UNIX timestamp to delay running this job until, otherwise null
+ */
+ public function getReleaseTimestamp() {
+ return isset( $this->params['jobReleaseTimestamp'] )
+ ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
+ : null;
+ }
+
+ /**
+ * @return bool Whether only one of each identical set of jobs should be run
+ */
+ public function ignoreDuplicates() {
+ return $this->ignoreDuplicates;
+ }
+
+ /**
+ * Subclasses may need to override this to make duplication detection work.
+ * The resulting map conveys everything that makes the job unique. This is
+ * only checked if ignoreDuplicates() returns true, meaning that duplicate
+ * jobs are supposed to be ignored.
+ *
+ * @return array Map of key/values
+ */
+ public function getDeduplicationInfo() {
+ $info = array(
+ 'type' => $this->getType(),
+ 'namespace' => $this->getTitle()->getNamespace(),
+ 'title' => $this->getTitle()->getDBkey(),
+ 'params' => $this->getParams()
+ );
+ if ( is_array( $info['params'] ) ) {
+ // Identical jobs with different "root" jobs should count as duplicates
+ unset( $info['params']['rootJobSignature'] );
+ unset( $info['params']['rootJobTimestamp'] );
+ // Likewise for jobs with different delay times
+ unset( $info['params']['jobReleaseTimestamp'] );
+ }
+
+ return $info;
+ }
+}
diff --git a/includes/jobqueue/README b/includes/jobqueue/README
new file mode 100644
index 00000000..c11d5a78
--- /dev/null
+++ b/includes/jobqueue/README
@@ -0,0 +1,81 @@
+/*!
+\ingroup JobQueue
+\page jobqueue_design Job queue design
+
+Notes on the Job queuing system architecture.
+
+\section intro Introduction
+
+The data model consist of the following main components:
+* The Job object represents a particular deferred task that happens in the
+ background. All jobs subclass the Job object and put the main logic in the
+ function called run().
+* The JobQueue object represents a particular queue of jobs of a certain type.
+ For example there may be a queue for email jobs and a queue for squid purge
+ jobs.
+
+\section jobqueue Job queues
+
+Each job type has its own queue and is associated to a storage medium. One
+queue might save its jobs in redis while another one uses would use a database.
+
+Storage medium are defined in a queue class. Before using it, you must
+define in $wgJobTypeConf a mapping of the job type to a queue class.
+
+The factory class JobQueueGroup provides helper functions:
+- getting the queue for a given job
+- route new job insertions to the proper queue
+
+The following queue classes are available:
+* JobQueueDB (stores jobs in the `job` table in a database)
+* JobQueueRedis (stores jobs in a redis server)
+
+All queue classes support some basic operations (though some may be no-ops):
+* enqueueing a batch of jobs
+* dequeueing a single job
+* acknowledging a job is completed
+* checking if the queue is empty
+
+Some queue classes (like JobQueueDB) may dequeue jobs in random order while other
+queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs
+are executed in FIFO order.
+
+Also note that not all queue classes will have the same reliability guarantees.
+In-memory queues may lose data when restarted depending on snapshot and journal
+settings (including journal fsync() frequency). Some queue types may totally remove
+jobs when dequeued while leaving the ack() function as a no-op; if a job is
+dequeued by a job runner, which crashes before completion, the job will be
+lost. Some jobs, like purging squid caches after a template change, may not
+require durable queues, whereas other jobs might be more important.
+
+\section aggregator Job queue aggregator
+
+The aggregators are used by nextJobDB.php, which is a script that will return a
+random ready queue (on any wiki in the farm) that can be used with runJobs.php.
+This can be used in conjunction with any scripts that handle wiki farm job queues.
+Note that $wgLocalDatabases defines what wikis are in the wiki farm.
+
+Since each job type has its own queue, and wiki-farms may have many wikis,
+there might be a large number of queues to keep track of. To avoid wasting
+large amounts of time polling empty queues, aggregators exists to keep track
+of which queues are ready.
+
+The following queue aggregator classes are available:
+* JobQueueAggregatorMemc (uses $wgMemc to track ready queues)
+* JobQueueAggregatorRedis (uses a redis server to track ready queues)
+
+Some aggregators cache data for a few minutes while others may be always up to date.
+This can be an important factor for jobs that need a low pickup time (or latency).
+
+\section jobs Jobs
+
+Callers should also try to make jobs maintain correctness when executed twice.
+This is useful for queues that actually implement ack(), since they may recycle
+dequeued but un-acknowledged jobs back into the queue to be attempted again. If
+a runner dequeues a job, runs it, but then crashes before calling ack(), the
+job may be returned to the queue and run a second time. Jobs like cache purging can
+happen several times without any correctness problems. However, a pathological case
+would be if a bug causes the problem to systematically keep repeating. For example,
+a job may always throw a DB error at the end of run(). This problem is trickier to
+solve and more obnoxious for things like email jobs, for example. For such jobs,
+it might be useful to use a queue that does not retry jobs.
diff --git a/includes/jobqueue/aggregator/JobQueueAggregator.php b/includes/jobqueue/aggregator/JobQueueAggregator.php
new file mode 100644
index 00000000..8600eed9
--- /dev/null
+++ b/includes/jobqueue/aggregator/JobQueueAggregator.php
@@ -0,0 +1,162 @@
+<?php
+/**
+ * Job queue aggregator code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+abstract class JobQueueAggregator {
+ /** @var JobQueueAggregator */
+ protected static $instance = null;
+
+ /**
+ * @param array $params
+ */
+ protected function __construct( array $params ) {
+ }
+
+ /**
+ * @throws MWException
+ * @return JobQueueAggregator
+ */
+ final public static function singleton() {
+ global $wgJobQueueAggregator;
+
+ if ( !isset( self::$instance ) ) {
+ $class = $wgJobQueueAggregator['class'];
+ $obj = new $class( $wgJobQueueAggregator );
+ if ( !( $obj instanceof JobQueueAggregator ) ) {
+ throw new MWException( "Class '$class' is not a JobQueueAggregator class." );
+ }
+ self::$instance = $obj;
+ }
+
+ return self::$instance;
+ }
+
+ /**
+ * Destroy the singleton instance
+ *
+ * @return void
+ */
+ final public static function destroySingleton() {
+ self::$instance = null;
+ }
+
+ /**
+ * Mark a queue as being empty
+ *
+ * @param string $wiki
+ * @param string $type
+ * @return bool Success
+ */
+ final public function notifyQueueEmpty( $wiki, $type ) {
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doNotifyQueueEmpty( $wiki, $type );
+ wfProfileOut( __METHOD__ );
+
+ return $ok;
+ }
+
+ /**
+ * @see JobQueueAggregator::notifyQueueEmpty()
+ */
+ abstract protected function doNotifyQueueEmpty( $wiki, $type );
+
+ /**
+ * Mark a queue as being non-empty
+ *
+ * @param string $wiki
+ * @param string $type
+ * @return bool Success
+ */
+ final public function notifyQueueNonEmpty( $wiki, $type ) {
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doNotifyQueueNonEmpty( $wiki, $type );
+ wfProfileOut( __METHOD__ );
+
+ return $ok;
+ }
+
+ /**
+ * @see JobQueueAggregator::notifyQueueNonEmpty()
+ */
+ abstract protected function doNotifyQueueNonEmpty( $wiki, $type );
+
+ /**
+ * Get the list of all of the queues with jobs
+ *
+ * @return array (job type => (list of wiki IDs))
+ */
+ final public function getAllReadyWikiQueues() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetAllReadyWikiQueues();
+ wfProfileOut( __METHOD__ );
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueueAggregator::getAllReadyWikiQueues()
+ */
+ abstract protected function doGetAllReadyWikiQueues();
+
+ /**
+ * Purge all of the aggregator information
+ *
+ * @return bool Success
+ */
+ final public function purge() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doPurge();
+ wfProfileOut( __METHOD__ );
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueueAggregator::purge()
+ */
+ abstract protected function doPurge();
+
+ /**
+ * Get all databases that have a pending job.
+ * This poll all the queues and is this expensive.
+ *
+ * @return array (job type => (list of wiki IDs))
+ */
+ protected function findPendingWikiQueues() {
+ global $wgLocalDatabases;
+
+ $pendingDBs = array(); // (job type => (db list))
+ foreach ( $wgLocalDatabases as $db ) {
+ foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) {
+ $pendingDBs[$type][] = $db;
+ }
+ }
+
+ return $pendingDBs;
+ }
+}
diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php b/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php
new file mode 100644
index 00000000..ae266ef3
--- /dev/null
+++ b/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php
@@ -0,0 +1,125 @@
+<?php
+/**
+ * Job queue aggregator code that uses BagOStuff.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues using BagOStuff
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueAggregatorMemc extends JobQueueAggregator {
+ /** @var BagOStuff */
+ protected $cache;
+
+ protected $cacheTTL; // integer; seconds
+
+ /**
+ * @param array $params Possible keys:
+ * - objectCache : Name of an object cache registered in $wgObjectCaches.
+ * This defaults to the one specified by $wgMainCacheType.
+ * - cacheTTL : Seconds to cache the aggregate data before regenerating.
+ */
+ protected function __construct( array $params ) {
+ parent::__construct( $params );
+ $this->cache = isset( $params['objectCache'] )
+ ? wfGetCache( $params['objectCache'] )
+ : wfGetMainCache();
+ $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min
+ }
+
+ /**
+ * @see JobQueueAggregator::doNotifyQueueEmpty()
+ */
+ protected function doNotifyQueueEmpty( $wiki, $type ) {
+ $key = $this->getReadyQueueCacheKey();
+ // Delist the queue from the "ready queue" list
+ if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
+ $curInfo = $this->cache->get( $key );
+ if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) {
+ if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) {
+ $curInfo['pendingDBs'][$type] = array_diff(
+ $curInfo['pendingDBs'][$type], array( $wiki ) );
+ $this->cache->set( $key, $curInfo );
+ }
+ }
+ $this->cache->delete( "$key:lock" ); // unlock
+ }
+
+ return true;
+ }
+
+ /**
+ * @see JobQueueAggregator::doNotifyQueueNonEmpty()
+ */
+ protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+ return true; // updated periodically
+ }
+
+ /**
+ * @see JobQueueAggregator::doAllGetReadyWikiQueues()
+ */
+ protected function doGetAllReadyWikiQueues() {
+ $key = $this->getReadyQueueCacheKey();
+ // If the cache entry wasn't present, is stale, or in .1% of cases otherwise,
+ // regenerate the cache. Use any available stale cache if another process is
+ // currently regenerating the pending DB information.
+ $pendingDbInfo = $this->cache->get( $key );
+ if ( !is_array( $pendingDbInfo )
+ || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL
+ || mt_rand( 0, 999 ) == 0
+ ) {
+ if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock
+ $pendingDbInfo = array(
+ 'pendingDBs' => $this->findPendingWikiQueues(),
+ 'timestamp' => time()
+ );
+ for ( $attempts = 1; $attempts <= 25; ++$attempts ) {
+ if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
+ $this->cache->set( $key, $pendingDbInfo );
+ $this->cache->delete( "$key:lock" ); // unlock
+ break;
+ }
+ }
+ $this->cache->delete( "$key:rebuild" ); // unlock
+ }
+ }
+
+ return is_array( $pendingDbInfo )
+ ? $pendingDbInfo['pendingDBs']
+ : array(); // cache is both empty and locked
+ }
+
+ /**
+ * @see JobQueueAggregator::doPurge()
+ */
+ protected function doPurge() {
+ return $this->cache->delete( $this->getReadyQueueCacheKey() );
+ }
+
+ /**
+ * @return string
+ */
+ private function getReadyQueueCacheKey() {
+ return "jobqueue:aggregator:ready-queues:v1"; // global
+ }
+}
diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
new file mode 100644
index 00000000..db9e764c
--- /dev/null
+++ b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
@@ -0,0 +1,218 @@
+<?php
+/**
+ * Job queue aggregator code that uses PhpRedis.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle tracking information about all queues using PhpRedis
+ *
+ * @ingroup JobQueue
+ * @ingroup Redis
+ * @since 1.21
+ */
+class JobQueueAggregatorRedis extends JobQueueAggregator {
+ /** @var RedisConnectionPool */
+ protected $redisPool;
+
+ /** @var array List of Redis server addresses */
+ protected $servers;
+
+ /**
+ * @param array $params Possible keys:
+ * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+ * - redisServers : Array of server entries, the first being the primary and the
+ * others being fallback servers. Each entry is either a hostname/port
+ * combination or the absolute path of a UNIX socket.
+ * If a hostname is specified but no port, the standard port number
+ * 6379 will be used. Required.
+ */
+ protected function __construct( array $params ) {
+ parent::__construct( $params );
+ $this->servers = isset( $params['redisServers'] )
+ ? $params['redisServers']
+ : array( $params['redisServer'] ); // b/c
+ $params['redisConfig']['serializer'] = 'none';
+ $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
+ }
+
+ protected function doNotifyQueueEmpty( $wiki, $type ) {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ return false;
+ }
+ try {
+ $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
+
+ return true;
+ } catch ( RedisException $e ) {
+ $this->handleException( $conn, $e );
+
+ return false;
+ }
+ }
+
+ protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ return false;
+ }
+ try {
+ $conn->multi( Redis::PIPELINE );
+ $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
+ $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
+ $conn->exec();
+
+ return true;
+ } catch ( RedisException $e ) {
+ $this->handleException( $conn, $e );
+
+ return false;
+ }
+ }
+
+ protected function doGetAllReadyWikiQueues() {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ return array();
+ }
+ try {
+ $map = $conn->hGetAll( $this->getReadyQueueKey() );
+
+ if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
+ unset( $map['_epoch'] ); // ignore
+ $pendingDBs = array(); // (type => list of wikis)
+ foreach ( $map as $key => $time ) {
+ list( $type, $wiki ) = $this->dencQueueName( $key );
+ $pendingDBs[$type][] = $wiki;
+ }
+ } else {
+ // Avoid duplicated effort
+ $rand = wfRandomString( 32 );
+ $conn->multi( Redis::MULTI );
+ $conn->setex( "{$rand}:lock", 3600, 1 );
+ $conn->renamenx( "{$rand}:lock", $this->getReadyQueueKey() . ":lock" );
+ if ( $conn->exec() !== array( true, true ) ) { // lock
+ $conn->delete( "{$rand}:lock" );
+ return array(); // already in progress
+ }
+
+ $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
+
+ $conn->multi( Redis::PIPELINE );
+ $now = time();
+ $map = array( '_epoch' => time() ); // dummy key for empty Redis collections
+ foreach ( $pendingDBs as $type => $wikis ) {
+ $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
+ foreach ( $wikis as $wiki ) {
+ $map[$this->encQueueName( $type, $wiki )] = $now;
+ }
+ }
+ $conn->hMSet( $this->getReadyQueueKey(), $map );
+ $conn->exec();
+
+ $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock
+ }
+
+ return $pendingDBs;
+ } catch ( RedisException $e ) {
+ $this->handleException( $conn, $e );
+
+ return array();
+ }
+ }
+
+ protected function doPurge() {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ return false;
+ }
+ try {
+ $conn->delete( $this->getReadyQueueKey() );
+ // leave key at getQueueTypesKey() alone
+ } catch ( RedisException $e ) {
+ $this->handleException( $conn, $e );
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Get a connection to the server that handles all sub-queues for this queue
+ *
+ * @return RedisConnRef|bool Returns false on failure
+ * @throws MWException
+ */
+ protected function getConnection() {
+ $conn = false;
+ foreach ( $this->servers as $server ) {
+ $conn = $this->redisPool->getConnection( $server );
+ if ( $conn ) {
+ break;
+ }
+ }
+
+ return $conn;
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param RedisException $e
+ * @return void
+ */
+ protected function handleException( RedisConnRef $conn, $e ) {
+ $this->redisPool->handleError( $conn, $e );
+ }
+
+ /**
+ * @return string
+ */
+ private function getReadyQueueKey() {
+ return "jobqueue:aggregator:h-ready-queues:v2"; // global
+ }
+
+ /**
+ * @return string
+ */
+ private function getQueueTypesKey() {
+ return "jobqueue:aggregator:h-queue-types:v2"; // global
+ }
+
+ /**
+ * @param string $type
+ * @param string $wiki
+ * @return string
+ */
+ private function encQueueName( $type, $wiki ) {
+ return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
+ }
+
+ /**
+ * @param string $name
+ * @return string
+ */
+ private function dencQueueName( $name ) {
+ list( $type, $wiki ) = explode( '/', $name, 2 );
+
+ return array( rawurldecode( $type ), rawurldecode( $wiki ) );
+ }
+}
diff --git a/includes/jobqueue/jobs/AssembleUploadChunksJob.php b/includes/jobqueue/jobs/AssembleUploadChunksJob.php
new file mode 100644
index 00000000..9e9bda6f
--- /dev/null
+++ b/includes/jobqueue/jobs/AssembleUploadChunksJob.php
@@ -0,0 +1,136 @@
+<?php
+/**
+ * Assemble the segments of a chunked upload.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Upload
+ */
+
+/**
+ * Assemble the segments of a chunked upload.
+ *
+ * @ingroup Upload
+ */
+class AssembleUploadChunksJob extends Job {
+ public function __construct( $title, $params ) {
+ parent::__construct( 'AssembleUploadChunks', $title, $params );
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ $scope = RequestContext::importScopedSession( $this->params['session'] );
+ $context = RequestContext::getMain();
+ try {
+ $user = $context->getUser();
+ if ( !$user->isLoggedIn() ) {
+ $this->setLastError( "Could not load the author user from session." );
+
+ return false;
+ }
+
+ if ( count( $_SESSION ) === 0 ) {
+ // Empty session probably indicates that we didn't associate
+ // with the session correctly. Note that being able to load
+ // the user does not necessarily mean the session was loaded.
+ // Most likely cause by suhosin.session.encrypt = On.
+ $this->setLastError( "Error associating with user session. " .
+ "Try setting suhosin.session.encrypt = Off" );
+
+ return false;
+ }
+
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() )
+ );
+
+ $upload = new UploadFromChunks( $user );
+ $upload->continueChunks(
+ $this->params['filename'],
+ $this->params['filekey'],
+ $context->getRequest()
+ );
+
+ // Combine all of the chunks into a local file and upload that to a new stash file
+ $status = $upload->concatenateChunks();
+ if ( !$status->isGood() ) {
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status )
+ );
+ $this->setLastError( $status->getWikiText() );
+
+ return false;
+ }
+
+ // We have a new filekey for the fully concatenated file
+ $newFileKey = $upload->getLocalFile()->getFileKey();
+
+ // Remove the old stash file row and first chunk file
+ $upload->stash->removeFileNoAuth( $this->params['filekey'] );
+
+ // Build the image info array while we have the local reference handy
+ $apiMain = new ApiMain(); // dummy object (XXX)
+ $imageInfo = $upload->getImageInfo( $apiMain->getResult() );
+
+ // Cleanup any temporary local file
+ $upload->cleanupTempFile();
+
+ // Cache the info so the user doesn't have to wait forever to get the final info
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array(
+ 'result' => 'Success',
+ 'stage' => 'assembling',
+ 'filekey' => $newFileKey,
+ 'imageinfo' => $imageInfo,
+ 'status' => Status::newGood()
+ )
+ );
+ } catch ( MWException $e ) {
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array(
+ 'result' => 'Failure',
+ 'stage' => 'assembling',
+ 'status' => Status::newFatal( 'api-error-stashfailed' )
+ )
+ );
+ $this->setLastError( get_class( $e ) . ": " . $e->getText() );
+ // To be extra robust.
+ MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+
+ return false;
+ }
+
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ if ( is_array( $info['params'] ) ) {
+ $info['params'] = array( 'filekey' => $info['params']['filekey'] );
+ }
+
+ return $info;
+ }
+
+ public function allowRetries() {
+ return false;
+ }
+}
diff --git a/includes/jobqueue/jobs/DoubleRedirectJob.php b/includes/jobqueue/jobs/DoubleRedirectJob.php
new file mode 100644
index 00000000..2561f2f1
--- /dev/null
+++ b/includes/jobqueue/jobs/DoubleRedirectJob.php
@@ -0,0 +1,250 @@
+<?php
+/**
+ * Job to fix double redirects after moving a page.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job to fix double redirects after moving a page
+ *
+ * @ingroup JobQueue
+ */
+class DoubleRedirectJob extends Job {
+ /** @var string Reason for the change, 'maintenance' or 'move'. Suffix fo
+ * message key 'double-redirect-fixed-'.
+ */
+ private $reason;
+
+ /** @var Title The title which has changed, redirects pointing to this
+ * title are fixed
+ */
+ private $redirTitle;
+
+ /** @var User */
+ private static $user;
+
+ /**
+ * Insert jobs into the job queue to fix redirects to the given title
+ * @param string $reason The reason for the fix, see message
+ * "double-redirect-fixed-<reason>"
+ * @param Title $redirTitle The title which has changed, redirects
+ * pointing to this title are fixed
+ * @param bool $destTitle Not used
+ */
+ public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) {
+ # Need to use the master to get the redirect table updated in the same transaction
+ $dbw = wfGetDB( DB_MASTER );
+ $res = $dbw->select(
+ array( 'redirect', 'page' ),
+ array( 'page_namespace', 'page_title' ),
+ array(
+ 'page_id = rd_from',
+ 'rd_namespace' => $redirTitle->getNamespace(),
+ 'rd_title' => $redirTitle->getDBkey()
+ ), __METHOD__ );
+ if ( !$res->numRows() ) {
+ return;
+ }
+ $jobs = array();
+ foreach ( $res as $row ) {
+ $title = Title::makeTitle( $row->page_namespace, $row->page_title );
+ if ( !$title ) {
+ continue;
+ }
+
+ $jobs[] = new self( $title, array(
+ 'reason' => $reason,
+ 'redirTitle' => $redirTitle->getPrefixedDBkey() ) );
+ # Avoid excessive memory usage
+ if ( count( $jobs ) > 10000 ) {
+ JobQueueGroup::singleton()->push( $jobs );
+ $jobs = array();
+ }
+ }
+ JobQueueGroup::singleton()->push( $jobs );
+ }
+
+ /**
+ * @param Title $title
+ * @param array|bool $params
+ */
+ function __construct( $title, $params = false ) {
+ parent::__construct( 'fixDoubleRedirect', $title, $params );
+ $this->reason = $params['reason'];
+ $this->redirTitle = Title::newFromText( $params['redirTitle'] );
+ }
+
+ /**
+ * @return bool
+ */
+ function run() {
+ if ( !$this->redirTitle ) {
+ $this->setLastError( 'Invalid title' );
+
+ return false;
+ }
+
+ $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST );
+ if ( !$targetRev ) {
+ wfDebug( __METHOD__ . ": target redirect already deleted, ignoring\n" );
+
+ return true;
+ }
+ $content = $targetRev->getContent();
+ $currentDest = $content ? $content->getRedirectTarget() : null;
+ if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) {
+ wfDebug( __METHOD__ . ": Redirect has changed since the job was queued\n" );
+
+ return true;
+ }
+
+ // Check for a suppression tag (used e.g. in periodically archived discussions)
+ $mw = MagicWord::get( 'staticredirect' );
+ if ( $content->matchMagicWord( $mw ) ) {
+ wfDebug( __METHOD__ . ": skipping: suppressed with __STATICREDIRECT__\n" );
+
+ return true;
+ }
+
+ // Find the current final destination
+ $newTitle = self::getFinalDestination( $this->redirTitle );
+ if ( !$newTitle ) {
+ wfDebug( __METHOD__ .
+ ": skipping: single redirect, circular redirect or invalid redirect destination\n" );
+
+ return true;
+ }
+ if ( $newTitle->equals( $this->redirTitle ) ) {
+ // The redirect is already right, no need to change it
+ // This can happen if the page was moved back (say after vandalism)
+ wfDebug( __METHOD__ . " : skipping, already good\n" );
+ }
+
+ // Preserve fragment (bug 14904)
+ $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(),
+ $currentDest->getFragment(), $newTitle->getInterwiki() );
+
+ // Fix the text
+ $newContent = $content->updateRedirect( $newTitle );
+
+ if ( $newContent->equals( $content ) ) {
+ $this->setLastError( 'Content unchanged???' );
+
+ return false;
+ }
+
+ $user = $this->getUser();
+ if ( !$user ) {
+ $this->setLastError( 'Invalid user' );
+
+ return false;
+ }
+
+ // Save it
+ global $wgUser;
+ $oldUser = $wgUser;
+ $wgUser = $user;
+ $article = WikiPage::factory( $this->title );
+
+ // Messages: double-redirect-fixed-move, double-redirect-fixed-maintenance
+ $reason = wfMessage( 'double-redirect-fixed-' . $this->reason,
+ $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText()
+ )->inContentLanguage()->text();
+ $article->doEditContent( $newContent, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $user );
+ $wgUser = $oldUser;
+
+ return true;
+ }
+
+ /**
+ * Get the final destination of a redirect
+ *
+ * @param Title $title
+ *
+ * @return bool If the specified title is not a redirect, or if it is a circular redirect
+ */
+ public static function getFinalDestination( $title ) {
+ $dbw = wfGetDB( DB_MASTER );
+
+ // Circular redirect check
+ $seenTitles = array();
+ $dest = false;
+
+ while ( true ) {
+ $titleText = $title->getPrefixedDBkey();
+ if ( isset( $seenTitles[$titleText] ) ) {
+ wfDebug( __METHOD__, "Circular redirect detected, aborting\n" );
+
+ return false;
+ }
+ $seenTitles[$titleText] = true;
+
+ if ( $title->isExternal() ) {
+ // If the target is interwiki, we have to break early (bug 40352).
+ // Otherwise it will look up a row in the local page table
+ // with the namespace/page of the interwiki target which can cause
+ // unexpected results (e.g. X -> foo:Bar -> Bar -> .. )
+ break;
+ }
+
+ $row = $dbw->selectRow(
+ array( 'redirect', 'page' ),
+ array( 'rd_namespace', 'rd_title', 'rd_interwiki' ),
+ array(
+ 'rd_from=page_id',
+ 'page_namespace' => $title->getNamespace(),
+ 'page_title' => $title->getDBkey()
+ ), __METHOD__ );
+ if ( !$row ) {
+ # No redirect from here, chain terminates
+ break;
+ } else {
+ $dest = $title = Title::makeTitle(
+ $row->rd_namespace,
+ $row->rd_title,
+ '',
+ $row->rd_interwiki
+ );
+ }
+ }
+
+ return $dest;
+ }
+
+ /**
+ * Get a user object for doing edits, from a request-lifetime cache
+ * False will be returned if the user name specified in the
+ * 'double-redirect-fixer' message is invalid.
+ *
+ * @return User|bool
+ */
+ function getUser() {
+ if ( !self::$user ) {
+ $username = wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text();
+ self::$user = User::newFromName( $username );
+ # User::newFromName() can return false on a badly configured wiki.
+ if ( self::$user && !self::$user->isLoggedIn() ) {
+ self::$user->addToDatabase();
+ }
+ }
+
+ return self::$user;
+ }
+}
diff --git a/includes/jobqueue/jobs/DuplicateJob.php b/includes/jobqueue/jobs/DuplicateJob.php
new file mode 100644
index 00000000..1fa6cefe
--- /dev/null
+++ b/includes/jobqueue/jobs/DuplicateJob.php
@@ -0,0 +1,59 @@
+<?php
+/**
+ * No-op job that does nothing.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ */
+
+/**
+ * No-op job that does nothing. Used to represent duplicates.
+ *
+ * @ingroup JobQueue
+ */
+final class DuplicateJob extends Job {
+ /**
+ * Callers should use DuplicateJob::newFromJob() instead
+ *
+ * @param Title $title
+ * @param array $params Job parameters
+ */
+ function __construct( $title, $params ) {
+ parent::__construct( 'duplicate', $title, $params );
+ }
+
+ /**
+ * Get a duplicate no-op version of a job
+ *
+ * @param Job $job
+ * @return Job
+ */
+ public static function newFromJob( Job $job ) {
+ $djob = new self( $job->getTitle(), $job->getParams() );
+ $djob->command = $job->getType();
+ $djob->params = is_array( $djob->params ) ? $djob->params : array();
+ $djob->params = array( 'isDuplicate' => true ) + $djob->params;
+ $djob->metadata = $job->metadata;
+
+ return $djob;
+ }
+
+ public function run() {
+ return true;
+ }
+}
diff --git a/includes/jobqueue/jobs/EmaillingJob.php b/includes/jobqueue/jobs/EmaillingJob.php
new file mode 100644
index 00000000..df8ae63e
--- /dev/null
+++ b/includes/jobqueue/jobs/EmaillingJob.php
@@ -0,0 +1,46 @@
+<?php
+/**
+ * Old job for notification emails.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Old job used for sending single notification emails;
+ * kept for backwards-compatibility
+ *
+ * @ingroup JobQueue
+ */
+class EmaillingJob extends Job {
+ function __construct( $title, $params ) {
+ parent::__construct( 'sendMail', Title::newMainPage(), $params );
+ }
+
+ function run() {
+ $status = UserMailer::send(
+ $this->params['to'],
+ $this->params['from'],
+ $this->params['subj'],
+ $this->params['body'],
+ $this->params['replyto']
+ );
+
+ return $status->isOK();
+ }
+}
diff --git a/includes/jobqueue/jobs/EnotifNotifyJob.php b/includes/jobqueue/jobs/EnotifNotifyJob.php
new file mode 100644
index 00000000..1ed99a58
--- /dev/null
+++ b/includes/jobqueue/jobs/EnotifNotifyJob.php
@@ -0,0 +1,57 @@
+<?php
+/**
+ * Job for notification emails.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job for email notification mails
+ *
+ * @ingroup JobQueue
+ */
+class EnotifNotifyJob extends Job {
+ function __construct( $title, $params ) {
+ parent::__construct( 'enotifNotify', $title, $params );
+ }
+
+ function run() {
+ $enotif = new EmailNotification();
+ // Get the user from ID (rename safe). Anons are 0, so defer to name.
+ if ( isset( $this->params['editorID'] ) && $this->params['editorID'] ) {
+ $editor = User::newFromId( $this->params['editorID'] );
+ // B/C, only the name might be given.
+ } else {
+ # @todo FIXME: newFromName could return false on a badly configured wiki.
+ $editor = User::newFromName( $this->params['editor'], false );
+ }
+ $enotif->actuallyNotifyOnPageChange(
+ $editor,
+ $this->title,
+ $this->params['timestamp'],
+ $this->params['summary'],
+ $this->params['minorEdit'],
+ $this->params['oldid'],
+ $this->params['watchers'],
+ $this->params['pageStatus']
+ );
+
+ return true;
+ }
+}
diff --git a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
new file mode 100644
index 00000000..4d1e72c9
--- /dev/null
+++ b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
@@ -0,0 +1,162 @@
+<?php
+/**
+ * HTML cache invalidation of all pages linking to a given title.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ */
+
+/**
+ * Job to purge the cache for all pages that link to or use another page or file
+ *
+ * This job comes in a few variants:
+ * - a) Recursive jobs to purge caches for backlink pages for a given title.
+ * These jobs have have (recursive:true,table:<table>) set.
+ * - b) Jobs to purge caches for a set of titles (the job title is ignored).
+ * These jobs have have (pages:(<page ID>:(<namespace>,<title>),...) set.
+ *
+ * @ingroup JobQueue
+ */
+class HTMLCacheUpdateJob extends Job {
+ function __construct( $title, $params = '' ) {
+ parent::__construct( 'htmlCacheUpdate', $title, $params );
+ // Base backlink purge jobs can be de-duplicated
+ $this->removeDuplicates = ( !isset( $params['range'] ) && !isset( $params['pages'] ) );
+ }
+
+ function run() {
+ global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
+
+ static $expected = array( 'recursive', 'pages' ); // new jobs have one of these
+
+ $oldRangeJob = false;
+ if ( !array_intersect( array_keys( $this->params ), $expected ) ) {
+ // B/C for older job params formats that lack these fields:
+ // a) base jobs with just ("table") and b) range jobs with ("table","start","end")
+ if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
+ $oldRangeJob = true;
+ } else {
+ $this->params['recursive'] = true; // base job
+ }
+ }
+
+ // Job to purge all (or a range of) backlink pages for a page
+ if ( !empty( $this->params['recursive'] ) ) {
+ // Convert this into no more than $wgUpdateRowsPerJob HTMLCacheUpdateJob per-title
+ // jobs and possibly a recursive HTMLCacheUpdateJob job for the rest of the backlinks
+ $jobs = BacklinkJobUtils::partitionBacklinkJob(
+ $this,
+ $wgUpdateRowsPerJob,
+ $wgUpdateRowsPerQuery, // jobs-per-title
+ // Carry over information for de-duplication
+ array( 'params' => $this->getRootJobParams() )
+ );
+ JobQueueGroup::singleton()->push( $jobs );
+ // Job to purge pages for for a set of titles
+ } elseif ( isset( $this->params['pages'] ) ) {
+ $this->invalidateTitles( $this->params['pages'] );
+ // B/C for job to purge a range of backlink pages for a given page
+ } elseif ( $oldRangeJob ) {
+ $titleArray = $this->title->getBacklinkCache()->getLinks(
+ $this->params['table'], $this->params['start'], $this->params['end'] );
+
+ $pages = array(); // same format BacklinkJobUtils uses
+ foreach ( $titleArray as $tl ) {
+ $pages[$tl->getArticleId()] = array( $tl->getNamespace(), $tl->getDbKey() );
+ }
+
+ $jobs = array();
+ foreach ( array_chunk( $pages, $wgUpdateRowsPerJob ) as $pageChunk ) {
+ $jobs[] = new HTMLCacheUpdateJob( $this->title,
+ array(
+ 'table' => $this->params['table'],
+ 'pages' => $pageChunk
+ ) + $this->getRootJobParams() // carry over information for de-duplication
+ );
+ }
+ JobQueueGroup::singleton()->push( $jobs );
+ }
+
+ return true;
+ }
+
+ /**
+ * @param array $pages Map of (page ID => (namespace, DB key)) entries
+ */
+ protected function invalidateTitles( array $pages ) {
+ global $wgUpdateRowsPerQuery, $wgUseFileCache, $wgUseSquid;
+
+ // Get all page IDs in this query into an array
+ $pageIds = array_keys( $pages );
+ if ( !$pageIds ) {
+ return;
+ }
+
+ $dbw = wfGetDB( DB_MASTER );
+
+ // The page_touched field will need to be bumped for these pages.
+ // Only bump it to the present time if no "rootJobTimestamp" was known.
+ // If it is known, it can be used instead, which avoids invalidating output
+ // that was in fact generated *after* the relevant dependency change time
+ // (e.g. template edit). This is particularily useful since refreshLinks jobs
+ // save back parser output and usually run along side htmlCacheUpdate jobs;
+ // their saved output would be invalidated by using the current timestamp.
+ if ( isset( $this->params['rootJobTimestamp'] ) ) {
+ $touchTimestamp = $this->params['rootJobTimestamp'];
+ } else {
+ $touchTimestamp = wfTimestampNow();
+ }
+
+ // Update page_touched (skipping pages already touched since the root job).
+ // Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already.
+ foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) {
+ $dbw->update( 'page',
+ array( 'page_touched' => $dbw->timestamp( $touchTimestamp ) ),
+ array( 'page_id' => $batch,
+ // don't invalidated pages that were already invalidated
+ "page_touched < " . $dbw->addQuotes( $dbw->timestamp( $touchTimestamp ) )
+ ),
+ __METHOD__
+ );
+ }
+ // Get the list of affected pages (races only mean something else did the purge)
+ $titleArray = TitleArray::newFromResult( $dbw->select(
+ 'page',
+ array( 'page_namespace', 'page_title' ),
+ array( 'page_id' => $pageIds, 'page_touched' => $dbw->timestamp( $touchTimestamp ) ),
+ __METHOD__
+ ) );
+
+ // Update squid
+ if ( $wgUseSquid ) {
+ $u = SquidUpdate::newFromTitles( $titleArray );
+ $u->doUpdate();
+ }
+
+ // Update file cache
+ if ( $wgUseFileCache ) {
+ foreach ( $titleArray as $title ) {
+ HTMLFileCache::clearFileCache( $title );
+ }
+ }
+ }
+
+ public function workItemCount() {
+ return isset( $this->params['pages'] ) ? count( $this->params['pages'] ) : 1;
+ }
+}
diff --git a/includes/jobqueue/jobs/NullJob.php b/includes/jobqueue/jobs/NullJob.php
new file mode 100644
index 00000000..66291e9d
--- /dev/null
+++ b/includes/jobqueue/jobs/NullJob.php
@@ -0,0 +1,76 @@
+<?php
+/**
+ * Degenerate job that does nothing.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ */
+
+/**
+ * Degenerate job that does nothing, but can optionally replace itself
+ * in the queue and/or sleep for a brief time period. These can be used
+ * to represent "no-op" jobs or test lock contention and performance.
+ *
+ * @par Example:
+ * Inserting a null job in the configured job queue:
+ * @code
+ * $ php maintenance/eval.php
+ * > $queue = JobQueueGroup::singleton();
+ * > $job = new NullJob( Title::newMainPage(), array( 'lives' => 10 ) );
+ * > $queue->push( $job );
+ * @endcode
+ * You can then confirm the job has been enqueued by using the showJobs.php
+ * maintenance utility:
+ * @code
+ * $ php maintenance/showJobs.php --group
+ * null: 1 queue; 0 claimed (0 active, 0 abandoned)
+ * $
+ * @endcode
+ *
+ * @ingroup JobQueue
+ */
+class NullJob extends Job {
+ /**
+ * @param Title $title
+ * @param array $params Job parameters (lives, usleep)
+ */
+ function __construct( $title, $params ) {
+ parent::__construct( 'null', $title, $params );
+ if ( !isset( $this->params['lives'] ) ) {
+ $this->params['lives'] = 1;
+ }
+ if ( !isset( $this->params['usleep'] ) ) {
+ $this->params['usleep'] = 0;
+ }
+ $this->removeDuplicates = !empty( $this->params['removeDuplicates'] );
+ }
+
+ public function run() {
+ if ( $this->params['usleep'] > 0 ) {
+ usleep( $this->params['usleep'] );
+ }
+ if ( $this->params['lives'] > 1 ) {
+ $params = $this->params;
+ $params['lives']--;
+ $job = new self( $this->title, $params );
+ JobQueueGroup::singleton()->push( $job );
+ }
+
+ return true;
+ }
+}
diff --git a/includes/jobqueue/jobs/PublishStashedFileJob.php b/includes/jobqueue/jobs/PublishStashedFileJob.php
new file mode 100644
index 00000000..918a392d
--- /dev/null
+++ b/includes/jobqueue/jobs/PublishStashedFileJob.php
@@ -0,0 +1,150 @@
+<?php
+/**
+ * Upload a file from the upload stash into the local file repo.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Upload
+ */
+
+/**
+ * Upload a file from the upload stash into the local file repo.
+ *
+ * @ingroup Upload
+ */
+class PublishStashedFileJob extends Job {
+ public function __construct( $title, $params ) {
+ parent::__construct( 'PublishStashedFile', $title, $params );
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ $scope = RequestContext::importScopedSession( $this->params['session'] );
+ $context = RequestContext::getMain();
+ try {
+ $user = $context->getUser();
+ if ( !$user->isLoggedIn() ) {
+ $this->setLastError( "Could not load the author user from session." );
+
+ return false;
+ }
+
+ if ( count( $_SESSION ) === 0 ) {
+ // Empty session probably indicates that we didn't associate
+ // with the session correctly. Note that being able to load
+ // the user does not necessarily mean the session was loaded.
+ // Most likely cause by suhosin.session.encrypt = On.
+ $this->setLastError( "Error associating with user session. " .
+ "Try setting suhosin.session.encrypt = Off" );
+
+ return false;
+ }
+
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() )
+ );
+
+ $upload = new UploadFromStash( $user );
+ // @todo initialize() causes a GET, ideally we could frontload the antivirus
+ // checks and anything else to the stash stage (which includes concatenation and
+ // the local file is thus already there). That way, instead of GET+PUT, there could
+ // just be a COPY operation from the stash to the public zone.
+ $upload->initialize( $this->params['filekey'], $this->params['filename'] );
+
+ // Check if the local file checks out (this is generally a no-op)
+ $verification = $upload->verifyUpload();
+ if ( $verification['status'] !== UploadBase::OK ) {
+ $status = Status::newFatal( 'verification-error' );
+ $status->value = array( 'verification' => $verification );
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
+ );
+ $this->setLastError( "Could not verify upload." );
+
+ return false;
+ }
+
+ // Upload the stashed file to a permanent location
+ $status = $upload->performUpload(
+ $this->params['comment'],
+ $this->params['text'],
+ $this->params['watch'],
+ $user
+ );
+ if ( !$status->isGood() ) {
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
+ );
+ $this->setLastError( $status->getWikiText() );
+
+ return false;
+ }
+
+ // Build the image info array while we have the local reference handy
+ $apiMain = new ApiMain(); // dummy object (XXX)
+ $imageInfo = $upload->getImageInfo( $apiMain->getResult() );
+
+ // Cleanup any temporary local file
+ $upload->cleanupTempFile();
+
+ // Cache the info so the user doesn't have to wait forever to get the final info
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array(
+ 'result' => 'Success',
+ 'stage' => 'publish',
+ 'filename' => $upload->getLocalFile()->getName(),
+ 'imageinfo' => $imageInfo,
+ 'status' => Status::newGood()
+ )
+ );
+ } catch ( MWException $e ) {
+ UploadBase::setSessionStatus(
+ $this->params['filekey'],
+ array(
+ 'result' => 'Failure',
+ 'stage' => 'publish',
+ 'status' => Status::newFatal( 'api-error-publishfailed' )
+ )
+ );
+ $this->setLastError( get_class( $e ) . ": " . $e->getText() );
+ // To prevent potential database referential integrity issues.
+ // See bug 32551.
+ MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+
+ return false;
+ }
+
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ if ( is_array( $info['params'] ) ) {
+ $info['params'] = array( 'filekey' => $info['params']['filekey'] );
+ }
+
+ return $info;
+ }
+
+ public function allowRetries() {
+ return false;
+ }
+}
diff --git a/includes/jobqueue/jobs/RefreshLinksJob.php b/includes/jobqueue/jobs/RefreshLinksJob.php
new file mode 100644
index 00000000..f82af273
--- /dev/null
+++ b/includes/jobqueue/jobs/RefreshLinksJob.php
@@ -0,0 +1,199 @@
+<?php
+/**
+ * Job to update link tables for pages
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job to update link tables for pages
+ *
+ * This job comes in a few variants:
+ * - a) Recursive jobs to update links for backlink pages for a given title.
+ * These jobs have have (recursive:true,table:<table>) set.
+ * - b) Jobs to update links for a set of pages (the job title is ignored).
+ * These jobs have have (pages:(<page ID>:(<namespace>,<title>),...) set.
+ * - c) Jobs to update links for a single page (the job title)
+ * These jobs need no extra fields set.
+ *
+ * @ingroup JobQueue
+ */
+class RefreshLinksJob extends Job {
+ const PARSE_THRESHOLD_SEC = 1.0;
+
+ function __construct( $title, $params = '' ) {
+ parent::__construct( 'refreshLinks', $title, $params );
+ // Base backlink update jobs and per-title update jobs can be de-duplicated.
+ // If template A changes twice before any jobs run, a clean queue will have:
+ // (A base, A base)
+ // The second job is ignored by the queue on insertion.
+ // Suppose, many pages use template A, and that template itself uses template B.
+ // An edit to both will first create two base jobs. A clean FIFO queue will have:
+ // (A base, B base)
+ // When these jobs run, the queue will have per-title and remnant partition jobs:
+ // (titleX,titleY,titleZ,...,A remnant,titleM,titleN,titleO,...,B remnant)
+ // Some these jobs will be the same, and will automatically be ignored by
+ // the queue upon insertion. Some title jobs will run before the duplicate is
+ // inserted, so the work will still be done twice in those cases. More titles
+ // can be de-duplicated as the remnant jobs continue to be broken down. This
+ // works best when $wgUpdateRowsPerJob, and either the pages have few backlinks
+ // and/or the backlink sets for pages A and B are almost identical.
+ $this->removeDuplicates = !isset( $params['range'] )
+ && ( !isset( $params['pages'] ) || count( $params['pages'] ) == 1 );
+ }
+
+ function run() {
+ global $wgUpdateRowsPerJob;
+
+ // Job to update all (or a range of) backlink pages for a page
+ if ( !empty( $this->params['recursive'] ) ) {
+ // Carry over information for de-duplication
+ $extraParams = $this->getRootJobParams();
+ // Avoid slave lag when fetching templates.
+ // When the outermost job is run, we know that the caller that enqueued it must have
+ // committed the relevant changes to the DB by now. At that point, record the master
+ // position and pass it along as the job recursively breaks into smaller range jobs.
+ // Hopefully, when leaf jobs are popped, the slaves will have reached that position.
+ if ( isset( $this->params['masterPos'] ) ) {
+ $extraParams['masterPos'] = $this->params['masterPos'];
+ } elseif ( wfGetLB()->getServerCount() > 1 ) {
+ $extraParams['masterPos'] = wfGetLB()->getMasterPos();
+ } else {
+ $extraParams['masterPos'] = false;
+ }
+ // Convert this into no more than $wgUpdateRowsPerJob RefreshLinks per-title
+ // jobs and possibly a recursive RefreshLinks job for the rest of the backlinks
+ $jobs = BacklinkJobUtils::partitionBacklinkJob(
+ $this,
+ $wgUpdateRowsPerJob,
+ 1, // job-per-title
+ array( 'params' => $extraParams )
+ );
+ JobQueueGroup::singleton()->push( $jobs );
+ // Job to update link tables for for a set of titles
+ } elseif ( isset( $this->params['pages'] ) ) {
+ foreach ( $this->params['pages'] as $pageId => $nsAndKey ) {
+ list( $ns, $dbKey ) = $nsAndKey;
+ $this->runForTitle( Title::makeTitleSafe( $ns, $dbKey ) );
+ }
+ // Job to update link tables for a given title
+ } else {
+ $this->runForTitle( $this->title );
+ }
+
+ return true;
+ }
+
+ protected function runForTitle( Title $title = null ) {
+ $linkCache = LinkCache::singleton();
+ $linkCache->clear();
+
+ if ( is_null( $title ) ) {
+ $this->setLastError( "refreshLinks: Invalid title" );
+ return false;
+ }
+
+ // Wait for the DB of the current/next slave DB handle to catch up to the master.
+ // This way, we get the correct page_latest for templates or files that just changed
+ // milliseconds ago, having triggered this job to begin with.
+ if ( isset( $this->params['masterPos'] ) && $this->params['masterPos'] !== false ) {
+ wfGetLB()->waitFor( $this->params['masterPos'] );
+ }
+
+ $page = WikiPage::factory( $title );
+
+ // Fetch the current revision...
+ $revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL );
+ if ( !$revision ) {
+ $this->setLastError( "refreshLinks: Article not found {$title->getPrefixedDBkey()}" );
+ return false; // XXX: what if it was just deleted?
+ }
+ $content = $revision->getContent( Revision::RAW );
+ if ( !$content ) {
+ // If there is no content, pretend the content is empty
+ $content = $revision->getContentHandler()->makeEmptyContent();
+ }
+
+ $parserOutput = false;
+ $parserOptions = $page->makeParserOptions( 'canonical' );
+ // If page_touched changed after this root job (with a good slave lag skew factor),
+ // then it is likely that any views of the pages already resulted in re-parses which
+ // are now in cache. This can be reused to avoid expensive parsing in some cases.
+ if ( isset( $this->params['rootJobTimestamp'] ) ) {
+ $skewedTimestamp = wfTimestamp( TS_UNIX, $this->params['rootJobTimestamp'] ) + 5;
+ if ( $page->getLinksTimestamp() > wfTimestamp( TS_MW, $skewedTimestamp ) ) {
+ // Something already updated the backlinks since this job was made
+ return true;
+ }
+ if ( $page->getTouched() > wfTimestamp( TS_MW, $skewedTimestamp ) ) {
+ $parserOutput = ParserCache::singleton()->getDirty( $page, $parserOptions );
+ if ( $parserOutput && $parserOutput->getCacheTime() <= $skewedTimestamp ) {
+ $parserOutput = false; // too stale
+ }
+ }
+ }
+ // Fetch the current revision and parse it if necessary...
+ if ( $parserOutput == false ) {
+ $start = microtime( true );
+ // Revision ID must be passed to the parser output to get revision variables correct
+ $parserOutput = $content->getParserOutput(
+ $title, $revision->getId(), $parserOptions, false );
+ $ellapsed = microtime( true ) - $start;
+ // If it took a long time to render, then save this back to the cache to avoid
+ // wasted CPU by other apaches or job runners. We don't want to always save to
+ // cache as this cause cause high cache I/O and LRU churn when a template changes.
+ if ( $ellapsed >= self::PARSE_THRESHOLD_SEC
+ && $page->isParserCacheUsed( $parserOptions, $revision->getId() )
+ && $parserOutput->isCacheable()
+ ) {
+ $ctime = wfTimestamp( TS_MW, (int)$start ); // cache time
+ ParserCache::singleton()->save(
+ $parserOutput, $page, $parserOptions, $ctime, $revision->getId()
+ );
+ }
+ }
+
+ $updates = $content->getSecondaryDataUpdates( $title, null, false, $parserOutput );
+ DataUpdate::runUpdates( $updates );
+
+ InfoAction::invalidateCache( $title );
+
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ if ( is_array( $info['params'] ) ) {
+ // Don't let highly unique "masterPos" values ruin duplicate detection
+ unset( $info['params']['masterPos'] );
+ // For per-pages jobs, the job title is that of the template that changed
+ // (or similar), so remove that since it ruins duplicate detection
+ if ( isset( $info['pages'] ) ) {
+ unset( $info['namespace'] );
+ unset( $info['title'] );
+ }
+ }
+
+ return $info;
+ }
+
+ public function workItemCount() {
+ return isset( $this->params['pages'] ) ? count( $this->params['pages'] ) : 1;
+ }
+}
diff --git a/includes/jobqueue/jobs/RefreshLinksJob2.php b/includes/jobqueue/jobs/RefreshLinksJob2.php
new file mode 100644
index 00000000..97405aeb
--- /dev/null
+++ b/includes/jobqueue/jobs/RefreshLinksJob2.php
@@ -0,0 +1,141 @@
+<?php
+/**
+ * Job to update links for a given title.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Background job to update links for titles in certain backlink range by page ID.
+ * Newer version for high use templates. This is deprecated by RefreshLinksPartitionJob.
+ *
+ * @ingroup JobQueue
+ * @deprecated since 1.23
+ */
+class RefreshLinksJob2 extends Job {
+ function __construct( $title, $params ) {
+ parent::__construct( 'refreshLinks2', $title, $params );
+ // Base jobs for large templates can easily be de-duplicated
+ $this->removeDuplicates = !isset( $params['start'] ) && !isset( $params['end'] );
+ }
+
+ /**
+ * Run a refreshLinks2 job
+ * @return bool Success
+ */
+ function run() {
+ global $wgUpdateRowsPerJob;
+
+ $linkCache = LinkCache::singleton();
+ $linkCache->clear();
+
+ if ( is_null( $this->title ) ) {
+ $this->error = "refreshLinks2: Invalid title";
+ return false;
+ }
+
+ // Back compat for pre-r94435 jobs
+ $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks';
+
+ // Avoid slave lag when fetching templates.
+ // When the outermost job is run, we know that the caller that enqueued it must have
+ // committed the relevant changes to the DB by now. At that point, record the master
+ // position and pass it along as the job recursively breaks into smaller range jobs.
+ // Hopefully, when leaf jobs are popped, the slaves will have reached that position.
+ if ( isset( $this->params['masterPos'] ) ) {
+ $masterPos = $this->params['masterPos'];
+ } elseif ( wfGetLB()->getServerCount() > 1 ) {
+ $masterPos = wfGetLB()->getMasterPos();
+ } else {
+ $masterPos = false;
+ }
+
+ $tbc = $this->title->getBacklinkCache();
+
+ $jobs = array(); // jobs to insert
+ if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
+ # This is a partition job to trigger the insertion of leaf jobs...
+ $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
+ } else {
+ # This is a base job to trigger the insertion of partitioned jobs...
+ if ( $tbc->getNumLinks( $table, $wgUpdateRowsPerJob + 1 ) <= $wgUpdateRowsPerJob ) {
+ # Just directly insert the single per-title jobs
+ $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
+ } else {
+ # Insert the partition jobs to make per-title jobs
+ foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) {
+ list( $start, $end ) = $batch;
+ $jobs[] = new RefreshLinksJob2( $this->title,
+ array(
+ 'table' => $table,
+ 'start' => $start,
+ 'end' => $end,
+ 'masterPos' => $masterPos,
+ ) + $this->getRootJobParams() // carry over information for de-duplication
+ );
+ }
+ }
+ }
+
+ if ( count( $jobs ) ) {
+ JobQueueGroup::singleton()->push( $jobs );
+ }
+
+ return true;
+ }
+
+ /**
+ * @param string $table
+ * @param mixed $masterPos
+ * @return array
+ */
+ protected function getSingleTitleJobs( $table, $masterPos ) {
+ # The "start"/"end" fields are not set for the base jobs
+ $start = isset( $this->params['start'] ) ? $this->params['start'] : false;
+ $end = isset( $this->params['end'] ) ? $this->params['end'] : false;
+ $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end );
+ # Convert into single page refresh links jobs.
+ # This handles well when in sapi mode and is useful in any case for job
+ # de-duplication. If many pages use template A, and that template itself
+ # uses template B, then an edit to both will create many duplicate jobs.
+ # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will
+ # get run first, and when it does, it will remove the duplicates. Of course,
+ # one page could have its job popped when the other page's job is still
+ # buried within the logic of a refreshLinks2 job.
+ $jobs = array();
+ foreach ( $titles as $title ) {
+ $jobs[] = new RefreshLinksJob( $title,
+ array( 'masterPos' => $masterPos ) + $this->getRootJobParams()
+ ); // carry over information for de-duplication
+ }
+ return $jobs;
+ }
+
+ /**
+ * @return array
+ */
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ // Don't let highly unique "masterPos" values ruin duplicate detection
+ if ( is_array( $info['params'] ) ) {
+ unset( $info['params']['masterPos'] );
+ }
+ return $info;
+ }
+}
diff --git a/includes/jobqueue/jobs/UploadFromUrlJob.php b/includes/jobqueue/jobs/UploadFromUrlJob.php
new file mode 100644
index 00000000..a09db15a
--- /dev/null
+++ b/includes/jobqueue/jobs/UploadFromUrlJob.php
@@ -0,0 +1,187 @@
+<?php
+/**
+ * Job for asynchronous upload-by-url.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job for asynchronous upload-by-url.
+ *
+ * This job is in fact an interface to UploadFromUrl, which is designed such
+ * that it does not require any globals. If it does, fix it elsewhere, do not
+ * add globals in here.
+ *
+ * @ingroup JobQueue
+ */
+class UploadFromUrlJob extends Job {
+ const SESSION_KEYNAME = 'wsUploadFromUrlJobData';
+
+ /** @var UploadFromUrl */
+ public $upload;
+
+ /** @var User */
+ protected $user;
+
+ public function __construct( $title, $params ) {
+ parent::__construct( 'uploadFromUrl', $title, $params );
+ }
+
+ public function run() {
+ global $wgCopyUploadAsyncTimeout;
+ # Initialize this object and the upload object
+ $this->upload = new UploadFromUrl();
+ $this->upload->initialize(
+ $this->title->getText(),
+ $this->params['url'],
+ false
+ );
+ $this->user = User::newFromName( $this->params['userName'] );
+
+ # Fetch the file
+ $opts = array();
+ if ( $wgCopyUploadAsyncTimeout ) {
+ $opts['timeout'] = $wgCopyUploadAsyncTimeout;
+ }
+ $status = $this->upload->fetchFile( $opts );
+ if ( !$status->isOk() ) {
+ $this->leaveMessage( $status );
+
+ return true;
+ }
+
+ # Verify upload
+ $result = $this->upload->verifyUpload();
+ if ( $result['status'] != UploadBase::OK ) {
+ $status = $this->upload->convertVerifyErrorToStatus( $result );
+ $this->leaveMessage( $status );
+
+ return true;
+ }
+
+ # Check warnings
+ if ( !$this->params['ignoreWarnings'] ) {
+ $warnings = $this->upload->checkWarnings();
+ if ( $warnings ) {
+
+ # Stash the upload
+ $key = $this->upload->stashFile();
+
+ // @todo FIXME: This has been broken for a while.
+ // User::leaveUserMessage() does not exist.
+ if ( $this->params['leaveMessage'] ) {
+ $this->user->leaveUserMessage(
+ wfMessage( 'upload-warning-subj' )->text(),
+ wfMessage( 'upload-warning-msg',
+ $key,
+ $this->params['url'] )->text()
+ );
+ } else {
+ wfSetupSession( $this->params['sessionId'] );
+ $this->storeResultInSession( 'Warning',
+ 'warnings', $warnings );
+ session_write_close();
+ }
+
+ return true;
+ }
+ }
+
+ # Perform the upload
+ $status = $this->upload->performUpload(
+ $this->params['comment'],
+ $this->params['pageText'],
+ $this->params['watch'],
+ $this->user
+ );
+ $this->leaveMessage( $status );
+
+ return true;
+ }
+
+ /**
+ * Leave a message on the user talk page or in the session according to
+ * $params['leaveMessage'].
+ *
+ * @param Status $status
+ */
+ protected function leaveMessage( $status ) {
+ if ( $this->params['leaveMessage'] ) {
+ if ( $status->isGood() ) {
+ // @todo FIXME: user->leaveUserMessage does not exist.
+ $this->user->leaveUserMessage( wfMessage( 'upload-success-subj' )->text(),
+ wfMessage( 'upload-success-msg',
+ $this->upload->getTitle()->getText(),
+ $this->params['url']
+ )->text() );
+ } else {
+ // @todo FIXME: user->leaveUserMessage does not exist.
+ $this->user->leaveUserMessage( wfMessage( 'upload-failure-subj' )->text(),
+ wfMessage( 'upload-failure-msg',
+ $status->getWikiText(),
+ $this->params['url']
+ )->text() );
+ }
+ } else {
+ wfSetupSession( $this->params['sessionId'] );
+ if ( $status->isOk() ) {
+ $this->storeResultInSession( 'Success',
+ 'filename', $this->upload->getLocalFile()->getName() );
+ } else {
+ $this->storeResultInSession( 'Failure',
+ 'errors', $status->getErrorsArray() );
+ }
+ session_write_close();
+ }
+ }
+
+ /**
+ * Store a result in the session data. Note that the caller is responsible
+ * for appropriate session_start and session_write_close calls.
+ *
+ * @param string $result The result (Success|Warning|Failure)
+ * @param string $dataKey The key of the extra data
+ * @param mixed $dataValue The extra data itself
+ */
+ protected function storeResultInSession( $result, $dataKey, $dataValue ) {
+ $session =& self::getSessionData( $this->params['sessionKey'] );
+ $session['result'] = $result;
+ $session[$dataKey] = $dataValue;
+ }
+
+ /**
+ * Initialize the session data. Sets the intial result to queued.
+ */
+ public function initializeSessionData() {
+ $session =& self::getSessionData( $this->params['sessionKey'] );
+ $$session['result'] = 'Queued';
+ }
+
+ /**
+ * @param string $key
+ * @return mixed
+ */
+ public static function &getSessionData( $key ) {
+ if ( !isset( $_SESSION[self::SESSION_KEYNAME][$key] ) ) {
+ $_SESSION[self::SESSION_KEYNAME][$key] = array();
+ }
+
+ return $_SESSION[self::SESSION_KEYNAME][$key];
+ }
+}
diff --git a/includes/jobqueue/utils/BacklinkJobUtils.php b/includes/jobqueue/utils/BacklinkJobUtils.php
new file mode 100644
index 00000000..c8e5df66
--- /dev/null
+++ b/includes/jobqueue/utils/BacklinkJobUtils.php
@@ -0,0 +1,122 @@
+<?php
+/**
+ * Job to update links for a given title.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class with Backlink related Job helper methods
+ *
+ * @ingroup JobQueue
+ * @since 1.23
+ */
+class BacklinkJobUtils {
+ /**
+ * Break down $job into approximately ($bSize/$cSize) leaf jobs and a single partition
+ * job that covers the remaining backlink range (if needed). Jobs for the first $bSize
+ * titles are collated ($cSize per job) into leaf jobs to do actual work. All the
+ * resulting jobs are of the same class as $job. No partition job is returned if the
+ * range covered by $job was less than $bSize, as the leaf jobs have full coverage.
+ *
+ * The leaf jobs have the 'pages' param set to a (<page ID>:(<namespace>,<DB key>),...)
+ * map so that the run() function knows what pages to act on. The leaf jobs will keep
+ * the same job title as the parent job (e.g. $job).
+ *
+ * The partition jobs have the 'range' parameter set to a map of the format
+ * (start:<integer>, end:<integer>, batchSize:<integer>, subranges:((<start>,<end>),...)),
+ * the 'table' parameter set to that of $job, and the 'recursive' parameter set to true.
+ * This method can be called on the resulting job to repeat the process again.
+ *
+ * The job provided ($job) must have the 'recursive' parameter set to true and the 'table'
+ * parameter must be set to a backlink table. The job title will be used as the title to
+ * find backlinks for. Any 'range' parameter must follow the same format as mentioned above.
+ * This should be managed by recursive calls to this method.
+ *
+ * The first jobs return are always the leaf jobs. This lets the caller use push() to
+ * put them directly into the queue and works well if the queue is FIFO. In such a queue,
+ * the leaf jobs have to get finished first before anything can resolve the next partition
+ * job, which keeps the queue very small.
+ *
+ * $opts includes:
+ * - params : extra job parameters to include in each job
+ *
+ * @param Job $job
+ * @param int $bSize BacklinkCache partition size; usually $wgUpdateRowsPerJob
+ * @param int $cSize Max titles per leaf job; Usually 1 or a modest value
+ * @param array $opts Optional parameter map
+ * @return Job[] List of Job objects
+ */
+ public static function partitionBacklinkJob( Job $job, $bSize, $cSize, $opts = array() ) {
+ $class = get_class( $job );
+ $title = $job->getTitle();
+ $params = $job->getParams();
+
+ if ( isset( $params['pages'] ) || empty( $params['recursive'] ) ) {
+ $ranges = array(); // sanity; this is a leaf node
+ wfWarn( __METHOD__ . " called on {$job->getType()} leaf job (explosive recursion)." );
+ } elseif ( isset( $params['range'] ) ) {
+ // This is a range job to trigger the insertion of partitioned/title jobs...
+ $ranges = $params['range']['subranges'];
+ $realBSize = $params['range']['batchSize'];
+ } else {
+ // This is a base job to trigger the insertion of partitioned jobs...
+ $ranges = $title->getBacklinkCache()->partition( $params['table'], $bSize );
+ $realBSize = $bSize;
+ }
+
+ $extraParams = isset( $opts['params'] ) ? $opts['params'] : array();
+
+ $jobs = array();
+ // Combine the first range (of size $bSize) backlinks into leaf jobs
+ if ( isset( $ranges[0] ) ) {
+ list( $start, $end ) = $ranges[0];
+ $titles = $title->getBacklinkCache()->getLinks( $params['table'], $start, $end );
+ foreach ( array_chunk( iterator_to_array( $titles ), $cSize ) as $titleBatch ) {
+ $pages = array();
+ foreach ( $titleBatch as $tl ) {
+ $pages[$tl->getArticleId()] = array( $tl->getNamespace(), $tl->getDBKey() );
+ }
+ $jobs[] = new $class(
+ $title, // maintain parent job title
+ array( 'pages' => $pages ) + $extraParams
+ );
+ }
+ }
+ // Take all of the remaining ranges and build a partition job from it
+ if ( isset( $ranges[1] ) ) {
+ $jobs[] = new $class(
+ $title, // maintain parent job title
+ array(
+ 'recursive' => true,
+ 'table' => $params['table'],
+ 'range' => array(
+ 'start' => $ranges[1][0],
+ 'end' => $ranges[count( $ranges ) - 1][1],
+ 'batchSize' => $realBSize,
+ 'subranges' => array_slice( $ranges, 1 )
+ ),
+ ) + $extraParams
+ );
+ }
+
+ return $jobs;
+ }
+}