From c1f9b1f7b1b77776192048005dcc66dcf3df2bfb Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Sat, 27 Dec 2014 15:41:37 +0100 Subject: Update to MediaWiki 1.24.1 --- includes/job/Job.php | 321 -------- includes/job/JobQueue.php | 706 ----------------- includes/job/JobQueueDB.php | 816 -------------------- includes/job/JobQueueFederated.php | 473 ------------ includes/job/JobQueueGroup.php | 427 ---------- includes/job/JobQueueRedis.php | 856 --------------------- includes/job/README | 81 -- includes/job/aggregator/JobQueueAggregator.php | 156 ---- includes/job/aggregator/JobQueueAggregatorMemc.php | 124 --- .../job/aggregator/JobQueueAggregatorRedis.php | 193 ----- includes/job/jobs/AssembleUploadChunksJob.php | 127 --- includes/job/jobs/DoubleRedirectJob.php | 221 ------ includes/job/jobs/DuplicateJob.php | 59 -- includes/job/jobs/EmaillingJob.php | 47 -- includes/job/jobs/EnotifNotifyJob.php | 58 -- includes/job/jobs/HTMLCacheUpdateJob.php | 263 ------- includes/job/jobs/NullJob.php | 76 -- includes/job/jobs/PublishStashedFileJob.php | 140 ---- includes/job/jobs/RefreshLinksJob.php | 222 ------ includes/job/jobs/UploadFromUrlJob.php | 184 ----- 20 files changed, 5550 deletions(-) delete mode 100644 includes/job/Job.php delete mode 100644 includes/job/JobQueue.php delete mode 100644 includes/job/JobQueueDB.php delete mode 100644 includes/job/JobQueueFederated.php delete mode 100644 includes/job/JobQueueGroup.php delete mode 100644 includes/job/JobQueueRedis.php delete mode 100644 includes/job/README delete mode 100644 includes/job/aggregator/JobQueueAggregator.php delete mode 100644 includes/job/aggregator/JobQueueAggregatorMemc.php delete mode 100644 includes/job/aggregator/JobQueueAggregatorRedis.php delete mode 100644 includes/job/jobs/AssembleUploadChunksJob.php delete mode 100644 includes/job/jobs/DoubleRedirectJob.php delete mode 100644 includes/job/jobs/DuplicateJob.php delete mode 100644 includes/job/jobs/EmaillingJob.php delete mode 100644 includes/job/jobs/EnotifNotifyJob.php delete mode 100644 includes/job/jobs/HTMLCacheUpdateJob.php delete mode 100644 includes/job/jobs/NullJob.php delete mode 100644 includes/job/jobs/PublishStashedFileJob.php delete mode 100644 includes/job/jobs/RefreshLinksJob.php delete mode 100644 includes/job/jobs/UploadFromUrlJob.php (limited to 'includes/job') diff --git a/includes/job/Job.php b/includes/job/Job.php deleted file mode 100644 index ab7df5d2..00000000 --- a/includes/job/Job.php +++ /dev/null @@ -1,321 +0,0 @@ -push( $jobs ); - } - - /** - * Insert a group of jobs into the queue. - * - * Same as batchInsert() but does not commit and can thus - * be rolled-back as part of a larger transaction. However, - * large batches of jobs can cause slave lag. - * - * @param array $jobs of Job objects - * @return bool - * @deprecated since 1.21 - */ - public static function safeBatchInsert( $jobs ) { - return JobQueueGroup::singleton()->push( $jobs, JobQueue::QOS_ATOMIC ); - } - - /** - * Pop a job of a certain type. This tries less hard than pop() to - * actually find a job; it may be adversely affected by concurrent job - * runners. - * - * @param $type string - * @return Job|bool Returns false if there are no jobs - * @deprecated since 1.21 - */ - public static function pop_type( $type ) { - return JobQueueGroup::singleton()->get( $type )->pop(); - } - - /** - * Pop a job off the front of the queue. - * This is subject to $wgJobTypesExcludedFromDefaultQueue. 
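[Sketch, not part of the patch: the non-deprecated call pattern that the static wrappers above delegate to. Assumes a MediaWiki 1.21+ runtime; NullJob is one of the job classes this patch removes, and the 'lives' parameter (assumed here) controls how many times it re-queues itself.]

    $group = JobQueueGroup::singleton();
    // Enqueue: replaces the deprecated Job::batchInsert()/safeBatchInsert()
    $group->push( new NullJob( Title::newMainPage(), array( 'lives' => 1 ) ) );
    // Dequeue and run: replaces the deprecated Job::pop()/Job::pop_type()
    $job = $group->pop();
    if ( $job ) {
        $ok = $job->run();
        $group->ack( $job ); // acknowledge, so claimTTL recycling skips this job
    }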
- * - * @return Job or false if there's no jobs - * @deprecated since 1.21 - */ - public static function pop() { - return JobQueueGroup::singleton()->pop(); - } - - /*------------------------------------------------------------------------- - * Non-static functions - *------------------------------------------------------------------------*/ - - /** - * @param $command - * @param $title - * @param $params array|bool - * @param $id int - */ - public function __construct( $command, $title, $params = false, $id = 0 ) { - $this->command = $command; - $this->title = $title; - $this->params = $params; - $this->id = $id; - - $this->removeDuplicates = false; // expensive jobs may set this to true - } - - /** - * @return integer May be 0 for jobs stored outside the DB - * @deprecated since 1.22 - */ - public function getId() { - return $this->id; - } - - /** - * @return string - */ - public function getType() { - return $this->command; - } - - /** - * @return Title - */ - public function getTitle() { - return $this->title; - } - - /** - * @return array - */ - public function getParams() { - return $this->params; - } - - /** - * @return integer|null UNIX timestamp to delay running this job until, otherwise null - * @since 1.22 - */ - public function getReleaseTimestamp() { - return isset( $this->params['jobReleaseTimestamp'] ) - ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) - : null; - } - - /** - * @return bool Whether only one of each identical set of jobs should be run - */ - public function ignoreDuplicates() { - return $this->removeDuplicates; - } - - /** - * @return bool Whether this job can be retried on failure by job runners - * @since 1.21 - */ - public function allowRetries() { - return true; - } - - /** - * Subclasses may need to override this to make duplication detection work. - * The resulting map conveys everything that makes the job unique. This is - * only checked if ignoreDuplicates() returns true, meaning that duplicate - * jobs are supposed to be ignored. - * - * @return Array Map of key/values - * @since 1.21 - */ - public function getDeduplicationInfo() { - $info = array( - 'type' => $this->getType(), - 'namespace' => $this->getTitle()->getNamespace(), - 'title' => $this->getTitle()->getDBkey(), - 'params' => $this->getParams() - ); - if ( is_array( $info['params'] ) ) { - // Identical jobs with different "root" jobs should count as duplicates - unset( $info['params']['rootJobSignature'] ); - unset( $info['params']['rootJobTimestamp'] ); - // Likewise for jobs with different delay times - unset( $info['params']['jobReleaseTimestamp'] ); - } - return $info; - } - - /** - * @see JobQueue::deduplicateRootJob() - * @param string $key A key that identifies the task - * @return Array - * @since 1.21 - */ - public static function newRootJobParams( $key ) { - return array( - 'rootJobSignature' => sha1( $key ), - 'rootJobTimestamp' => wfTimestampNow() - ); - } - - /** - * @see JobQueue::deduplicateRootJob() - * @return Array - * @since 1.21 - */ - public function getRootJobParams() { - return array( - 'rootJobSignature' => isset( $this->params['rootJobSignature'] ) - ? $this->params['rootJobSignature'] - : null, - 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] ) - ? 
$this->params['rootJobTimestamp'] - : null - ); - } - - /** - * @see JobQueue::deduplicateRootJob() - * @return bool - * @since 1.22 - */ - public function hasRootJobParams() { - return isset( $this->params['rootJobSignature'] ) - && isset( $this->params['rootJobTimestamp'] ); - } - - /** - * Insert a single job into the queue. - * @return bool true on success - * @deprecated since 1.21 - */ - public function insert() { - return JobQueueGroup::singleton()->push( $this ); - } - - /** - * @return string - */ - public function toString() { - $paramString = ''; - if ( $this->params ) { - foreach ( $this->params as $key => $value ) { - if ( $paramString != '' ) { - $paramString .= ' '; - } - if ( is_array( $value ) ) { - $value = "array(" . count( $value ) . ")"; - } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) { - $value = "object(" . get_class( $value ) . ")"; - } - $value = (string)$value; - if ( mb_strlen( $value ) > 1024 ) { - $value = "string(" . mb_strlen( $value ) . ")"; - } - - $paramString .= "$key=$value"; - } - } - - if ( is_object( $this->title ) ) { - $s = "{$this->command} " . $this->title->getPrefixedDBkey(); - if ( $paramString !== '' ) { - $s .= ' ' . $paramString; - } - return $s; - } else { - return "{$this->command} $paramString"; - } - } - - protected function setLastError( $error ) { - $this->error = $error; - } - - public function getLastError() { - return $this->error; - } -} diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php deleted file mode 100644 index 6556ee85..00000000 --- a/includes/job/JobQueue.php +++ /dev/null @@ -1,706 +0,0 @@ -wiki = $params['wiki']; - $this->type = $params['type']; - $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; - $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3; - if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { - $this->order = $params['order']; - } else { - $this->order = $this->optimalOrder(); - } - if ( !in_array( $this->order, $this->supportedOrders() ) ) { - throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); - } - $this->checkDelay = !empty( $params['checkDelay'] ); - if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { - throw new MWException( __CLASS__ . " does not support delayed jobs." ); - } - $this->dupCache = wfGetCache( CACHE_ANYTHING ); - } - - /** - * Get a job queue object of the specified type. - * $params includes: - * - class : What job class to use (determines job type) - * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) - * - type : The name of the job types this queue handles - * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". - * If "fifo" is used, the queue will effectively be FIFO. Note that job - * completion will not appear to be exactly FIFO if there are multiple - * job runners since jobs can take different times to finish once popped. - * If "timestamp" is used, the queue will at least be loosely ordered - * by timestamp, allowing for some jobs to be popped off out of order. - * If "random" is used, pop() will pick jobs in random order. - * Note that it may only be weakly random (e.g. a lottery of the oldest X). - * If "any" is choosen, the queue will use whatever order is the fastest. - * This might be useful for improving concurrency for job acquisition. - * - claimTTL : If supported, the queue will recycle jobs that have been popped - * but not acknowledged as completed after this many seconds. 
Recycling - * of jobs simple means re-inserting them into the queue. Jobs can be - * attempted up to three times before being discarded. - * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. - * This lets delayed jobs wait in a staging area until a given timestamp is - * reached, at which point they will enter the queue. If this is not enabled - * or not supported, an exception will be thrown on delayed job insertion. - * - * Queue classes should throw an exception if they do not support the options given. - * - * @param $params array - * @return JobQueue - * @throws MWException - */ - final public static function factory( array $params ) { - $class = $params['class']; - if ( !class_exists( $class ) ) { - throw new MWException( "Invalid job queue class '$class'." ); - } - $obj = new $class( $params ); - if ( !( $obj instanceof self ) ) { - throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." ); - } - return $obj; - } - - /** - * @return string Wiki ID - */ - final public function getWiki() { - return $this->wiki; - } - - /** - * @return string Job type that this queue handles - */ - final public function getType() { - return $this->type; - } - - /** - * @return string One of (random, timestamp, fifo, undefined) - */ - final public function getOrder() { - return $this->order; - } - - /** - * @return bool Whether delayed jobs are enabled - * @since 1.22 - */ - final public function delayedJobsEnabled() { - return $this->checkDelay; - } - - /** - * Get the allowed queue orders for configuration validation - * - * @return Array Subset of (random, timestamp, fifo, undefined) - */ - abstract protected function supportedOrders(); - - /** - * Get the default queue order to use if configuration does not specify one - * - * @return string One of (random, timestamp, fifo, undefined) - */ - abstract protected function optimalOrder(); - - /** - * Find out if delayed jobs are supported for configuration validation - * - * @return boolean Whether delayed jobs are supported - */ - protected function supportsDelayedJobs() { - return false; // not implemented - } - - /** - * Quickly check if the queue has no available (unacquired, non-delayed) jobs. - * Queue classes should use caching if they are any slower without memcached. - * - * If caching is used, this might return false when there are actually no jobs. - * If pop() is called and returns false then it should correct the cache. Also, - * calling flushCaches() first prevents this. However, this affect is typically - * not distinguishable from the race condition between isEmpty() and pop(). - * - * @return bool - * @throws JobQueueError - */ - final public function isEmpty() { - wfProfileIn( __METHOD__ ); - $res = $this->doIsEmpty(); - wfProfileOut( __METHOD__ ); - return $res; - } - - /** - * @see JobQueue::isEmpty() - * @return bool - */ - abstract protected function doIsEmpty(); - - /** - * Get the number of available (unacquired, non-delayed) jobs in the queue. - * Queue classes should use caching if they are any slower without memcached. - * - * If caching is used, this number might be out of date for a minute. 
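[Sketch of the factory options documented above; the class name and values are plausible configuration only, and normal callers go through JobQueueGroup and $wgJobTypeConf rather than calling the factory directly.]

    $queue = JobQueue::factory( array(
        'class'    => 'JobQueueDB',    // backing store: the 'job' DB table
        'wiki'     => wfWikiID(),      // defaults to the current wiki anyway
        'type'     => 'refreshLinks',
        'order'    => 'timestamp',     // loose timestamp order, per the docs above
        'claimTTL' => 3600,            // recycle jobs not acked within an hour
    ) );
    $size = $queue->getSize();         // may be cached, hence briefly stale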
- * - * @return integer - * @throws JobQueueError - */ - final public function getSize() { - wfProfileIn( __METHOD__ ); - $res = $this->doGetSize(); - wfProfileOut( __METHOD__ ); - return $res; - } - - /** - * @see JobQueue::getSize() - * @return integer - */ - abstract protected function doGetSize(); - - /** - * Get the number of acquired jobs (these are temporarily out of the queue). - * Queue classes should use caching if they are any slower without memcached. - * - * If caching is used, this number might be out of date for a minute. - * - * @return integer - * @throws JobQueueError - */ - final public function getAcquiredCount() { - wfProfileIn( __METHOD__ ); - $res = $this->doGetAcquiredCount(); - wfProfileOut( __METHOD__ ); - return $res; - } - - /** - * @see JobQueue::getAcquiredCount() - * @return integer - */ - abstract protected function doGetAcquiredCount(); - - /** - * Get the number of delayed jobs (these are temporarily out of the queue). - * Queue classes should use caching if they are any slower without memcached. - * - * If caching is used, this number might be out of date for a minute. - * - * @return integer - * @throws JobQueueError - * @since 1.22 - */ - final public function getDelayedCount() { - wfProfileIn( __METHOD__ ); - $res = $this->doGetDelayedCount(); - wfProfileOut( __METHOD__ ); - return $res; - } - - /** - * @see JobQueue::getDelayedCount() - * @return integer - */ - protected function doGetDelayedCount() { - return 0; // not implemented - } - - /** - * Get the number of acquired jobs that can no longer be attempted. - * Queue classes should use caching if they are any slower without memcached. - * - * If caching is used, this number might be out of date for a minute. - * - * @return integer - * @throws JobQueueError - */ - final public function getAbandonedCount() { - wfProfileIn( __METHOD__ ); - $res = $this->doGetAbandonedCount(); - wfProfileOut( __METHOD__ ); - return $res; - } - - /** - * @see JobQueue::getAbandonedCount() - * @return integer - */ - protected function doGetAbandonedCount() { - return 0; // not implemented - } - - /** - * Push a single jobs into the queue. - * This does not require $wgJobClasses to be set for the given job type. - * Outside callers should use JobQueueGroup::push() instead of this function. - * - * @param $jobs Job|Array - * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) - * @return bool Returns false on failure - * @throws JobQueueError - */ - final public function push( $jobs, $flags = 0 ) { - return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); - } - - /** - * Push a batch of jobs into the queue. - * This does not require $wgJobClasses to be set for the given job type. - * Outside callers should use JobQueueGroup::push() instead of this function. - * - * @param array $jobs List of Jobs - * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) - * @return bool Returns false on failure - * @throws JobQueueError - */ - final public function batchPush( array $jobs, $flags = 0 ) { - if ( !count( $jobs ) ) { - return true; // nothing to do - } - - foreach ( $jobs as $job ) { - if ( $job->getType() !== $this->type ) { - throw new MWException( - "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); - } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { - throw new MWException( - "Got delayed '{$job->getType()}' job; delays are not supported." 
); - } - } - - wfProfileIn( __METHOD__ ); - $ok = $this->doBatchPush( $jobs, $flags ); - wfProfileOut( __METHOD__ ); - return $ok; - } - - /** - * @see JobQueue::batchPush() - * @return bool - */ - abstract protected function doBatchPush( array $jobs, $flags ); - - /** - * Pop a job off of the queue. - * This requires $wgJobClasses to be set for the given job type. - * Outside callers should use JobQueueGroup::pop() instead of this function. - * - * @return Job|bool Returns false if there are no jobs - * @throws JobQueueError - */ - final public function pop() { - global $wgJobClasses; - - if ( $this->wiki !== wfWikiID() ) { - throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." ); - } elseif ( !isset( $wgJobClasses[$this->type] ) ) { - // Do not pop jobs if there is no class for the queue type - throw new MWException( "Unrecognized job type '{$this->type}'." ); - } - - wfProfileIn( __METHOD__ ); - $job = $this->doPop(); - wfProfileOut( __METHOD__ ); - - // Flag this job as an old duplicate based on its "root" job... - try { - if ( $job && $this->isRootJobOldDuplicate( $job ) ) { - JobQueue::incrStats( 'job-pop-duplicate', $this->type ); - $job = DuplicateJob::newFromJob( $job ); // convert to a no-op - } - } catch ( MWException $e ) {} // don't lose jobs over this - - return $job; - } - - /** - * @see JobQueue::pop() - * @return Job - */ - abstract protected function doPop(); - - /** - * Acknowledge that a job was completed. - * - * This does nothing for certain queue classes or if "claimTTL" is not set. - * Outside callers should use JobQueueGroup::ack() instead of this function. - * - * @param $job Job - * @return bool - * @throws JobQueueError - */ - final public function ack( Job $job ) { - if ( $job->getType() !== $this->type ) { - throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); - } - wfProfileIn( __METHOD__ ); - $ok = $this->doAck( $job ); - wfProfileOut( __METHOD__ ); - return $ok; - } - - /** - * @see JobQueue::ack() - * @return bool - */ - abstract protected function doAck( Job $job ); - - /** - * Register the "root job" of a given job into the queue for de-duplication. - * This should only be called right *after* all the new jobs have been inserted. - * This is used to turn older, duplicate, job entries into no-ops. The root job - * information will remain in the registry until it simply falls out of cache. - * - * This requires that $job has two special fields in the "params" array: - * - rootJobSignature : hash (e.g. SHA1) that identifies the task - * - rootJobTimestamp : TS_MW timestamp of this instance of the task - * - * A "root job" is a conceptual job that consist of potentially many smaller jobs - * that are actually inserted into the queue. For example, "refreshLinks" jobs are - * spawned when a template is edited. One can think of the task as "update links - * of pages that use template X" and an instance of that task as a "root job". - * However, what actually goes into the queue are potentially many refreshLinks2 jobs. - * Since these jobs include things like page ID ranges and DB master positions, and morph - * into smaller refreshLinks2 jobs recursively, simple duplicate detection (like job_sha1) - * for individual jobs being identical is not useful. - * - * In the case of "refreshLinks", if these jobs are still in the queue when the template - * is edited again, we want all of these old refreshLinks jobs for that template to become - * no-ops. 
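[In call-site terms, the pattern described here looks roughly like the sketch below; SomeRefreshJob, $template, and $pageIdRanges are hypothetical, while newRootJobParams() and deduplicateRootJob() are the real methods from this file.]

    // All jobs spawned by one template edit share the same root-job params
    $rootParams = Job::newRootJobParams( 'refreshlinks:' . $template->getPrefixedDBkey() );
    $jobs = array();
    foreach ( $pageIdRanges as $range ) { // batches of affected pages (hypothetical)
        $jobs[] = new SomeRefreshJob( $template, $range + $rootParams );
    }
    $queue = JobQueueGroup::singleton()->get( 'refreshLinks2' );
    $queue->push( $jobs );
    // Register the root job only *after* the push, so a failed insert aborts it
    $queue->deduplicateRootJob( $jobs[0] );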
This can greatly reduce server load, since refreshLinks jobs involves parsing. - * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a - * previous "root job" for the same task of "update links of pages that use template X". - * - * This does nothing for certain queue classes. - * - * @param $job Job - * @return bool - * @throws JobQueueError - */ - final public function deduplicateRootJob( Job $job ) { - if ( $job->getType() !== $this->type ) { - throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); - } - wfProfileIn( __METHOD__ ); - $ok = $this->doDeduplicateRootJob( $job ); - wfProfileOut( __METHOD__ ); - return $ok; - } - - /** - * @see JobQueue::deduplicateRootJob() - * @param $job Job - * @return bool - */ - protected function doDeduplicateRootJob( Job $job ) { - if ( !$job->hasRootJobParams() ) { - throw new MWException( "Cannot register root job; missing parameters." ); - } - $params = $job->getRootJobParams(); - - $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); - // Callers should call batchInsert() and then this function so that if the insert - // fails, the de-duplication registration will be aborted. Since the insert is - // deferred till "transaction idle", do the same here, so that the ordering is - // maintained. Having only the de-duplication registration succeed would cause - // jobs to become no-ops without any actual jobs that made them redundant. - $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job - if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { - return true; // a newer version of this root job was enqueued - } - - // Update the timestamp of the last root job started at the location... - return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); - } - - /** - * Check if the "root" job of a given job has been superseded by a newer one - * - * @param $job Job - * @return bool - * @throws JobQueueError - */ - final protected function isRootJobOldDuplicate( Job $job ) { - if ( $job->getType() !== $this->type ) { - throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); - } - wfProfileIn( __METHOD__ ); - $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); - wfProfileOut( __METHOD__ ); - return $isDuplicate; - } - - /** - * @see JobQueue::isRootJobOldDuplicate() - * @param Job $job - * @return bool - */ - protected function doIsRootJobOldDuplicate( Job $job ) { - if ( !$job->hasRootJobParams() ) { - return false; // job has no de-deplication info - } - $params = $job->getRootJobParams(); - - $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); - // Get the last time this root job was enqueued - $timestamp = $this->dupCache->get( $key ); - - // Check if a new root job was started at the location after this one's... 
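    // In other words: if the registered timestamp is newer than this job's own
    // rootJobTimestamp, a fresher batch for the same task has been enqueued
    // since this job was pushed, so this job is reported as a stale duplicate.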
- return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); - } - - /** - * @param string $signature Hash identifier of the root job - * @return string - */ - protected function getRootJobCacheKey( $signature ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); - } - - /** - * Deleted all unclaimed and delayed jobs from the queue - * - * @return bool Success - * @throws JobQueueError - * @since 1.22 - */ - final public function delete() { - wfProfileIn( __METHOD__ ); - $res = $this->doDelete(); - wfProfileOut( __METHOD__ ); - return $res; - } - - /** - * @see JobQueue::delete() - * @return bool Success - */ - protected function doDelete() { - throw new MWException( "This method is not implemented." ); - } - - /** - * Wait for any slaves or backup servers to catch up. - * - * This does nothing for certain queue classes. - * - * @return void - * @throws JobQueueError - */ - final public function waitForBackups() { - wfProfileIn( __METHOD__ ); - $this->doWaitForBackups(); - wfProfileOut( __METHOD__ ); - } - - /** - * @see JobQueue::waitForBackups() - * @return void - */ - protected function doWaitForBackups() {} - - /** - * Return a map of task names to task definition maps. - * A "task" is a fast periodic queue maintenance action. - * Mutually exclusive tasks must implement their own locking in the callback. - * - * Each task value is an associative array with: - * - name : the name of the task - * - callback : a PHP callable that performs the task - * - period : the period in seconds corresponding to the task frequency - * - * @return Array - */ - final public function getPeriodicTasks() { - $tasks = $this->doGetPeriodicTasks(); - foreach ( $tasks as $name => &$def ) { - $def['name'] = $name; - } - return $tasks; - } - - /** - * @see JobQueue::getPeriodicTasks() - * @return Array - */ - protected function doGetPeriodicTasks() { - return array(); - } - - /** - * Clear any process and persistent caches - * - * @return void - */ - final public function flushCaches() { - wfProfileIn( __METHOD__ ); - $this->doFlushCaches(); - wfProfileOut( __METHOD__ ); - } - - /** - * @see JobQueue::flushCaches() - * @return void - */ - protected function doFlushCaches() {} - - /** - * Get an iterator to traverse over all available jobs in this queue. - * This does not include jobs that are currently acquired or delayed. - * Note: results may be stale if the queue is concurrently modified. - * - * @return Iterator - * @throws JobQueueError - */ - abstract public function getAllQueuedJobs(); - - /** - * Get an iterator to traverse over all delayed jobs in this queue. - * Note: results may be stale if the queue is concurrently modified. - * - * @return Iterator - * @throws JobQueueError - * @since 1.22 - */ - public function getAllDelayedJobs() { - return new ArrayIterator( array() ); // not implemented - } - - /** - * Do not use this function outside of JobQueue/JobQueueGroup - * - * @return string - * @since 1.22 - */ - public function getCoalesceLocationInternal() { - return null; - } - - /** - * Check whether each of the given queues are empty. - * This is used for batching checks for queues stored at the same place. 
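[Sketch of how a job runner might use this batching; the type names are assumptions, and the null fallback is part of the documented contract for backends that cannot batch-check.]

    $types = array( 'enotifNotify', 'htmlCacheUpdate', 'refreshLinks' );
    $queue = JobQueueGroup::singleton()->get( $types[0] );
    $nonEmpty = $queue->getSiblingQueuesWithJobs( $types );
    if ( $nonEmpty === null ) {
        // Backend cannot batch-check; fall back to per-queue isEmpty()
        $nonEmpty = array();
        foreach ( $types as $type ) {
            if ( !JobQueueGroup::singleton()->get( $type )->isEmpty() ) {
                $nonEmpty[] = $type;
            }
        }
    }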
- * - * @param array $types List of queues types - * @return array|null (list of non-empty queue types) or null if unsupported - * @throws MWException - * @since 1.22 - */ - final public function getSiblingQueuesWithJobs( array $types ) { - $section = new ProfileSection( __METHOD__ ); - return $this->doGetSiblingQueuesWithJobs( $types ); - } - - /** - * @see JobQueue::getSiblingQueuesWithJobs() - * @param array $types List of queues types - * @return array|null (list of queue types) or null if unsupported - */ - protected function doGetSiblingQueuesWithJobs( array $types ) { - return null; // not supported - } - - /** - * Check the size of each of the given queues. - * For queues not served by the same store as this one, 0 is returned. - * This is used for batching checks for queues stored at the same place. - * - * @param array $types List of queues types - * @return array|null (job type => whether queue is empty) or null if unsupported - * @throws MWException - * @since 1.22 - */ - final public function getSiblingQueueSizes( array $types ) { - $section = new ProfileSection( __METHOD__ ); - return $this->doGetSiblingQueueSizes( $types ); - } - - /** - * @see JobQueue::getSiblingQueuesSize() - * @param array $types List of queues types - * @return array|null (list of queue types) or null if unsupported - */ - protected function doGetSiblingQueueSizes( array $types ) { - return null; // not supported - } - - /** - * Call wfIncrStats() for the queue overall and for the queue type - * - * @param string $key Event type - * @param string $type Job type - * @param integer $delta - * @since 1.22 - */ - public static function incrStats( $key, $type, $delta = 1 ) { - wfIncrStats( $key, $delta ); - wfIncrStats( "{$key}-{$type}", $delta ); - } - - /** - * Namespace the queue with a key to isolate it for testing - * - * @param $key string - * @return void - * @throws MWException - */ - public function setTestingPrefix( $key ) { - throw new MWException( "Queue namespacing not supported for this queue type." ); - } -} - -/** - * @ingroup JobQueue - * @since 1.22 - */ -class JobQueueError extends MWException {} -class JobQueueConnectionError extends JobQueueError {} diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php deleted file mode 100644 index c39083df..00000000 --- a/includes/job/JobQueueDB.php +++ /dev/null @@ -1,816 +0,0 @@ -cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; - // Make sure that we don't use the SQL cache, which would be harmful - $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc; - } - - protected function supportedOrders() { - return array( 'random', 'timestamp', 'fifo' ); - } - - protected function optimalOrder() { - return 'random'; - } - - /** - * @see JobQueue::doIsEmpty() - * @return bool - */ - protected function doIsEmpty() { - $key = $this->getCacheKey( 'empty' ); - - $isEmpty = $this->cache->get( $key ); - if ( $isEmpty === 'true' ) { - return true; - } elseif ( $isEmpty === 'false' ) { - return false; - } - - $dbr = $this->getSlaveDB(); - try { - $found = $dbr->selectField( // unclaimed job - 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ - ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - $this->cache->add( $key, $found ? 
'false' : 'true', self::CACHE_TTL_LONG ); - - return !$found; - } - - /** - * @see JobQueue::doGetSize() - * @return integer - */ - protected function doGetSize() { - $key = $this->getCacheKey( 'size' ); - - $size = $this->cache->get( $key ); - if ( is_int( $size ) ) { - return $size; - } - - try { - $dbr = $this->getSlaveDB(); - $size = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( 'job_cmd' => $this->type, 'job_token' => '' ), - __METHOD__ - ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); - - return $size; - } - - /** - * @see JobQueue::doGetAcquiredCount() - * @return integer - */ - protected function doGetAcquiredCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } - - $key = $this->getCacheKey( 'acquiredcount' ); - - $count = $this->cache->get( $key ); - if ( is_int( $count ) ) { - return $count; - } - - $dbr = $this->getSlaveDB(); - try { - $count = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), - __METHOD__ - ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); - - return $count; - } - - /** - * @see JobQueue::doGetAbandonedCount() - * @return integer - * @throws MWException - */ - protected function doGetAbandonedCount() { - global $wgMemc; - - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } - - $key = $this->getCacheKey( 'abandonedcount' ); - - $count = $wgMemc->get( $key ); - if ( is_int( $count ) ) { - return $count; - } - - $dbr = $this->getSlaveDB(); - try { - $count = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( - 'job_cmd' => $this->type, - "job_token != {$dbr->addQuotes( '' )}", - "job_attempts >= " . $dbr->addQuotes( $this->maxTries ) - ), - __METHOD__ - ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT ); - - return $count; - } - - /** - * @see JobQueue::doBatchPush() - * @param array $jobs - * @param $flags - * @throws DBError|Exception - * @return bool - */ - protected function doBatchPush( array $jobs, $flags ) { - $dbw = $this->getMasterDB(); - - $that = $this; - $method = __METHOD__; - $dbw->onTransactionIdle( - function() use ( $dbw, $that, $jobs, $flags, $method ) { - $that->doBatchPushInternal( $dbw, $jobs, $flags, $method ); - } - ); - - return true; - } - - /** - * This function should *not* be called outside of JobQueueDB - * - * @param DatabaseBase $dbw - * @param array $jobs - * @param int $flags - * @param string $method - * @return boolean - * @throws type - */ - public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { - if ( !count( $jobs ) ) { - return true; - } - - $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated - $rowList = array(); // list of jobs for jobs that are are not de-duplicated - foreach ( $jobs as $job ) { - $row = $this->insertFields( $job ); - if ( $job->ignoreDuplicates() ) { - $rowSet[$row['job_sha1']] = $row; - } else { - $rowList[] = $row; - } - } - - if ( $flags & self::QOS_ATOMIC ) { - $dbw->begin( $method ); // wrap all the job additions in one transaction - } - try { - // Strip out any duplicate jobs that are already in the queue... 
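    // ("Duplicate" means an unclaimed row whose job_sha1, a base-36 SHA-1 of
    // Job::getDeduplicationInfo() computed in insertFields() below, matches an
    // incoming row; the new copy is then dropped rather than enqueued twice.)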
- if ( count( $rowSet ) ) { - $res = $dbw->select( 'job', 'job_sha1', - array( - // No job_type condition since it's part of the job_sha1 hash - 'job_sha1' => array_keys( $rowSet ), - 'job_token' => '' // unclaimed - ), - $method - ); - foreach ( $res as $row ) { - wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" ); - unset( $rowSet[$row->job_sha1] ); // already enqueued - } - } - // Build the full list of job rows to insert - $rows = array_merge( $rowList, array_values( $rowSet ) ); - // Insert the job rows in chunks to avoid slave lag... - foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { - $dbw->insert( 'job', $rowBatch, $method ); - } - JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) ); - JobQueue::incrStats( 'job-insert-duplicate', $this->type, - count( $rowSet ) + count( $rowList ) - count( $rows ) ); - } catch ( DBError $e ) { - if ( $flags & self::QOS_ATOMIC ) { - $dbw->rollback( $method ); - } - throw $e; - } - if ( $flags & self::QOS_ATOMIC ) { - $dbw->commit( $method ); - } - - $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG ); - - return true; - } - - /** - * @see JobQueue::doPop() - * @return Job|bool - */ - protected function doPop() { - if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { - return false; // queue is empty - } - - $dbw = $this->getMasterDB(); - try { - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); - - $uuid = wfRandomString( 32 ); // pop attempt - $job = false; // job popped off - do { // retry when our row is invalid or deleted as a duplicate - // Try to reserve a row in the DB... - if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { - $row = $this->claimOldest( $uuid ); - } else { // random first - $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs - $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand - $row = $this->claimRandom( $uuid, $rand, $gte ); - } - // Check if we found a row to reserve... - if ( !$row ) { - $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); - break; // nothing to do - } - JobQueue::incrStats( 'job-pop', $this->type ); - // Get the job object from the row... - $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); - if ( !$title ) { - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - wfDebug( "Row has invalid title '{$row->job_title}'." ); - continue; // try again - } - $job = Job::factory( $row->job_cmd, $title, - self::extractBlob( $row->job_params ), $row->job_id ); - $job->metadata['id'] = $row->job_id; - $job->id = $row->job_id; // XXX: work around broken subclasses - break; // done - } while ( true ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - - return $job; - } - - /** - * Reserve a row with a single UPDATE without holding row locks over RTTs... 
- * - * @param string $uuid 32 char hex string - * @param $rand integer Random unsigned integer (31 bits) - * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random) - * @return Row|false - */ - protected function claimRandom( $uuid, $rand, $gte ) { - $dbw = $this->getMasterDB(); - // Check cache to see if the queue has <= OFFSET items - $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); - - $row = false; // the row acquired - $invertedDirection = false; // whether one job_random direction was already scanned - // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT - // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is - // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot - // be used here with MySQL. - do { - if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows - // For small queues, using OFFSET will overshoot and return no rows more often. - // Instead, this uses job_random to pick a row (possibly checking both directions). - $ineq = $gte ? '>=' : '<='; - $dir = $gte ? 'ASC' : 'DESC'; - $row = $dbw->selectRow( 'job', '*', // find a random job - array( - 'job_cmd' => $this->type, - 'job_token' => '', // unclaimed - "job_random {$ineq} {$dbw->addQuotes( $rand )}" ), - __METHOD__, - array( 'ORDER BY' => "job_random {$dir}" ) - ); - if ( !$row && !$invertedDirection ) { - $gte = !$gte; - $invertedDirection = true; - continue; // try the other direction - } - } else { // table *may* have >= MAX_OFFSET rows - // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU - // in MySQL if there are many rows for some reason. This uses a small OFFSET - // instead of job_random for reducing excess claim retries. - $row = $dbw->selectRow( 'job', '*', // find a random job - array( - 'job_cmd' => $this->type, - 'job_token' => '', // unclaimed - ), - __METHOD__, - array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ) - ); - if ( !$row ) { - $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows - $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 ); - continue; // use job_random - } - } - if ( $row ) { // claim the job - $dbw->update( 'job', // update by PK - array( - 'job_token' => $uuid, - 'job_token_timestamp' => $dbw->timestamp(), - 'job_attempts = job_attempts+1' ), - array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ), - __METHOD__ - ); - // This might get raced out by another runner when claiming the previously - // selected row. The use of job_random should minimize this problem, however. - if ( !$dbw->affectedRows() ) { - $row = false; // raced out - } - } else { - break; // nothing to do - } - } while ( !$row ); - - return $row; - } - - /** - * Reserve a row with a single UPDATE without holding row locks over RTTs... - * - * @param string $uuid 32 char hex string - * @return Row|false - */ - protected function claimOldest( $uuid ) { - $dbw = $this->getMasterDB(); - - $row = false; // the row acquired - do { - if ( $dbw->getType() === 'mysql' ) { - // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the - // same table being changed in an UPDATE query in MySQL (gives Error: 1093). - // Oracle and Postgre have no such limitation. However, MySQL offers an - // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. - $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " . - "SET " . - "job_token = {$dbw->addQuotes( $uuid ) }, " . 
- "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " . - "job_attempts = job_attempts+1 " . - "WHERE ( " . - "job_cmd = {$dbw->addQuotes( $this->type )} " . - "AND job_token = {$dbw->addQuotes( '' )} " . - ") ORDER BY job_id ASC LIMIT 1", - __METHOD__ - ); - } else { - // Use a subquery to find the job, within an UPDATE to claim it. - // This uses as much of the DB wrapper functions as possible. - $dbw->update( 'job', - array( - 'job_token' => $uuid, - 'job_token_timestamp' => $dbw->timestamp(), - 'job_attempts = job_attempts+1' ), - array( 'job_id = (' . - $dbw->selectSQLText( 'job', 'job_id', - array( 'job_cmd' => $this->type, 'job_token' => '' ), - __METHOD__, - array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) . - ')' - ), - __METHOD__ - ); - } - // Fetch any row that we just reserved... - if ( $dbw->affectedRows() ) { - $row = $dbw->selectRow( 'job', '*', - array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ - ); - if ( !$row ) { // raced out by duplicate job removal - wfDebug( "Row deleted as duplicate by another process." ); - } - } else { - break; // nothing to do - } - } while ( !$row ); - - return $row; - } - - /** - * @see JobQueue::doAck() - * @param Job $job - * @throws MWException - * @return Job|bool - */ - protected function doAck( Job $job ) { - if ( !isset( $job->metadata['id'] ) ) { - throw new MWException( "Job of type '{$job->getType()}' has no ID." ); - } - - $dbw = $this->getMasterDB(); - try { - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); - - // Delete a row with a single DELETE without holding row locks over RTTs... - $dbw->delete( 'job', - array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - - return true; - } - - /** - * @see JobQueue::doDeduplicateRootJob() - * @param Job $job - * @throws MWException - * @return bool - */ - protected function doDeduplicateRootJob( Job $job ) { - $params = $job->getParams(); - if ( !isset( $params['rootJobSignature'] ) ) { - throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); - } elseif ( !isset( $params['rootJobTimestamp'] ) ) { - throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); - } - $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); - // Callers should call batchInsert() and then this function so that if the insert - // fails, the de-duplication registration will be aborted. Since the insert is - // deferred till "transaction idle", do the same here, so that the ordering is - // maintained. Having only the de-duplication registration succeed would cause - // jobs to become no-ops without any actual jobs that made them redundant. - $dbw = $this->getMasterDB(); - $cache = $this->dupCache; - $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) { - $timestamp = $cache->get( $key ); // current last timestamp of this job - if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { - return true; // a newer version of this root job was enqueued - } - - // Update the timestamp of the last root job started at the location... 
- return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); - } ); - - return true; - } - - /** - * @see JobQueue::doDelete() - * @return bool - */ - protected function doDelete() { - $dbw = $this->getMasterDB(); - try { - $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - return true; - } - - /** - * @see JobQueue::doWaitForBackups() - * @return void - */ - protected function doWaitForBackups() { - wfWaitForSlaves(); - } - - /** - * @return Array - */ - protected function doGetPeriodicTasks() { - return array( - 'recycleAndDeleteStaleJobs' => array( - 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) - ) - ); - } - - /** - * @return void - */ - protected function doFlushCaches() { - foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) { - $this->cache->delete( $this->getCacheKey( $type ) ); - } - } - - /** - * @see JobQueue::getAllQueuedJobs() - * @return Iterator - */ - public function getAllQueuedJobs() { - $dbr = $this->getSlaveDB(); - try { - return new MappedIterator( - $dbr->select( 'job', '*', - array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function( $row ) use ( $dbr ) { - $job = Job::factory( - $row->job_cmd, - Title::makeTitle( $row->job_namespace, $row->job_title ), - strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, - $row->job_id - ); - $job->metadata['id'] = $row->job_id; - $job->id = $row->job_id; // XXX: work around broken subclasses - return $job; - } - ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - } - - public function getCoalesceLocationInternal() { - return $this->cluster - ? "DBCluster:{$this->cluster}:{$this->wiki}" - : "LBFactory:{$this->wiki}"; - } - - protected function doGetSiblingQueuesWithJobs( array $types ) { - $dbr = $this->getSlaveDB(); - $res = $dbr->select( 'job', 'DISTINCT job_cmd', - array( 'job_cmd' => $types ), __METHOD__ ); - - $types = array(); - foreach ( $res as $row ) { - $types[] = $row->job_cmd; - } - return $types; - } - - protected function doGetSiblingQueueSizes( array $types ) { - $dbr = $this->getSlaveDB(); - $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), - array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); - - $sizes = array(); - foreach ( $res as $row ) { - $sizes[$row->job_cmd] = (int)$row->count; - } - return $sizes; - } - - /** - * Recycle or destroy any jobs that have been claimed for too long - * - * @return integer Number of jobs recycled/deleted - */ - public function recycleAndDeleteStaleJobs() { - $now = time(); - $count = 0; // affected rows - $dbw = $this->getMasterDB(); - - try { - if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { - return $count; // already in progress - } - - // Remove claims on jobs acquired for too long if enabled... - if ( $this->claimTTL > 0 ) { - $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); - // Get the IDs of jobs that have be claimed but not finished after too long. - // These jobs can be recycled into the queue by expiring the claim. Selecting - // the IDs first means that the UPDATE can be done by primary key (less deadlocks). 
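    // In rough SQL terms, a sketch of the two statements built below:
    //   SELECT job_id FROM job
    //    WHERE job_cmd = <type> AND job_token != ''        -- was acquired
    //      AND job_token_timestamp < <claimCutoff>         -- stale claim
    //      AND job_attempts < <maxTries>;                  -- retries left
    //   UPDATE job SET job_token = '', job_token_timestamp = <now>
    //    WHERE job_id IN ( <ids> );                        -- update by primary key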
- $res = $dbw->select( 'job', 'job_id', - array( - 'job_cmd' => $this->type, - "job_token != {$dbw->addQuotes( '' )}", // was acquired - "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale - "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left - __METHOD__ - ); - $ids = array_map( - function( $o ) { - return $o->job_id; - }, iterator_to_array( $res ) - ); - if ( count( $ids ) ) { - // Reset job_token for these jobs so that other runners will pick them up. - // Set the timestamp to the current time, as it is useful to now that the job - // was already tried before (the timestamp becomes the "released" time). - $dbw->update( 'job', - array( - 'job_token' => '', - 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release - array( - 'job_id' => $ids ), - __METHOD__ - ); - $count += $dbw->affectedRows(); - JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() ); - $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); - } - } - - // Just destroy any stale jobs... - $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); - $conds = array( - 'job_cmd' => $this->type, - "job_token != {$dbw->addQuotes( '' )}", // was acquired - "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale - ); - if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... - $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; - } - // Get the IDs of jobs that are considered stale and should be removed. Selecting - // the IDs first means that the UPDATE can be done by primary key (less deadlocks). - $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); - $ids = array_map( - function( $o ) { - return $o->job_id; - }, iterator_to_array( $res ) - ); - if ( count( $ids ) ) { - $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); - $count += $dbw->affectedRows(); - JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() ); - } - - $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); - } catch ( DBError $e ) { - $this->throwDBException( $e ); - } - - return $count; - } - - /** - * @param $job Job - * @return array - */ - protected function insertFields( Job $job ) { - $dbw = $this->getMasterDB(); - return array( - // Fields that describe the nature of the job - 'job_cmd' => $job->getType(), - 'job_namespace' => $job->getTitle()->getNamespace(), - 'job_title' => $job->getTitle()->getDBkey(), - 'job_params' => self::makeBlob( $job->getParams() ), - // Additional job metadata - 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), - 'job_timestamp' => $dbw->timestamp(), - 'job_sha1' => wfBaseConvert( - sha1( serialize( $job->getDeduplicationInfo() ) ), - 16, 36, 31 - ), - 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) - ); - } - - /** - * @return DBConnRef - */ - protected function getSlaveDB() { - try { - return $this->getDB( DB_SLAVE ); - } catch ( DBConnectionError $e ) { - throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); - } - } - - /** - * @return DBConnRef - */ - protected function getMasterDB() { - try { - return $this->getDB( DB_MASTER ); - } catch ( DBConnectionError $e ) { - throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); - } - } - - /** - * @param $index integer (DB_SLAVE/DB_MASTER) - * @return DBConnRef - */ - protected function getDB( $index ) { - $lb = ( $this->cluster !== false ) - ? 
wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) - : wfGetLB( $this->wiki ); - return $lb->getConnectionRef( $index, array(), $this->wiki ); - } - - /** - * @return string - */ - private function getCacheKey( $property ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property ); - } - - /** - * @param $params - * @return string - */ - protected static function makeBlob( $params ) { - if ( $params !== false ) { - return serialize( $params ); - } else { - return ''; - } - } - - /** - * @param $blob - * @return bool|mixed - */ - protected static function extractBlob( $blob ) { - if ( (string)$blob !== '' ) { - return unserialize( $blob ); - } else { - return false; - } - } - - /** - * @param DBError $e - * @throws JobQueueError - */ - protected function throwDBException( DBError $e ) { - throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); - } -} diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php deleted file mode 100644 index d3ce164a..00000000 --- a/includes/job/JobQueueFederated.php +++ /dev/null @@ -1,473 +0,0 @@ - weight) reverse sorted by weight */ - protected $partitionMap = array(); - /** @var Array (partition name => JobQueue) reverse sorted by weight */ - protected $partitionQueues = array(); - /** @var HashRing */ - protected $partitionPushRing; - /** @var BagOStuff */ - protected $cache; - - const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating - const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date - - /** - * @params include: - * - sectionsByWiki : A map of wiki IDs to section names. - * Wikis will default to using the section "default". - * - partitionsBySection : Map of section names to maps of (partition name => weight). - * A section called 'default' must be defined if not all wikis - * have explicitly defined sections. - * - configByPartition : Map of queue partition names to configuration arrays. - * These configuration arrays are passed to JobQueue::factory(). - * The options set here are overriden by those passed to this - * the federated queue itself (e.g. 'order' and 'claimTTL'). - * - partitionsNoPush : List of partition names that can handle pop() but not push(). - * This can be used to migrate away from a certain partition. - * @param array $params - */ - protected function __construct( array $params ) { - parent::__construct( $params ); - $section = isset( $params['sectionsByWiki'][$this->wiki] ) - ? $params['sectionsByWiki'][$this->wiki] - : 'default'; - if ( !isset( $params['partitionsBySection'][$section] ) ) { - throw new MWException( "No configuration for section '$section'." 
); - } - // Get the full partition map - $this->partitionMap = $params['partitionsBySection'][$section]; - arsort( $this->partitionMap, SORT_NUMERIC ); - // Get the partitions jobs can actually be pushed to - $partitionPushMap = $this->partitionMap; - if ( isset( $params['partitionsNoPush'] ) ) { - foreach ( $params['partitionsNoPush'] as $partition ) { - unset( $partitionPushMap[$partition] ); - } - } - // Get the config to pass to merge into each partition queue config - $baseConfig = $params; - foreach ( array( 'class', 'sectionsByWiki', - 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o ) - { - unset( $baseConfig[$o] ); - } - // Get the partition queue objects - foreach ( $this->partitionMap as $partition => $w ) { - if ( !isset( $params['configByPartition'][$partition] ) ) { - throw new MWException( "No configuration for partition '$partition'." ); - } - $this->partitionQueues[$partition] = JobQueue::factory( - $baseConfig + $params['configByPartition'][$partition] ); - } - // Get the ring of partitions to push jobs into - $this->partitionPushRing = new HashRing( $partitionPushMap ); - // Aggregate cache some per-queue values if there are multiple partition queues - $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff(); - } - - protected function supportedOrders() { - // No FIFO due to partitioning, though "rough timestamp order" is supported - return array( 'undefined', 'random', 'timestamp' ); - } - - protected function optimalOrder() { - return 'undefined'; // defer to the partitions - } - - protected function supportsDelayedJobs() { - return true; // defer checks to the partitions - } - - protected function doIsEmpty() { - $key = $this->getCacheKey( 'empty' ); - - $isEmpty = $this->cache->get( $key ); - if ( $isEmpty === 'true' ) { - return true; - } elseif ( $isEmpty === 'false' ) { - return false; - } - - foreach ( $this->partitionQueues as $queue ) { - try { - if ( !$queue->doIsEmpty() ) { - $this->cache->add( $key, 'false', self::CACHE_TTL_LONG ); - return false; - } - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - - $this->cache->add( $key, 'true', self::CACHE_TTL_LONG ); - return true; - } - - protected function doGetSize() { - return $this->getCrossPartitionSum( 'size', 'doGetSize' ); - } - - protected function doGetAcquiredCount() { - return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' ); - } - - protected function doGetDelayedCount() { - return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' ); - } - - protected function doGetAbandonedCount() { - return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' ); - } - - /** - * @param string $type - * @param string $method - * @return integer - */ - protected function getCrossPartitionSum( $type, $method ) { - $key = $this->getCacheKey( $type ); - - $count = $this->cache->get( $key ); - if ( is_int( $count ) ) { - return $count; - } - - $count = 0; - foreach ( $this->partitionQueues as $queue ) { - try { - $count += $queue->$method(); - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - - $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); - return $count; - } - - protected function doBatchPush( array $jobs, $flags ) { - if ( !count( $jobs ) ) { - return true; // nothing to do - } - // Local ring variable that may be changed to point to a new ring on failure - $partitionRing = $this->partitionPushRing; - // Try to insert the jobs and update 
$partitionsTry on any failures - $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags ); - if ( count( $jobsLeft ) ) { // some jobs failed to insert? - // Try to insert the remaning jobs once more, ignoring the bad partitions - return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) ); - } - return true; - } - - /** - * @param array $jobs - * @param HashRing $partitionRing - * @param integer $flags - * @return array List of Job object that could not be inserted - */ - protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) { - $jobsLeft = array(); - - // Because jobs are spread across partitions, per-job de-duplication needs - // to use a consistent hash to avoid allowing duplicate jobs per partition. - // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded. - $uJobsByPartition = array(); // (partition name => job list) - foreach ( $jobs as $key => $job ) { - if ( $job->ignoreDuplicates() ) { - $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) ); - $uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job; - unset( $jobs[$key] ); - } - } - // Get the batches of jobs that are not de-duplicated - if ( $flags & self::QOS_ATOMIC ) { - $nuJobBatches = array( $jobs ); // all or nothing - } else { - // Split the jobs into batches and spread them out over servers if there - // are many jobs. This helps keep the partitions even. Otherwise, send all - // the jobs to a single partition queue to avoids the extra connections. - $nuJobBatches = array_chunk( $jobs, 300 ); - } - - // Insert the de-duplicated jobs into the queues... - foreach ( $uJobsByPartition as $partition => $jobBatch ) { - $queue = $this->partitionQueues[$partition]; - try { - $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); - } catch ( JobQueueError $e ) { - $ok = false; - MWExceptionHandler::logException( $e ); - } - if ( $ok ) { - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); - } else { - $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist - if ( !$partitionRing ) { - throw new JobQueueError( "Could not insert job(s), all partitions are down." ); - } - $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted - } - } - - // Insert the jobs that are not de-duplicated into the queues... - foreach ( $nuJobBatches as $jobBatch ) { - $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() ); - $queue = $this->partitionQueues[$partition]; - try { - $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); - } catch ( JobQueueError $e ) { - $ok = false; - MWExceptionHandler::logException( $e ); - } - if ( $ok ) { - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); - } else { - $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist - if ( !$partitionRing ) { - throw new JobQueueError( "Could not insert job(s), all partitions are down." 
); - } - $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted - } - } - - return $jobsLeft; - } - - protected function doPop() { - $key = $this->getCacheKey( 'empty' ); - - $isEmpty = $this->cache->get( $key ); - if ( $isEmpty === 'true' ) { - return false; - } - - $partitionsTry = $this->partitionMap; // (partition => weight) - - while ( count( $partitionsTry ) ) { - $partition = ArrayUtils::pickRandom( $partitionsTry ); - if ( $partition === false ) { - break; // all partitions at 0 weight - } - $queue = $this->partitionQueues[$partition]; - try { - $job = $queue->pop(); - } catch ( JobQueueError $e ) { - $job = false; - MWExceptionHandler::logException( $e ); - } - if ( $job ) { - $job->metadata['QueuePartition'] = $partition; - return $job; - } else { - unset( $partitionsTry[$partition] ); // blacklist partition - } - } - - $this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG ); - return false; - } - - protected function doAck( Job $job ) { - if ( !isset( $job->metadata['QueuePartition'] ) ) { - throw new MWException( "The given job has no defined partition name." ); - } - return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job ); - } - - protected function doIsRootJobOldDuplicate( Job $job ) { - $params = $job->getRootJobParams(); - $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 ); - try { - return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job ); - } catch ( JobQueueError $e ) { - if ( isset( $partitions[1] ) ) { // check fallback partition - return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job ); - } - } - return false; - } - - protected function doDeduplicateRootJob( Job $job ) { - $params = $job->getRootJobParams(); - $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 ); - try { - return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job ); - } catch ( JobQueueError $e ) { - if ( isset( $partitions[1] ) ) { // check fallback partition - return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job ); - } - } - return false; - } - - protected function doDelete() { - foreach ( $this->partitionQueues as $queue ) { - try { - $queue->doDelete(); - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - } - - protected function doWaitForBackups() { - foreach ( $this->partitionQueues as $queue ) { - try { - $queue->waitForBackups(); - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - } - - protected function doGetPeriodicTasks() { - $tasks = array(); - foreach ( $this->partitionQueues as $partition => $queue ) { - foreach ( $queue->getPeriodicTasks() as $task => $def ) { - $tasks["{$partition}:{$task}"] = $def; - } - } - return $tasks; - } - - protected function doFlushCaches() { - static $types = array( - 'empty', - 'size', - 'acquiredcount', - 'delayedcount', - 'abandonedcount' - ); - foreach ( $types as $type ) { - $this->cache->delete( $this->getCacheKey( $type ) ); - } - foreach ( $this->partitionQueues as $queue ) { - $queue->doFlushCaches(); - } - } - - public function getAllQueuedJobs() { - $iterator = new AppendIterator(); - foreach ( $this->partitionQueues as $queue ) { - $iterator->append( $queue->getAllQueuedJobs() ); - } - return $iterator; - } - - public function getAllDelayedJobs() { - $iterator = new AppendIterator(); - foreach ( $this->partitionQueues as $queue ) { - $iterator->append( $queue->getAllDelayedJobs() ); - } - return 
$iterator; - } - - public function getCoalesceLocationInternal() { - return "JobQueueFederated:wiki:{$this->wiki}" . - sha1( serialize( array_keys( $this->partitionMap ) ) ); - } - - protected function doGetSiblingQueuesWithJobs( array $types ) { - $result = array(); - foreach ( $this->partitionQueues as $queue ) { - try { - $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); - if ( is_array( $nonEmpty ) ) { - $result = array_unique( array_merge( $result, $nonEmpty ) ); - } else { - return null; // not supported on all partitions; bail - } - if ( count( $result ) == count( $types ) ) { - break; // short-circuit - } - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - return array_values( $result ); - } - - protected function doGetSiblingQueueSizes( array $types ) { - $result = array(); - foreach ( $this->partitionQueues as $queue ) { - try { - $sizes = $queue->doGetSiblingQueueSizes( $types ); - if ( is_array( $sizes ) ) { - foreach ( $sizes as $type => $size ) { - $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size; - } - } else { - return null; // not supported on all partitions; bail - } - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - return $result; - } - - public function setTestingPrefix( $key ) { - foreach ( $this->partitionQueues as $queue ) { - $queue->setTestingPrefix( $key ); - } - } - - /** - * @return string - */ - private function getCacheKey( $property ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); - } -} diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php deleted file mode 100644 index fa7fee5f..00000000 --- a/includes/job/JobQueueGroup.php +++ /dev/null @@ -1,427 +0,0 @@ - (queue => JobQueue, types => list of types) */ - protected $coalescedQueues; - - const TYPE_DEFAULT = 1; // integer; jobs popped by default - const TYPE_ANY = 2; // integer; any job - - const USE_CACHE = 1; // integer; use process or persistent cache - const USE_PRIORITY = 2; // integer; respect deprioritization - - const PROC_CACHE_TTL = 15; // integer; seconds - - const CACHE_VERSION = 1; // integer; cache version - - /** - * @param string $wiki Wiki ID - */ - protected function __construct( $wiki ) { - $this->wiki = $wiki; - $this->cache = new ProcessCacheLRU( 10 ); - } - - /** - * @param string $wiki Wiki ID - * @return JobQueueGroup - */ - public static function singleton( $wiki = false ) { - $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; - if ( !isset( self::$instances[$wiki] ) ) { - self::$instances[$wiki] = new self( $wiki ); - } - return self::$instances[$wiki]; - } - - /** - * Destroy the singleton instances - * - * @return void - */ - public static function destroySingletons() { - self::$instances = array(); - } - - /** - * Get the job queue object for a given queue type - * - * @param $type string - * @return JobQueue - */ - public function get( $type ) { - global $wgJobTypeConf; - - $conf = array( 'wiki' => $this->wiki, 'type' => $type ); - if ( isset( $wgJobTypeConf[$type] ) ) { - $conf = $conf + $wgJobTypeConf[$type]; - } else { - $conf = $conf + $wgJobTypeConf['default']; - } - - return JobQueue::factory( $conf ); - } - - /** - * Insert jobs into the respective queues to which they belong. - * - * This inserts the jobs into the queue specified by $wgJobTypeConf - * and updates the aggregate job queue information cache as needed. 
- * - * @param $jobs Job|array A single Job or a list of Jobs - * @throws MWException - * @return bool - */ - public function push( $jobs ) { - $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); - - $jobsByType = array(); // (job type => list of jobs) - foreach ( $jobs as $job ) { - if ( $job instanceof Job ) { - $jobsByType[$job->getType()][] = $job; - } else { - throw new MWException( "Attempted to push a non-Job object into a queue." ); - } - } - - $ok = true; - foreach ( $jobsByType as $type => $jobs ) { - if ( $this->get( $type )->push( $jobs ) ) { - JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); - } else { - $ok = false; - } - } - - if ( $this->cache->has( 'queues-ready', 'list' ) ) { - $list = $this->cache->get( 'queues-ready', 'list' ); - if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { - $this->cache->clear( 'queues-ready' ); - } - } - - return $ok; - } - - /** - * Pop a job off one of the job queues - * - * This pops a job off a queue as specified by $wgJobTypeConf and - * updates the aggregate job queue information cache as needed. - * - * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type string - * @param $flags integer Bitfield of JobQueueGroup::USE_* constants - * @return Job|bool Returns false on failure - */ - public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) { - if ( is_string( $qtype ) ) { // specific job type - if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) { - return false; // back off - } - $job = $this->get( $qtype )->pop(); - if ( !$job ) { - JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); - } - return $job; - } else { // any job in the "default" jobs types - if ( $flags & self::USE_CACHE ) { - if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { - $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); - } - $types = $this->cache->get( 'queues-ready', 'list' ); - } else { - $types = $this->getQueuesWithJobs(); - } - - if ( $qtype == self::TYPE_DEFAULT ) { - $types = array_intersect( $types, $this->getDefaultQueueTypes() ); - } - shuffle( $types ); // avoid starvation - - foreach ( $types as $type ) { // for each queue... - if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) { - continue; // back off - } - $job = $this->get( $type )->pop(); - if ( $job ) { // found - return $job; - } else { // not found - JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); - $this->cache->clear( 'queues-ready' ); - } - } - - return false; // no jobs found - } - } - - /** - * Acknowledge that a job was completed - * - * @param $job Job - * @return bool - */ - public function ack( Job $job ) { - return $this->get( $job->getType() )->ack( $job ); - } - - /** - * Register the "root job" of a given job into the queue for de-duplication. - * This should only be called right *after* all the new jobs have been inserted. - * - * @param $job Job - * @return bool - */ - public function deduplicateRootJob( Job $job ) { - return $this->get( $job->getType() )->deduplicateRootJob( $job ); - } - - /** - * Wait for any slaves or backup queue servers to catch up. - * - * This does nothing for certain queue classes. 
- * - * @return void - * @throws MWException - */ - public function waitForBackups() { - global $wgJobTypeConf; - - wfProfileIn( __METHOD__ ); - // Try to avoid doing this more than once per queue storage medium - foreach ( $wgJobTypeConf as $type => $conf ) { - $this->get( $type )->waitForBackups(); - } - wfProfileOut( __METHOD__ ); - } - - /** - * Get the list of queue types - * - * @return array List of strings - */ - public function getQueueTypes() { - return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) ); - } - - /** - * Get the list of default queue types - * - * @return array List of strings - */ - public function getDefaultQueueTypes() { - global $wgJobTypesExcludedFromDefaultQueue; - - return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); - } - - /** - * Get the list of job types that have non-empty queues - * - * @return Array List of job types that have non-empty queues - */ - public function getQueuesWithJobs() { - $types = array(); - foreach ( $this->getCoalescedQueues() as $info ) { - $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); - if ( is_array( $nonEmpty ) ) { // batching features supported - $types = array_merge( $types, $nonEmpty ); - } else { // we have to go through the queues in the bucket one-by-one - foreach ( $info['types'] as $type ) { - if ( !$this->get( $type )->isEmpty() ) { - $types[] = $type; - } - } - } - } - return $types; - } - - /** - * Get the size of the queues for a list of job types - * - * @return Array Map of (job type => size) - */ - public function getQueueSizes() { - $sizeMap = array(); - foreach ( $this->getCoalescedQueues() as $info ) { - $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); - if ( is_array( $sizes ) ) { // batching features supported - $sizeMap = $sizeMap + $sizes; - } else { // we have to go through the queues in the bucket one-by-one - foreach ( $info['types'] as $type ) { - $sizeMap[$type] = $this->get( $type )->getSize(); - } - } - } - return $sizeMap; - } - - /** - * @return array - */ - protected function getCoalescedQueues() { - global $wgJobTypeConf; - - if ( $this->coalescedQueues === null ) { - $this->coalescedQueues = array(); - foreach ( $wgJobTypeConf as $type => $conf ) { - $queue = JobQueue::factory( - array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf ); - $loc = $queue->getCoalesceLocationInternal(); - if ( !isset( $this->coalescedQueues[$loc] ) ) { - $this->coalescedQueues[$loc]['queue'] = $queue; - $this->coalescedQueues[$loc]['types'] = array(); - } - if ( $type === 'default' ) { - $this->coalescedQueues[$loc]['types'] = array_merge( - $this->coalescedQueues[$loc]['types'], - array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) - ); - } else { - $this->coalescedQueues[$loc]['types'][] = $type; - } - } - } - - return $this->coalescedQueues; - } - - /** - * Check if jobs should not be popped off a queue right now. - * This is only used for performance, such as to avoid spamming - * the queue with many sub-jobs before they actually get run. - * - * @param $type string - * @return bool - */ - public function isQueueDeprioritized( $type ) { - if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) { - return $this->cache->get( 'isDeprioritized', $type ); - } - if ( $type === 'refreshLinks2' ) { - // Don't keep converting refreshLinks2 => refreshLinks jobs if the - // latter jobs have not been done yet. This helps throttle queue spam. 
- $deprioritized = !$this->get( 'refreshLinks' )->isEmpty(); - $this->cache->set( 'isDeprioritized', $type, $deprioritized ); - return $deprioritized; - } - return false; - } - - /** - * Execute any due periodic queue maintenance tasks for all queues. - * - * A task is "due" if the time ellapsed since the last run is greater than - * the defined run period. Concurrent calls to this function will cause tasks - * to be attempted twice, so they may need their own methods of mutual exclusion. - * - * @return integer Number of tasks run - */ - public function executeReadyPeriodicTasks() { - global $wgMemc; - - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); - $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) - - $count = 0; - $tasksRun = array(); // (queue => task => UNIX timestamp) - foreach ( $this->getQueueTypes() as $type ) { - $queue = $this->get( $type ); - foreach ( $queue->getPeriodicTasks() as $task => $definition ) { - if ( $definition['period'] <= 0 ) { - continue; // disabled - } elseif ( !isset( $lastRuns[$type][$task] ) - || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) - { - try { - if ( call_user_func( $definition['callback'] ) !== null ) { - $tasksRun[$type][$task] = time(); - ++$count; - } - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - } - } - - $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) { - if ( is_array( $lastRuns ) ) { - foreach ( $tasksRun as $type => $tasks ) { - foreach ( $tasks as $task => $timestamp ) { - if ( !isset( $lastRuns[$type][$task] ) - || $timestamp > $lastRuns[$type][$task] ) - { - $lastRuns[$type][$task] = $timestamp; - } - } - } - } else { - $lastRuns = $tasksRun; - } - return $lastRuns; - } ); - - return $count; - } - - /** - * @param $name string - * @return mixed - */ - private function getCachedConfigVar( $name ) { - global $wgConf, $wgMemc; - - if ( $this->wiki === wfWikiID() ) { - return $GLOBALS[$name]; // common case - } else { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name ); - $value = $wgMemc->get( $key ); // ('v' => ...) or false - if ( is_array( $value ) ) { - return $value['v']; - } else { - $value = $wgConf->getConfig( $this->wiki, $name ); - $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) ); - return $value; - } - } - } -} diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php deleted file mode 100644 index 378e1755..00000000 --- a/includes/job/JobQueueRedis.php +++ /dev/null @@ -1,856 +0,0 @@ - job ID) for unclaimed jobs used for de-duplication - * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication - * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries - * - h-data : A hash of (job ID => serialized blobs) for job storage - * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. - * If an ID appears in any of those lists, it should have a h-data entry for its ID. - * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then - * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById - * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its - * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. 
- * - * Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication. - * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. - * All the keys are prefixed with the relevant wiki ID information. - * - * This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations. - * Additionally, it should be noted that redis has different persistence modes, such - * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be - * made on the servers based on what queues are using it and what tolerance they have. - * - * @ingroup JobQueue - * @ingroup Redis - * @since 1.22 - */ -class JobQueueRedis extends JobQueue { - /** @var RedisConnectionPool */ - protected $redisPool; - - protected $server; // string; server address - protected $compression; // string; compression method to use - - const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) - - protected $key; // string; key to prefix the queue keys with (used for testing) - - /** - * @param array $params Possible keys: - * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). - * Note that the serializer option is ignored; "none" is always used. - * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. - * If a hostname is specified but no port, the standard port number - * 6379 will be used. Required. - * - compression : The type of compression to use; one of (none,gzip). - */ - public function __construct( array $params ) { - parent::__construct( $params ); - $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua - $this->server = $params['redisServer']; - $this->compression = isset( $params['compression'] ) ? 
$params['compression'] : 'none'; - $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); - } - - protected function supportedOrders() { - return array( 'timestamp', 'fifo' ); - } - - protected function optimalOrder() { - return 'fifo'; - } - - protected function supportsDelayedJobs() { - return true; - } - - /** - * @see JobQueue::doIsEmpty() - * @return bool - * @throws MWException - */ - protected function doIsEmpty() { - return $this->doGetSize() == 0; - } - - /** - * @see JobQueue::doGetSize() - * @return integer - * @throws MWException - */ - protected function doGetSize() { - $conn = $this->getConnection(); - try { - return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - /** - * @see JobQueue::doGetAcquiredCount() - * @return integer - * @throws MWException - */ - protected function doGetAcquiredCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } - $conn = $this->getConnection(); - try { - $conn->multi( Redis::PIPELINE ); - $conn->zSize( $this->getQueueKey( 'z-claimed' ) ); - $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); - return array_sum( $conn->exec() ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - /** - * @see JobQueue::doGetDelayedCount() - * @return integer - * @throws MWException - */ - protected function doGetDelayedCount() { - if ( !$this->checkDelay ) { - return 0; // no delayed jobs - } - $conn = $this->getConnection(); - try { - return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - /** - * @see JobQueue::doGetAbandonedCount() - * @return integer - * @throws MWException - */ - protected function doGetAbandonedCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } - $conn = $this->getConnection(); - try { - return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - /** - * @see JobQueue::doBatchPush() - * @param array $jobs - * @param $flags - * @return bool - * @throws MWException - */ - protected function doBatchPush( array $jobs, $flags ) { - // Convert the jobs into field maps (de-duplicated against each other) - $items = array(); // (job ID => job fields map) - foreach ( $jobs as $job ) { - $item = $this->getNewJobFields( $job ); - if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate - $items[$item['sha1']] = $item; - } else { - $items[$item['uuid']] = $item; - } - } - - if ( !count( $items ) ) { - return true; // nothing to do - } - - $conn = $this->getConnection(); - try { - // Actually push the non-duplicate jobs into the queue... - if ( $flags & self::QOS_ATOMIC ) { - $batches = array( $items ); // all or nothing - } else { - $batches = array_chunk( $items, 500 ); // avoid tying up the server - } - $failed = 0; - $pushed = 0; - foreach ( $batches as $itemBatch ) { - $added = $this->pushBlobs( $conn, $itemBatch ); - if ( is_int( $added ) ) { - $pushed += $added; - } else { - $failed += count( $itemBatch ); - } - } - if ( $failed > 0 ) { - wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." 
); - return false; - } - JobQueue::incrStats( 'job-insert', $this->type, count( $items ) ); - JobQueue::incrStats( 'job-insert-duplicate', $this->type, - count( $items ) - $failed - $pushed ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - - return true; - } - - /** - * @param RedisConnRef $conn - * @param array $items List of results from JobQueueRedis::getNewJobFields() - * @return integer Number of jobs inserted (duplicates are ignored) - * @throws RedisException - */ - protected function pushBlobs( RedisConnRef $conn, array $items ) { - $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) - foreach ( $items as $item ) { - $args[] = (string)$item['uuid']; - $args[] = (string)$item['sha1']; - $args[] = (string)$item['rtimestamp']; - $args[] = (string)$this->serialize( $item ); - } - static $script = -<< 0 then - -- Insert into delayed queue (release time as score) - redis.call('zAdd',KEYS[4],rtimestamp,id) - else - -- Insert into unclaimed queue - redis.call('lPush',KEYS[1],id) - end - if sha1 ~= '' then - redis.call('hSet',KEYS[2],id,sha1) - redis.call('hSet',KEYS[3],sha1,id) - end - redis.call('hSet',KEYS[5],id,blob) - pushed = pushed + 1 - end - end - return pushed -LUA; - return $conn->luaEval( $script, - array_merge( - array( - $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] - $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] - $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] - $this->getQueueKey( 'z-delayed' ), # KEYS[4] - $this->getQueueKey( 'h-data' ), # KEYS[5] - ), - $args - ), - 5 # number of first argument(s) that are keys - ); - } - - /** - * @see JobQueue::doPop() - * @return Job|bool - * @throws MWException - */ - protected function doPop() { - $job = false; - - // Push ready delayed jobs into the queue every 10 jobs to spread the load. - // This is also done as a periodic task, but we don't want too much done at once. - if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { - $this->releaseReadyDelayedJobs(); - } - - $conn = $this->getConnection(); - try { - do { - if ( $this->claimTTL > 0 ) { - // Keep the claimed job list down for high-traffic queues - if ( mt_rand( 0, 99 ) == 0 ) { - $this->recycleAndDeleteStaleJobs(); - } - $blob = $this->popAndAcquireBlob( $conn ); - } else { - $blob = $this->popAndDeleteBlob( $conn ); - } - if ( $blob === false ) { - break; // no jobs; nothing to do - } - - JobQueue::incrStats( 'job-pop', $this->type ); - $item = $this->unserialize( $blob ); - if ( $item === false ) { - wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." 
); - continue; - } - - // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed - $job = $this->getJobFromFields( $item ); // may be false - } while ( !$job ); // job may be false if invalid - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - - return $job; - } - - /** - * @param RedisConnRef $conn - * @return array serialized string or false - * @throws RedisException - */ - protected function popAndDeleteBlob( RedisConnRef $conn ) { - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] - $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] - $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - ), - 4 # number of first argument(s) that are keys - ); - } - - /** - * @param RedisConnRef $conn - * @return array serialized string or false - * @throws RedisException - */ - protected function popAndAcquireBlob( RedisConnRef $conn ) { - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] - $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] - $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] - $this->getQueueKey( 'z-claimed' ), # KEYS[4] - $this->getQueueKey( 'h-attempts' ), # KEYS[5] - $this->getQueueKey( 'h-data' ), # KEYS[6] - time(), # ARGV[1] (injected to be replication-safe) - ), - 6 # number of first argument(s) that are keys - ); - } - - /** - * @see JobQueue::doAck() - * @param Job $job - * @return Job|bool - * @throws MWException - */ - protected function doAck( Job $job ) { - if ( !isset( $job->metadata['uuid'] ) ) { - throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); - } - if ( $this->claimTTL > 0 ) { - $conn = $this->getConnection(); - try { - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'h-data' ), # KEYS[3] - $job->metadata['uuid'] # ARGV[1] - ), - 3 # number of first argument(s) that are keys - ); - - if ( !$res ) { - wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); - return false; - } - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - return true; - } - - /** - * @see JobQueue::doDeduplicateRootJob() - * @param Job $job - * @return bool - * @throws MWException - */ - protected function doDeduplicateRootJob( Job $job ) { - if ( !$job->hasRootJobParams() ) { - throw new MWException( "Cannot register root job; missing parameters." ); - } - $params = $job->getRootJobParams(); - - $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); - - $conn = $this->getConnection(); - try { - $timestamp = $conn->get( $key ); // current last timestamp of this job - if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { - return true; // a newer version of this root job was enqueued - } - // Update the timestamp of the last root job started at the location... 
- return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - /** - * @see JobQueue::doIsRootJobOldDuplicate() - * @param Job $job - * @return bool - */ - protected function doIsRootJobOldDuplicate( Job $job ) { - if ( !$job->hasRootJobParams() ) { - return false; // job has no de-deplication info - } - $params = $job->getRootJobParams(); - - $conn = $this->getConnection(); - try { - // Get the last time this root job was enqueued - $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - - // Check if a new root job was started at the location after this one's... - return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); - } - - /** - * @see JobQueue::doDelete() - * @return bool - */ - protected function doDelete() { - static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned', - 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ); - - $conn = $this->getConnection(); - try { - $keys = array(); - foreach ( $props as $prop ) { - $keys[] = $this->getQueueKey( $prop ); - } - return ( $conn->delete( $keys ) !== false ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - /** - * @see JobQueue::getAllQueuedJobs() - * @return Iterator - */ - public function getAllQueuedJobs() { - $conn = $this->getConnection(); - try { - $that = $this; - return new MappedIterator( - $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), - function( $uid ) use ( $that, $conn ) { - return $that->getJobFromUidInternal( $uid, $conn ); - }, - array( 'accept' => function ( $job ) { return is_object( $job ); } ) - ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - /** - * @see JobQueue::getAllQueuedJobs() - * @return Iterator - */ - public function getAllDelayedJobs() { - $conn = $this->getConnection(); - try { - $that = $this; - return new MappedIterator( // delayed jobs - $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), - function( $uid ) use ( $that, $conn ) { - return $that->getJobFromUidInternal( $uid, $conn ); - }, - array( 'accept' => function ( $job ) { return is_object( $job ); } ) - ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - public function getCoalesceLocationInternal() { - return "RedisServer:" . 
$this->server; - } - - protected function doGetSiblingQueuesWithJobs( array $types ) { - return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); - } - - protected function doGetSiblingQueueSizes( array $types ) { - $sizes = array(); // (type => size) - $types = array_values( $types ); // reindex - try { - $conn = $this->getConnection(); - $conn->multi( Redis::PIPELINE ); - foreach ( $types as $type ) { - $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) ); - } - $res = $conn->exec(); - if ( is_array( $res ) ) { - foreach ( $res as $i => $size ) { - $sizes[$types[$i]] = $size; - } - } - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - return $sizes; - } - - /** - * This function should not be called outside JobQueueRedis - * - * @param $uid string - * @param $conn RedisConnRef - * @return Job|bool Returns false if the job does not exist - * @throws MWException - */ - public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { - try { - $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); - if ( $data === false ) { - return false; // not found - } - $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) ); - if ( !is_array( $item ) ) { // this shouldn't happen - throw new MWException( "Could not find job with ID '$uid'." ); - } - $title = Title::makeTitle( $item['namespace'], $item['title'] ); - $job = Job::factory( $item['type'], $title, $item['params'] ); - $job->metadata['uuid'] = $item['uuid']; - return $job; - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - } - - /** - * Release any ready delayed jobs into the queue - * - * @return integer Number of jobs released - * @throws MWException - */ - public function releaseReadyDelayedJobs() { - $count = 0; - - $conn = $this->getConnection(); - try { - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'z-delayed' ), // KEYS[1] - $this->getQueueKey( 'l-unclaimed' ), // KEYS[2] - time() // ARGV[1]; max "delay until" UNIX timestamp - ), - 2 # first two arguments are keys - ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - - return $count; - } - - /** - * Recycle or destroy any jobs that have been claimed for too long - * - * @return integer Number of jobs recycled/deleted - * @throws MWException - */ - public function recycleAndDeleteStaleJobs() { - if ( $this->claimTTL <= 0 ) { // sanity - throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." ); - } - $count = 0; - // For each job item that can be retried, we need to add it back to the - // main queue and remove it from the list of currenty claimed job items. - // For those that cannot, they are marked as dead and kept around for - // investigation and manual job restoration but are eventually deleted. 
- $conn = $this->getConnection(); - try { - $now = time(); - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - $this->getQueueKey( 'z-abandoned' ), # KEYS[5] - $now - $this->claimTTL, # ARGV[1] - $now - self::MAX_AGE_PRUNE, # ARGV[2] - $this->maxTries # ARGV[3] - ), - 5 # number of first argument(s) that are keys - ); - if ( $res ) { - list( $released, $abandoned, $pruned ) = $res; - $count += $released + $pruned; - JobQueue::incrStats( 'job-recycle', $this->type, $released ); - JobQueue::incrStats( 'job-abandon', $this->type, $abandoned ); - } - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - - return $count; - } - - /** - * @return Array - */ - protected function doGetPeriodicTasks() { - $tasks = array(); - if ( $this->claimTTL > 0 ) { - $tasks['recycleAndDeleteStaleJobs'] = array( - 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) - ); - } - if ( $this->checkDelay ) { - $tasks['releaseReadyDelayedJobs'] = array( - 'callback' => array( $this, 'releaseReadyDelayedJobs' ), - 'period' => 300 // 5 minutes - ); - } - return $tasks; - } - - /** - * @param $job Job - * @return array - */ - protected function getNewJobFields( Job $job ) { - return array( - // Fields that describe the nature of the job - 'type' => $job->getType(), - 'namespace' => $job->getTitle()->getNamespace(), - 'title' => $job->getTitle()->getDBkey(), - 'params' => $job->getParams(), - // Some jobs cannot run until a "release timestamp" - 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, - // Additional job metadata - 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), - 'sha1' => $job->ignoreDuplicates() - ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) - : '', - 'timestamp' => time() // UNIX timestamp - ); - } - - /** - * @param $fields array - * @return Job|bool - */ - protected function getJobFromFields( array $fields ) { - $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] ); - if ( $title ) { - $job = Job::factory( $fields['type'], $title, $fields['params'] ); - $job->metadata['uuid'] = $fields['uuid']; - return $job; - } - return false; - } - - /** - * @param array $fields - * @return string Serialized and possibly compressed version of $fields - */ - protected function serialize( array $fields ) { - $blob = serialize( $fields ); - if ( $this->compression === 'gzip' - && strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) ) - { - $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ); - $blobz = serialize( $object ); - return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; - } else { - return $blob; - } - } - - /** - * @param string $blob - * @return array|bool Unserialized version of $blob or false - */ - protected function unserialize( $blob ) { - $fields = unserialize( $blob ); - if ( is_object( $fields ) ) { - if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) { - $fields = unserialize( gzinflate( $fields->blob ) ); - } else { - $fields = false; - } - } - return is_array( $fields ) ? 
$fields : false; - } - - /** - * Get a connection to the server that handles all sub-queues for this queue - * - * @return RedisConnRef - * @throws MWException - */ - protected function getConnection() { - $conn = $this->redisPool->getConnection( $this->server ); - if ( !$conn ) { - throw new JobQueueConnectionError( "Unable to connect to redis server." ); - } - return $conn; - } - - /** - * @param $server string - * @param $conn RedisConnRef - * @param $e RedisException - * @throws MWException - */ - protected function throwRedisException( $server, RedisConnRef $conn, $e ) { - $this->redisPool->handleException( $server, $conn, $e ); - throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); - } - - /** - * @param $prop string - * @param $type string|null - * @return string - */ - private function getQueueKey( $prop, $type = null ) { - $type = is_string( $type ) ? $type : $this->type; - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - if ( strlen( $this->key ) ) { // namespaced queue (for testing) - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop ); - } else { - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop ); - } - } - - /** - * @param $key string - * @return void - */ - public function setTestingPrefix( $key ) { - $this->key = $key; - } -} diff --git a/includes/job/README b/includes/job/README deleted file mode 100644 index c11d5a78..00000000 --- a/includes/job/README +++ /dev/null @@ -1,81 +0,0 @@ -/*! -\ingroup JobQueue -\page jobqueue_design Job queue design - -Notes on the Job queuing system architecture. - -\section intro Introduction - -The data model consists of the following main components: -* The Job object represents a particular deferred task that happens in the - background. All jobs subclass the Job object and put the main logic in the - function called run(). -* The JobQueue object represents a particular queue of jobs of a certain type. - For example there may be a queue for email jobs and a queue for squid purge - jobs. - -\section jobqueue Job queues - -Each job type has its own queue and is associated with a storage medium. One -queue might save its jobs in redis while another would use a database. - -Each storage medium is handled by a queue class. Before using it, you must -define in $wgJobTypeConf a mapping of the job type to a queue class (a -configuration sketch follows at the end of this README). - -The factory class JobQueueGroup provides helper functions: -- getting the queue for a given job type -- routing new job insertions to the proper queue - -The following queue classes are available: -* JobQueueDB (stores jobs in the `job` table in a database) -* JobQueueRedis (stores jobs in a redis server) - -All queue classes support some basic operations (though some may be no-ops): -* enqueueing a batch of jobs -* dequeueing a single job -* acknowledging a job is completed -* checking if the queue is empty - -Some queue classes (like JobQueueDB) may dequeue jobs in random order while other -queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs -are executed in FIFO order. - -Also note that not all queue classes will have the same reliability guarantees. -In-memory queues may lose data when restarted depending on snapshot and journal -settings (including journal fsync() frequency). Some queue types may totally remove -jobs when dequeued while leaving the ack() function as a no-op; if a job is -dequeued by a job runner that crashes before completion, the job will be -lost. Some jobs, like purging squid caches after a template change, may not -require durable queues, whereas other jobs might be more important. - -\section aggregator Job queue aggregator - -The aggregators are used by nextJobDB.php, which is a script that will return a -random ready queue (on any wiki in the farm) that can be used with runJobs.php. -This can be used in conjunction with any scripts that handle wiki farm job queues. -Note that $wgLocalDatabases defines what wikis are in the wiki farm. - -Since each job type has its own queue, and wiki-farms may have many wikis, -there might be a large number of queues to keep track of. To avoid wasting -large amounts of time polling empty queues, aggregators exist to keep track -of which queues are ready. - -The following queue aggregator classes are available: -* JobQueueAggregatorMemc (uses $wgMemc to track ready queues) -* JobQueueAggregatorRedis (uses a redis server to track ready queues) - -Some aggregators cache data for a few minutes while others may always be up to -date. This can be an important factor for jobs that need a low pickup time (or -latency). - -\section jobs Jobs - -Callers should also try to make jobs maintain correctness when executed twice. -This is useful for queues that actually implement ack(), since they may recycle -dequeued but un-acknowledged jobs back into the queue to be attempted again. If -a runner dequeues a job, runs it, but then crashes before calling ack(), the -job may be returned to the queue and run a second time. Jobs like cache purging can -happen several times without any correctness problems. However, a pathological case -would be if a bug caused the problem to repeat systematically. For example, -a job may always throw a DB error at the end of run(). This problem is trickier to -solve and more obnoxious for things like email jobs. For such jobs, -it might be useful to use a queue that does not retry jobs (a sketch of an -idempotent job follows below). 
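[Editorial sketch, not part of the deleted tree] To make the $wgJobTypeConf mapping described in this README concrete, here is a minimal configuration sketch in PHP. It is illustrative only: the decision to keep 'enotifNotify' jobs in Redis, the server address, and the option values are assumptions, not recommendations from this changeset.

  // In LocalSettings.php: map job types to queue classes.
  // Job types without their own entry fall back to 'default'.
  $wgJobTypeConf['default'] = array(
      'class' => 'JobQueueDB', // store jobs in the `job` database table
      'order' => 'random',     // dequeue in approximately random order
  );
  $wgJobTypeConf['enotifNotify'] = array(
      'class'       => 'JobQueueRedis',          // keep email jobs in Redis instead
      'redisServer' => 'rdb1.example.org:6379',  // hypothetical server address
      'redisConfig' => array( 'password' => null ),
  );

  // New jobs are then routed to the mapped queue by JobQueueGroup,
  // with $title and $params built by the caller as usual:
  JobQueueGroup::singleton()->push( new EnotifNotifyJob( $title, $params ) );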
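[Editorial sketch, not part of the deleted tree] The idempotence advice in the Jobs section can likewise be shown as a sketch. The class below is hypothetical; only the extension points it uses (the Job constructor, removeDuplicates, run(), and $wgJobClasses registration) are the real ones shown in Job.php above.

  // Registration, e.g. in LocalSettings.php:
  $wgJobClasses['examplePurge'] = 'ExamplePurgeJob';

  class ExamplePurgeJob extends Job {
      public function __construct( $title, $params = false, $id = 0 ) {
          parent::__construct( 'examplePurge', $title, $params, $id );
          // Collapse identical pending purges into one queue entry
          // on queues that honor de-duplication
          $this->removeDuplicates = true;
      }

      public function run() {
          // Invalidating the page cache is naturally idempotent: if this
          // job is recycled after a runner crash and runs twice, no harm done.
          $this->title->invalidateCache();
          return true;
      }
  }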
diff --git a/includes/job/aggregator/JobQueueAggregator.php b/includes/job/aggregator/JobQueueAggregator.php deleted file mode 100644 index a8186abd..00000000 --- a/includes/job/aggregator/JobQueueAggregator.php +++ /dev/null @@ -1,156 +0,0 @@ -doNotifyQueueEmpty( $wiki, $type ); - wfProfileOut( __METHOD__ ); - return $ok; - } - - /** - * @see JobQueueAggregator::notifyQueueEmpty() - */ - abstract protected function doNotifyQueueEmpty( $wiki, $type ); - - /** - * Mark a queue as being non-empty - * - * @param string $wiki - * @param string $type - * @return bool Success - */ - final public function notifyQueueNonEmpty( $wiki, $type ) { - wfProfileIn( __METHOD__ ); - $ok = $this->doNotifyQueueNonEmpty( $wiki, $type ); - wfProfileOut( __METHOD__ ); - return $ok; - } - - /** - * @see JobQueueAggregator::notifyQueueNonEmpty() - */ - abstract protected function doNotifyQueueNonEmpty( $wiki, $type ); - - /** - * Get the list of all of the queues with jobs - * - * @return Array (job type => (list of wiki IDs)) - */ - final public function getAllReadyWikiQueues() { - wfProfileIn( __METHOD__ ); - $res = $this->doGetAllReadyWikiQueues(); - wfProfileOut( __METHOD__ ); - return $res; - } - - /** - * @see JobQueueAggregator::getAllReadyWikiQueues() - */ - abstract protected function doGetAllReadyWikiQueues(); - - /** - * Purge all of the aggregator information - * - * @return bool Success - */ - final public function purge() { - wfProfileIn( __METHOD__ ); - $res = $this->doPurge(); - wfProfileOut( __METHOD__ ); - return $res; - } - - /** - * @see JobQueueAggregator::purge() - */ - abstract protected function doPurge(); - - /** - * Get all databases that have a pending job. - * This polls all the queues and is thus expensive. - * - * @return Array (job type => (list of wiki IDs)) - */ - protected function findPendingWikiQueues() { - global $wgLocalDatabases; - - $pendingDBs = array(); // (job type => (db list)) - foreach ( $wgLocalDatabases as $db ) { - foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) { - $pendingDBs[$type][] = $db; - } - } - - return $pendingDBs; - } -} diff --git a/includes/job/aggregator/JobQueueAggregatorMemc.php b/includes/job/aggregator/JobQueueAggregatorMemc.php deleted file mode 100644 index 9434da04..00000000 --- a/includes/job/aggregator/JobQueueAggregatorMemc.php +++ /dev/null @@ -1,124 +0,0 @@ -cache = isset( $params['objectCache'] ) - ? wfGetCache( $params['objectCache'] ) - : wfGetMainCache(); - $this->cacheTTL = isset( $params['cacheTTL'] ) ? 
$params['cacheTTL'] : 180; // 3 min - } - - /** - * @see JobQueueAggregator::doNotifyQueueEmpty() - */ - protected function doNotifyQueueEmpty( $wiki, $type ) { - $key = $this->getReadyQueueCacheKey(); - // Delist the queue from the "ready queue" list - if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock - $curInfo = $this->cache->get( $key ); - if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) { - if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) { - $curInfo['pendingDBs'][$type] = array_diff( - $curInfo['pendingDBs'][$type], array( $wiki ) ); - $this->cache->set( $key, $curInfo ); - } - } - $this->cache->delete( "$key:lock" ); // unlock - } - return true; - } - - /** - * @see JobQueueAggregator::doNotifyQueueNonEmpty() - */ - protected function doNotifyQueueNonEmpty( $wiki, $type ) { - return true; // updated periodically - } - - /** - * @see JobQueueAggregator::doAllGetReadyWikiQueues() - */ - protected function doGetAllReadyWikiQueues() { - $key = $this->getReadyQueueCacheKey(); - // If the cache entry wasn't present, is stale, or in .1% of cases otherwise, - // regenerate the cache. Use any available stale cache if another process is - // currently regenerating the pending DB information. - $pendingDbInfo = $this->cache->get( $key ); - if ( !is_array( $pendingDbInfo ) - || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL - || mt_rand( 0, 999 ) == 0 - ) { - if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock - $pendingDbInfo = array( - 'pendingDBs' => $this->findPendingWikiQueues(), - 'timestamp' => time() - ); - for ( $attempts = 1; $attempts <= 25; ++$attempts ) { - if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock - $this->cache->set( $key, $pendingDbInfo ); - $this->cache->delete( "$key:lock" ); // unlock - break; - } - } - $this->cache->delete( "$key:rebuild" ); // unlock - } - } - return is_array( $pendingDbInfo ) - ? 
$pendingDbInfo['pendingDBs'] - : array(); // cache is both empty and locked - } - - /** - * @see JobQueueAggregator::doPurge() - */ - protected function doPurge() { - return $this->cache->delete( $this->getReadyQueueCacheKey() ); - } - - /** - * @return string - */ - private function getReadyQueueCacheKey() { - return "jobqueue:aggregator:ready-queues:v1"; // global - } -} diff --git a/includes/job/aggregator/JobQueueAggregatorRedis.php b/includes/job/aggregator/JobQueueAggregatorRedis.php deleted file mode 100644 index c6a799df..00000000 --- a/includes/job/aggregator/JobQueueAggregatorRedis.php +++ /dev/null @@ -1,193 +0,0 @@ -server = $params['redisServer']; - $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); - } - - /** - * @see JobQueueAggregator::doNotifyQueueEmpty() - */ - protected function doNotifyQueueEmpty( $wiki, $type ) { - $conn = $this->getConnection(); - if ( !$conn ) { - return false; - } - try { - $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) ); - return true; - } catch ( RedisException $e ) { - $this->handleException( $conn, $e ); - return false; - } - } - - /** - * @see JobQueueAggregator::doNotifyQueueNonEmpty() - */ - protected function doNotifyQueueNonEmpty( $wiki, $type ) { - $conn = $this->getConnection(); - if ( !$conn ) { - return false; - } - try { - $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() ); - return true; - } catch ( RedisException $e ) { - $this->handleException( $conn, $e ); - return false; - } - } - - /** - * @see JobQueueAggregator::doAllGetReadyWikiQueues() - */ - protected function doGetAllReadyWikiQueues() { - $conn = $this->getConnection(); - if ( !$conn ) { - return array(); - } - try { - $conn->multi( Redis::PIPELINE ); - $conn->exists( $this->getReadyQueueKey() ); - $conn->hGetAll( $this->getReadyQueueKey() ); - list( $exists, $map ) = $conn->exec(); - - if ( $exists ) { // cache hit - $pendingDBs = array(); // (type => list of wikis) - foreach ( $map as $key => $time ) { - list( $type, $wiki ) = $this->dencQueueName( $key ); - $pendingDBs[$type][] = $wiki; - } - } else { // cache miss - // Avoid duplicated effort - $conn->multi( Redis::MULTI ); - $conn->setnx( $this->getReadyQueueKey() . ":lock", 1 ); - $conn->expire( $this->getReadyQueueKey() . ":lock", 3600 ); - if ( $conn->exec() !== array( true, true ) ) { // lock - return array(); // already in progress - } - - $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis) - - $conn->delete( $this->getReadyQueueKey() . 
":lock" ); // unlock - - $now = time(); - $map = array(); - foreach ( $pendingDBs as $type => $wikis ) { - foreach ( $wikis as $wiki ) { - $map[$this->encQueueName( $type, $wiki )] = $now; - } - } - $conn->hMSet( $this->getReadyQueueKey(), $map ); - } - - return $pendingDBs; - } catch ( RedisException $e ) { - $this->handleException( $conn, $e ); - return array(); - } - } - - /** - * @see JobQueueAggregator::doPurge() - */ - protected function doPurge() { - $conn = $this->getConnection(); - if ( !$conn ) { - return false; - } - try { - $conn->delete( $this->getReadyQueueKey() ); - } catch ( RedisException $e ) { - $this->handleException( $conn, $e ); - return false; - } - return true; - } - - /** - * Get a connection to the server that handles all sub-queues for this queue - * - * @return Array (server name, Redis instance) - * @throws MWException - */ - protected function getConnection() { - return $this->redisPool->getConnection( $this->server ); - } - - /** - * @param RedisConnRef $conn - * @param RedisException $e - * @return void - */ - protected function handleException( RedisConnRef $conn, $e ) { - $this->redisPool->handleException( $this->server, $conn, $e ); - } - - /** - * @return string - */ - private function getReadyQueueKey() { - return "jobqueue:aggregator:h-ready-queues:v1"; // global - } - - /** - * @param string $type - * @param string $wiki - * @return string - */ - private function encQueueName( $type, $wiki ) { - return rawurlencode( $type ) . '/' . rawurlencode( $wiki ); - } - - /** - * @param string $name - * @return string - */ - private function dencQueueName( $name ) { - list( $type, $wiki ) = explode( '/', $name, 2 ); - return array( rawurldecode( $type ), rawurldecode( $wiki ) ); - } -} diff --git a/includes/job/jobs/AssembleUploadChunksJob.php b/includes/job/jobs/AssembleUploadChunksJob.php deleted file mode 100644 index 6237e568..00000000 --- a/includes/job/jobs/AssembleUploadChunksJob.php +++ /dev/null @@ -1,127 +0,0 @@ -removeDuplicates = true; - } - - public function run() { - $scope = RequestContext::importScopedSession( $this->params['session'] ); - $context = RequestContext::getMain(); - try { - $user = $context->getUser(); - if ( !$user->isLoggedIn() ) { - $this->setLastError( "Could not load the author user from session." ); - return false; - } - - if ( count( $_SESSION ) === 0 ) { - // Empty session probably indicates that we didn't associate - // with the session correctly. Note that being able to load - // the user does not necessarily mean the session was loaded. - // Most likely cause by suhosin.session.encrypt = On. - $this->setLastError( "Error associating with user session. 
Try setting suhosin.session.encrypt = Off" ); - return false; - } - - UploadBase::setSessionStatus( - $this->params['filekey'], - array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ) - ); - - $upload = new UploadFromChunks( $user ); - $upload->continueChunks( - $this->params['filename'], - $this->params['filekey'], - $context->getRequest() - ); - - // Combine all of the chunks into a local file and upload that to a new stash file - $status = $upload->concatenateChunks(); - if ( !$status->isGood() ) { - UploadBase::setSessionStatus( - $this->params['filekey'], - array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ) - ); - $this->setLastError( $status->getWikiText() ); - return false; - } - - // We have a new filekey for the fully concatenated file - $newFileKey = $upload->getLocalFile()->getFileKey(); - - // Remove the old stash file row and first chunk file - $upload->stash->removeFileNoAuth( $this->params['filekey'] ); - - // Build the image info array while we have the local reference handy - $apiMain = new ApiMain(); // dummy object (XXX) - $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); - - // Cleanup any temporary local file - $upload->cleanupTempFile(); - - // Cache the info so the user doesn't have to wait forever to get the final info - UploadBase::setSessionStatus( - $this->params['filekey'], - array( - 'result' => 'Success', - 'stage' => 'assembling', - 'filekey' => $newFileKey, - 'imageinfo' => $imageInfo, - 'status' => Status::newGood() - ) - ); - } catch ( MWException $e ) { - UploadBase::setSessionStatus( - $this->params['filekey'], - array( - 'result' => 'Failure', - 'stage' => 'assembling', - 'status' => Status::newFatal( 'api-error-stashfailed' ) - ) - ); - $this->setLastError( get_class( $e ) . ": " . 
$e->getText() ); - return false; - } - return true; - } - - public function getDeduplicationInfo() { - $info = parent::getDeduplicationInfo(); - if ( is_array( $info['params'] ) ) { - $info['params'] = array( 'filekey' => $info['params']['filekey'] ); - } - return $info; - } - - public function allowRetries() { - return false; - } -} diff --git a/includes/job/jobs/DoubleRedirectJob.php b/includes/job/jobs/DoubleRedirectJob.php deleted file mode 100644 index 33e749b8..00000000 --- a/includes/job/jobs/DoubleRedirectJob.php +++ /dev/null @@ -1,221 +0,0 @@ -" - * @param $redirTitle Title: the title which has changed, redirects pointing to this title are fixed - * @param bool $destTitle Not used - */ - public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) { - # Need to use the master to get the redirect table updated in the same transaction - $dbw = wfGetDB( DB_MASTER ); - $res = $dbw->select( - array( 'redirect', 'page' ), - array( 'page_namespace', 'page_title' ), - array( - 'page_id = rd_from', - 'rd_namespace' => $redirTitle->getNamespace(), - 'rd_title' => $redirTitle->getDBkey() - ), __METHOD__ ); - if ( !$res->numRows() ) { - return; - } - $jobs = array(); - foreach ( $res as $row ) { - $title = Title::makeTitle( $row->page_namespace, $row->page_title ); - if ( !$title ) { - continue; - } - - $jobs[] = new self( $title, array( - 'reason' => $reason, - 'redirTitle' => $redirTitle->getPrefixedDBkey() ) ); - # Avoid excessive memory usage - if ( count( $jobs ) > 10000 ) { - JobQueueGroup::singleton()->push( $jobs ); - $jobs = array(); - } - } - JobQueueGroup::singleton()->push( $jobs ); - } - - function __construct( $title, $params = false, $id = 0 ) { - parent::__construct( 'fixDoubleRedirect', $title, $params, $id ); - $this->reason = $params['reason']; - $this->redirTitle = Title::newFromText( $params['redirTitle'] ); - } - - /** - * @return bool - */ - function run() { - if ( !$this->redirTitle ) { - $this->setLastError( 'Invalid title' ); - return false; - } - - $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST ); - if ( !$targetRev ) { - wfDebug( __METHOD__ . ": target redirect already deleted, ignoring\n" ); - return true; - } - $content = $targetRev->getContent(); - $currentDest = $content ? $content->getRedirectTarget() : null; - if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) { - wfDebug( __METHOD__ . ": Redirect has changed since the job was queued\n" ); - return true; - } - - // Check for a suppression tag (used e.g. in periodically archived discussions) - $mw = MagicWord::get( 'staticredirect' ); - if ( $content->matchMagicWord( $mw ) ) { - wfDebug( __METHOD__ . ": skipping: suppressed with __STATICREDIRECT__\n" ); - return true; - } - - // Find the current final destination - $newTitle = self::getFinalDestination( $this->redirTitle ); - if ( !$newTitle ) { - wfDebug( __METHOD__ . ": skipping: single redirect, circular redirect or invalid redirect destination\n" ); - return true; - } - if ( $newTitle->equals( $this->redirTitle ) ) { - // The redirect is already right, no need to change it - // This can happen if the page was moved back (say after vandalism) - wfDebug( __METHOD__ . 
" : skipping, already good\n" ); - } - - // Preserve fragment (bug 14904) - $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(), - $currentDest->getFragment(), $newTitle->getInterwiki() ); - - // Fix the text - $newContent = $content->updateRedirect( $newTitle ); - - if ( $newContent->equals( $content ) ) { - $this->setLastError( 'Content unchanged???' ); - return false; - } - - $user = $this->getUser(); - if ( !$user ) { - $this->setLastError( 'Invalid user' ); - return false; - } - - // Save it - global $wgUser; - $oldUser = $wgUser; - $wgUser = $user; - $article = WikiPage::factory( $this->title ); - - // Messages: double-redirect-fixed-move, double-redirect-fixed-maintenance - $reason = wfMessage( 'double-redirect-fixed-' . $this->reason, - $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText() - )->inContentLanguage()->text(); - $article->doEditContent( $newContent, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $user ); - $wgUser = $oldUser; - - return true; - } - - /** - * Get the final destination of a redirect - * - * @param $title Title - * - * @return bool if the specified title is not a redirect, or if it is a circular redirect - */ - public static function getFinalDestination( $title ) { - $dbw = wfGetDB( DB_MASTER ); - - // Circular redirect check - $seenTitles = array(); - $dest = false; - - while ( true ) { - $titleText = $title->getPrefixedDBkey(); - if ( isset( $seenTitles[$titleText] ) ) { - wfDebug( __METHOD__, "Circular redirect detected, aborting\n" ); - return false; - } - $seenTitles[$titleText] = true; - - if ( $title->getInterwiki() ) { - // If the target is interwiki, we have to break early (bug 40352). - // Otherwise it will look up a row in the local page table - // with the namespace/page of the interwiki target which can cause - // unexpected results (e.g. X -> foo:Bar -> Bar -> .. ) - break; - } - - $row = $dbw->selectRow( - array( 'redirect', 'page' ), - array( 'rd_namespace', 'rd_title', 'rd_interwiki' ), - array( - 'rd_from=page_id', - 'page_namespace' => $title->getNamespace(), - 'page_title' => $title->getDBkey() - ), __METHOD__ ); - if ( !$row ) { - # No redirect from here, chain terminates - break; - } else { - $dest = $title = Title::makeTitle( $row->rd_namespace, $row->rd_title, '', $row->rd_interwiki ); - } - } - return $dest; - } - - /** - * Get a user object for doing edits, from a request-lifetime cache - * False will be returned if the user name specified in the - * 'double-redirect-fixer' message is invalid. - * - * @return User|bool - */ - function getUser() { - if ( !self::$user ) { - self::$user = User::newFromName( wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text() ); - # User::newFromName() can return false on a badly configured wiki. - if ( self::$user && !self::$user->isLoggedIn() ) { - self::$user->addToDatabase(); - } - } - return self::$user; - } -} diff --git a/includes/job/jobs/DuplicateJob.php b/includes/job/jobs/DuplicateJob.php deleted file mode 100644 index be1bfe5c..00000000 --- a/includes/job/jobs/DuplicateJob.php +++ /dev/null @@ -1,59 +0,0 @@ -getTitle(), $job->getParams(), $job->id ); - $djob->command = $job->getType(); - $djob->params = is_array( $djob->params ) ? 
$djob->params : array(); - $djob->params = array( 'isDuplicate' => true ) + $djob->params; - $djob->metadata = $job->metadata; - return $djob; - } - - public function run() { - return true; - } -} diff --git a/includes/job/jobs/EmaillingJob.php b/includes/job/jobs/EmaillingJob.php deleted file mode 100644 index 9fbf3124..00000000 --- a/includes/job/jobs/EmaillingJob.php +++ /dev/null @@ -1,47 +0,0 @@ -params['to'], - $this->params['from'], - $this->params['subj'], - $this->params['body'], - $this->params['replyto'] - ); - - return $status->isOK(); - } - -} diff --git a/includes/job/jobs/EnotifNotifyJob.php b/includes/job/jobs/EnotifNotifyJob.php deleted file mode 100644 index bbe988d0..00000000 --- a/includes/job/jobs/EnotifNotifyJob.php +++ /dev/null @@ -1,58 +0,0 @@ -params['editorID'] ) && $this->params['editorID'] ) { - $editor = User::newFromId( $this->params['editorID'] ); - // B/C, only the name might be given. - } else { - # FIXME: newFromName could return false on a badly configured wiki. - $editor = User::newFromName( $this->params['editor'], false ); - } - $enotif->actuallyNotifyOnPageChange( - $editor, - $this->title, - $this->params['timestamp'], - $this->params['summary'], - $this->params['minorEdit'], - $this->params['oldid'], - $this->params['watchers'], - $this->params['pageStatus'] - ); - return true; - } - -} diff --git a/includes/job/jobs/HTMLCacheUpdateJob.php b/includes/job/jobs/HTMLCacheUpdateJob.php deleted file mode 100644 index 44c240bb..00000000 --- a/includes/job/jobs/HTMLCacheUpdateJob.php +++ /dev/null @@ -1,263 +0,0 @@ -rowsPerJob = $wgUpdateRowsPerJob; - $this->rowsPerQuery = $wgUpdateRowsPerQuery; - $this->blCache = $title->getBacklinkCache(); - } - - public function run() { - if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { - # This is hit when a job is actually performed - return $this->doPartialUpdate(); - } else { - # This is hit when the jobs have to be inserted - return $this->doFullUpdate(); - } - } - - /** - * Update all of the backlinks - */ - protected function doFullUpdate() { - global $wgMaxBacklinksInvalidate; - - # Get an estimate of the number of rows from the BacklinkCache - $max = max( $this->rowsPerJob * 2, $wgMaxBacklinksInvalidate ) + 1; - $numRows = $this->blCache->getNumLinks( $this->params['table'], $max ); - if ( $wgMaxBacklinksInvalidate !== false && $numRows > $wgMaxBacklinksInvalidate ) { - wfDebug( "Skipped HTML cache invalidation of {$this->title->getPrefixedText()}." ); - return true; - } - - if ( $numRows > $this->rowsPerJob * 2 ) { - # Do fast cached partition - $this->insertPartitionJobs(); - } else { - # Get the links from the DB - $titleArray = $this->blCache->getLinks( $this->params['table'] ); - # Check if the row count estimate was correct - if ( $titleArray->count() > $this->rowsPerJob * 2 ) { - # Not correct, do accurate partition - wfDebug( __METHOD__ . ": row count estimate was incorrect, repartitioning\n" ); - $this->insertJobsFromTitles( $titleArray ); - } else { - $this->invalidateTitles( $titleArray ); // just do the query - } - } - - return true; - } - - /** - * Update some of the backlinks, defined by a page ID range - */ - protected function doPartialUpdate() { - $titleArray = $this->blCache->getLinks( - $this->params['table'], $this->params['start'], $this->params['end'] ); - if ( $titleArray->count() <= $this->rowsPerJob * 2 ) { - # This partition is small enough, do the update - $this->invalidateTitles( $titleArray ); - } else { - # Partitioning was excessively inaccurate. 
Divide the job further. - # This can occur when a large number of links are added in a short - # period of time, say by updating a heavily-used template. - $this->insertJobsFromTitles( $titleArray ); - } - return true; - } - - /** - * Partition the current range given by $this->params['start'] and $this->params['end'], - * using a pre-calculated title array which gives the links in that range. - * Queue the resulting jobs. - * - * @param $titleArray array - * @param $rootJobParams array - * @return void - */ - protected function insertJobsFromTitles( $titleArray, $rootJobParams = array() ) { - // Carry over any "root job" information - $rootJobParams = $this->getRootJobParams(); - # We make subpartitions in the sense that the start of the first job - # will be the start of the parent partition, and the end of the last - # job will be the end of the parent partition. - $jobs = array(); - $start = $this->params['start']; # start of the current job - $numTitles = 0; - foreach ( $titleArray as $title ) { - $id = $title->getArticleID(); - # $numTitles is now the number of titles in the current job not - # including the current ID - if ( $numTitles >= $this->rowsPerJob ) { - # Add a job up to but not including the current ID - $jobs[] = new HTMLCacheUpdateJob( $this->title, - array( - 'table' => $this->params['table'], - 'start' => $start, - 'end' => $id - 1 - ) + $rootJobParams // carry over information for de-duplication - ); - $start = $id; - $numTitles = 0; - } - $numTitles++; - } - # Last job - $jobs[] = new HTMLCacheUpdateJob( $this->title, - array( - 'table' => $this->params['table'], - 'start' => $start, - 'end' => $this->params['end'] - ) + $rootJobParams // carry over information for de-duplication - ); - wfDebug( __METHOD__ . ": repartitioning into " . count( $jobs ) . " jobs\n" ); - - if ( count( $jobs ) < 2 ) { - # I don't think this is possible at present, but handling this case - # makes the code a bit more robust against future code updates and - # avoids a potential infinite loop of repartitioning - wfDebug( __METHOD__ . ": repartitioning failed!\n" ); - $this->invalidateTitles( $titleArray ); - } else { - JobQueueGroup::singleton()->push( $jobs ); - } - } - - /** - * @param $rootJobParams array - * @return void - */ - protected function insertPartitionJobs( $rootJobParams = array() ) { - // Carry over any "root job" information - $rootJobParams = $this->getRootJobParams(); - - $batches = $this->blCache->partition( $this->params['table'], $this->rowsPerJob ); - if ( !count( $batches ) ) { - return; // no jobs to insert - } - - $jobs = array(); - foreach ( $batches as $batch ) { - list( $start, $end ) = $batch; - $jobs[] = new HTMLCacheUpdateJob( $this->title, - array( - 'table' => $this->params['table'], - 'start' => $start, - 'end' => $end, - ) + $rootJobParams // carry over information for de-duplication - ); - } - - JobQueueGroup::singleton()->push( $jobs ); - } - - /** - * Invalidate an array (or iterator) of Title objects, right now - * @param $titleArray array - */ - protected function invalidateTitles( $titleArray ) { - global $wgUseFileCache, $wgUseSquid; - - $dbw = wfGetDB( DB_MASTER ); - $timestamp = $dbw->timestamp(); - - # Get all IDs in this query into an array - $ids = array(); - foreach ( $titleArray as $title ) { - $ids[] = $title->getArticleID(); - } - - if ( !$ids ) { - return; - } - - # Don't invalidated pages that were already invalidated - $touchedCond = isset( $this->params['rootJobTimestamp'] ) - ? array( "page_touched < " . 
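/**
 * A worked example of the partitioning in insertJobsFromTitles() above,
 * assuming the default $wgUpdateRowsPerJob = 500: a template with 1,250
 * backlink page IDs spanning 1..9000 is split so each child job covers at most
 * 500 titles, while the parent range's endpoints are preserved. The IDs below
 * are illustrative; gaps in page_id make the per-range spans uneven:
 * @code
 * // child job params produced:
 * //   array( 'table' => 'templatelinks', 'start' => 1,    'end' => 2417 )
 * //   array( 'table' => 'templatelinks', 'start' => 2418, 'end' => 6093 )
 * //   array( 'table' => 'templatelinks', 'start' => 6094, 'end' => 9000 )
 * @endcode
 */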
- $dbw->addQuotes( $dbw->timestamp( $this->params['rootJobTimestamp'] ) ) ) - : array(); - - # Update page_touched - $batches = array_chunk( $ids, $this->rowsPerQuery ); - foreach ( $batches as $batch ) { - $dbw->update( 'page', - array( 'page_touched' => $timestamp ), - array( 'page_id' => $batch ) + $touchedCond, - __METHOD__ - ); - } - - # Update squid - if ( $wgUseSquid ) { - $u = SquidUpdate::newFromTitles( $titleArray ); - $u->doUpdate(); - } - - # Update file cache - if ( $wgUseFileCache ) { - foreach ( $titleArray as $title ) { - HTMLFileCache::clearFileCache( $title ); - } - } - } -} diff --git a/includes/job/jobs/NullJob.php b/includes/job/jobs/NullJob.php deleted file mode 100644 index b6164a5d..00000000 --- a/includes/job/jobs/NullJob.php +++ /dev/null @@ -1,76 +0,0 @@ - $queue = JobQueueGroup::singleton(); - * > $job = new NullJob( Title::newMainPage(), array( 'lives' => 10 ) ); - * > $queue->push( $job ); - * @endcode - * You can then confirm the job has been enqueued by using the showJobs.php - * maintenance utility: - * @code - * $ php maintenance/showJobs.php --group - * null: 1 queue; 0 claimed (0 active, 0 abandoned) - * $ - * @endcode - * - * @ingroup JobQueue - */ -class NullJob extends Job { - /** - * @param $title Title (can be anything) - * @param array $params job parameters (lives, usleep) - * @param $id Integer: job id - */ - function __construct( $title, $params, $id = 0 ) { - parent::__construct( 'null', $title, $params, $id ); - if ( !isset( $this->params['lives'] ) ) { - $this->params['lives'] = 1; - } - if ( !isset( $this->params['usleep'] ) ) { - $this->params['usleep'] = 0; - } - $this->removeDuplicates = !empty( $this->params['removeDuplicates'] ); - } - - public function run() { - if ( $this->params['usleep'] > 0 ) { - usleep( $this->params['usleep'] ); - } - if ( $this->params['lives'] > 1 ) { - $params = $this->params; - $params['lives']--; - $job = new self( $this->title, $params ); - JobQueueGroup::singleton()->push( $job ); - } - return true; - } -} diff --git a/includes/job/jobs/PublishStashedFileJob.php b/includes/job/jobs/PublishStashedFileJob.php deleted file mode 100644 index 5a24f93c..00000000 --- a/includes/job/jobs/PublishStashedFileJob.php +++ /dev/null @@ -1,140 +0,0 @@ -removeDuplicates = true; - } - - public function run() { - $scope = RequestContext::importScopedSession( $this->params['session'] ); - $context = RequestContext::getMain(); - try { - $user = $context->getUser(); - if ( !$user->isLoggedIn() ) { - $this->setLastError( "Could not load the author user from session." ); - return false; - } - - if ( count( $_SESSION ) === 0 ) { - // Empty session probably indicates that we didn't associate - // with the session correctly. Note that being able to load - // the user does not necessarily mean the session was loaded. - // Most likely cause by suhosin.session.encrypt = On. - $this->setLastError( "Error associating with user session. Try setting suhosin.session.encrypt = Off" ); - return false; - } - - - UploadBase::setSessionStatus( - $this->params['filekey'], - array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ) - ); - - $upload = new UploadFromStash( $user ); - // @todo initialize() causes a GET, ideally we could frontload the antivirus - // checks and anything else to the stash stage (which includes concatenation and - // the local file is thus already there). That way, instead of GET+PUT, there could - // just be a COPY operation from the stash to the public zone. 
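/**
 * For context: the 'session' blob consumed by importScopedSession() at the top
 * of run() is produced by the web request that queues the job. A sketch of the
 * enqueueing side, assuming the async-upload API hands its session over via
 * RequestContext::exportSession(); parameter values are placeholders:
 * @code
 * $job = new PublishStashedFileJob(
 *     Title::makeTitle( NS_FILE, $filename ),
 *     array(
 *         'filename' => $filename,
 *         'filekey' => $filekey,
 *         'comment' => $comment,
 *         'text' => $pageText,
 *         'watch' => $watch,
 *         'session' => RequestContext::getMain()->exportSession(),
 *     )
 * );
 * JobQueueGroup::singleton()->push( $job );
 * @endcode
 */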
- $upload->initialize( $this->params['filekey'], $this->params['filename'] ); - - // Check if the local file checks out (this is generally a no-op) - $verification = $upload->verifyUpload(); - if ( $verification['status'] !== UploadBase::OK ) { - $status = Status::newFatal( 'verification-error' ); - $status->value = array( 'verification' => $verification ); - UploadBase::setSessionStatus( - $this->params['filekey'], - array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) - ); - $this->setLastError( "Could not verify upload." ); - return false; - } - - // Upload the stashed file to a permanent location - $status = $upload->performUpload( - $this->params['comment'], - $this->params['text'], - $this->params['watch'], - $user - ); - if ( !$status->isGood() ) { - UploadBase::setSessionStatus( - $this->params['filekey'], - array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) - ); - $this->setLastError( $status->getWikiText() ); - return false; - } - - // Build the image info array while we have the local reference handy - $apiMain = new ApiMain(); // dummy object (XXX) - $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); - - // Cleanup any temporary local file - $upload->cleanupTempFile(); - - // Cache the info so the user doesn't have to wait forever to get the final info - UploadBase::setSessionStatus( - $this->params['filekey'], - array( - 'result' => 'Success', - 'stage' => 'publish', - 'filename' => $upload->getLocalFile()->getName(), - 'imageinfo' => $imageInfo, - 'status' => Status::newGood() - ) - ); - } catch ( MWException $e ) { - UploadBase::setSessionStatus( - $this->params['filekey'], - array( - 'result' => 'Failure', - 'stage' => 'publish', - 'status' => Status::newFatal( 'api-error-publishfailed' ) - ) - ); - $this->setLastError( get_class( $e ) . ": " . $e->getText() ); - return false; - } - return true; - } - - public function getDeduplicationInfo() { - $info = parent::getDeduplicationInfo(); - if ( is_array( $info['params'] ) ) { - $info['params'] = array( 'filekey' => $info['params']['filekey'] ); - } - return $info; - } - - public function allowRetries() { - return false; - } -} diff --git a/includes/job/jobs/RefreshLinksJob.php b/includes/job/jobs/RefreshLinksJob.php deleted file mode 100644 index 4fc8bac6..00000000 --- a/includes/job/jobs/RefreshLinksJob.php +++ /dev/null @@ -1,222 +0,0 @@ -removeDuplicates = true; // job is expensive - } - - /** - * Run a refreshLinks job - * @return boolean success - */ - function run() { - $linkCache = LinkCache::singleton(); - $linkCache->clear(); - - if ( is_null( $this->title ) ) { - $this->error = "refreshLinks: Invalid title"; - return false; - } - - # Wait for the DB of the current/next slave DB handle to catch up to the master. - # This way, we get the correct page_latest for templates or files that just changed - # milliseconds ago, having triggered this job to begin with. - if ( isset( $this->params['masterPos'] ) && $this->params['masterPos'] !== false ) { - wfGetLB()->waitFor( $this->params['masterPos'] ); - } - - $revision = Revision::newFromTitle( $this->title, false, Revision::READ_NORMAL ); - if ( !$revision ) { - $this->error = 'refreshLinks: Article not found "' . - $this->title->getPrefixedDBkey() . '"'; - return false; // XXX: what if it was just deleted? 
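/**
 * Queueing side, for reference: refreshLinks jobs are normally created one per
 * affected page after a template or file change. A sketch; the 'masterPos'
 * parameter is optional, and getDeduplicationInfo() below strips it so that
 * highly unique positions cannot defeat duplicate detection:
 * @code
 * $job = new RefreshLinksJob( $pageTitle, array(
 *     'masterPos' => wfGetLB()->getMasterPos(), // replication position to wait for
 * ) );
 * JobQueueGroup::singleton()->push( $job );
 * @endcode
 */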
- } - - self::runForTitleInternal( $this->title, $revision, __METHOD__ ); - - return true; - } - - /** - * @return Array - */ - public function getDeduplicationInfo() { - $info = parent::getDeduplicationInfo(); - // Don't let highly unique "masterPos" values ruin duplicate detection - if ( is_array( $info['params'] ) ) { - unset( $info['params']['masterPos'] ); - } - return $info; - } - - /** - * @param $title Title - * @param $revision Revision - * @param $fname string - * @return void - */ - public static function runForTitleInternal( Title $title, Revision $revision, $fname ) { - wfProfileIn( $fname ); - $content = $revision->getContent( Revision::RAW ); - - if ( !$content ) { - // if there is no content, pretend the content is empty - $content = $revision->getContentHandler()->makeEmptyContent(); - } - - // Revision ID must be passed to the parser output to get revision variables correct - $parserOutput = $content->getParserOutput( $title, $revision->getId(), null, false ); - - $updates = $content->getSecondaryDataUpdates( $title, null, false, $parserOutput ); - DataUpdate::runUpdates( $updates ); - - InfoAction::invalidateCache( $title ); - - wfProfileOut( $fname ); - } -} - -/** - * Background job to update links for a given title. - * Newer version for high use templates. - * - * @ingroup JobQueue - */ -class RefreshLinksJob2 extends Job { - function __construct( $title, $params, $id = 0 ) { - parent::__construct( 'refreshLinks2', $title, $params, $id ); - // Base jobs for large templates can easily be de-duplicated - $this->removeDuplicates = !isset( $params['start'] ) && !isset( $params['end'] ); - } - - /** - * Run a refreshLinks2 job - * @return boolean success - */ - function run() { - global $wgUpdateRowsPerJob; - - $linkCache = LinkCache::singleton(); - $linkCache->clear(); - - if ( is_null( $this->title ) ) { - $this->error = "refreshLinks2: Invalid title"; - return false; - } - - // Back compat for pre-r94435 jobs - $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks'; - - // Avoid slave lag when fetching templates. - // When the outermost job is run, we know that the caller that enqueued it must have - // committed the relevant changes to the DB by now. At that point, record the master - // position and pass it along as the job recursively breaks into smaller range jobs. - // Hopefully, when leaf jobs are popped, the slaves will have reached that position. - if ( isset( $this->params['masterPos'] ) ) { - $masterPos = $this->params['masterPos']; - } elseif ( wfGetLB()->getServerCount() > 1 ) { - $masterPos = wfGetLB()->getMasterPos(); - } else { - $masterPos = false; - } - - $tbc = $this->title->getBacklinkCache(); - - $jobs = array(); // jobs to insert - if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { - # This is a partition job to trigger the insertion of leaf jobs... - $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); - } else { - # This is a base job to trigger the insertion of partitioned jobs... 
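/**
 * The masterPos handshake here pairs with the waitFor() call in
 * RefreshLinksJob::run() above: the base job samples the master's replication
 * position once, just after the triggering edit committed, and each leaf job
 * later blocks until its slave has caught up, so the reparse sees the new
 * template revision. In outline:
 * @code
 * $masterPos = wfGetLB()->getMasterPos(); // base job (multi-server setups only)
 * // ... 'masterPos' rides along in $params while the work is partitioned ...
 * wfGetLB()->waitFor( $masterPos );       // leaf job, before loading the revision
 * @endcode
 */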
- if ( $tbc->getNumLinks( $table, $wgUpdateRowsPerJob + 1 ) <= $wgUpdateRowsPerJob ) { - # Just directly insert the single per-title jobs - $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); - } else { - # Insert the partition jobs to make per-title jobs - foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) { - list( $start, $end ) = $batch; - $jobs[] = new RefreshLinksJob2( $this->title, - array( - 'table' => $table, - 'start' => $start, - 'end' => $end, - 'masterPos' => $masterPos, - ) + $this->getRootJobParams() // carry over information for de-duplication - ); - } - } - } - - if ( count( $jobs ) ) { - JobQueueGroup::singleton()->push( $jobs ); - } - - return true; - } - - /** - * @param $table string - * @param $masterPos mixed - * @return Array - */ - protected function getSingleTitleJobs( $table, $masterPos ) { - # The "start"/"end" fields are not set for the base jobs - $start = isset( $this->params['start'] ) ? $this->params['start'] : false; - $end = isset( $this->params['end'] ) ? $this->params['end'] : false; - $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end ); - # Convert into single page refresh links jobs. - # This handles well when in sapi mode and is useful in any case for job - # de-duplication. If many pages use template A, and that template itself - # uses template B, then an edit to both will create many duplicate jobs. - # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will - # get run first, and when it does, it will remove the duplicates. Of course, - # one page could have its job popped when the other page's job is still - # buried within the logic of a refreshLinks2 job. - $jobs = array(); - foreach ( $titles as $title ) { - $jobs[] = new RefreshLinksJob( $title, - array( 'masterPos' => $masterPos ) + $this->getRootJobParams() - ); // carry over information for de-duplication - } - return $jobs; - } - - /** - * @return Array - */ - public function getDeduplicationInfo() { - $info = parent::getDeduplicationInfo(); - // Don't let highly unique "masterPos" values ruin duplicate detection - if ( is_array( $info['params'] ) ) { - unset( $info['params']['masterPos'] ); - } - return $info; - } -} diff --git a/includes/job/jobs/UploadFromUrlJob.php b/includes/job/jobs/UploadFromUrlJob.php deleted file mode 100644 index c993cfb4..00000000 --- a/includes/job/jobs/UploadFromUrlJob.php +++ /dev/null @@ -1,184 +0,0 @@ -upload = new UploadFromUrl(); - $this->upload->initialize( - $this->title->getText(), - $this->params['url'], - false - ); - $this->user = User::newFromName( $this->params['userName'] ); - - # Fetch the file - $opts = array(); - if ( $wgCopyUploadAsyncTimeout ) { - $opts['timeout'] = $wgCopyUploadAsyncTimeout; - } - $status = $this->upload->fetchFile( $opts ); - if ( !$status->isOk() ) { - $this->leaveMessage( $status ); - return true; - } - - # Verify upload - $result = $this->upload->verifyUpload(); - if ( $result['status'] != UploadBase::OK ) { - $status = $this->upload->convertVerifyErrorToStatus( $result ); - $this->leaveMessage( $status ); - return true; - } - - # Check warnings - if ( !$this->params['ignoreWarnings'] ) { - $warnings = $this->upload->checkWarnings(); - if ( $warnings ) { - - # Stash the upload - $key = $this->upload->stashFile(); - - if ( $this->params['leaveMessage'] ) { - $this->user->leaveUserMessage( - wfMessage( 'upload-warning-subj' )->text(), - wfMessage( 'upload-warning-msg', - $key, - $this->params['url'] )->text() - ); - } else { - 
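/**
 * When 'leaveMessage' is false, the job re-opens the originating web session by
 * ID (next line) and deposits its outcome where the upload form can find it.
 * Sketch of the reading side, using the getSessionData() accessor defined at
 * the bottom of this class (key names match storeResultInSession(); the web
 * session must be open when reading):
 * @code
 * $data = UploadFromUrlJob::getSessionData( $sessionKey );
 * if ( $data['result'] === 'Queued' ) {
 *     // job has not run yet
 * } elseif ( $data['result'] === 'Warning' ) {
 *     $warnings = $data['warnings'];
 * } elseif ( $data['result'] === 'Success' ) {
 *     $filename = $data['filename'];
 * } elseif ( $data['result'] === 'Failure' ) {
 *     $errors = $data['errors'];
 * }
 * @endcode
 */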
wfSetupSession( $this->params['sessionId'] ); - $this->storeResultInSession( 'Warning', - 'warnings', $warnings ); - session_write_close(); - } - - return true; - } - } - - # Perform the upload - $status = $this->upload->performUpload( - $this->params['comment'], - $this->params['pageText'], - $this->params['watch'], - $this->user - ); - $this->leaveMessage( $status ); - return true; - - } - - /** - * Leave a message on the user talk page or in the session according to - * $params['leaveMessage']. - * - * @param $status Status - */ - protected function leaveMessage( $status ) { - if ( $this->params['leaveMessage'] ) { - if ( $status->isGood() ) { - $this->user->leaveUserMessage( wfMessage( 'upload-success-subj' )->text(), - wfMessage( 'upload-success-msg', - $this->upload->getTitle()->getText(), - $this->params['url'] - )->text() ); - } else { - $this->user->leaveUserMessage( wfMessage( 'upload-failure-subj' )->text(), - wfMessage( 'upload-failure-msg', - $status->getWikiText(), - $this->params['url'] - )->text() ); - } - } else { - wfSetupSession( $this->params['sessionId'] ); - if ( $status->isOk() ) { - $this->storeResultInSession( 'Success', - 'filename', $this->upload->getLocalFile()->getName() ); - } else { - $this->storeResultInSession( 'Failure', - 'errors', $status->getErrorsArray() ); - } - session_write_close(); - } - } - - /** - * Store a result in the session data. Note that the caller is responsible - * for appropriate session_start and session_write_close calls. - * - * @param string $result the result (Success|Warning|Failure) - * @param string $dataKey the key of the extra data - * @param $dataValue Mixed: the extra data itself - */ - protected function storeResultInSession( $result, $dataKey, $dataValue ) { - $session =& self::getSessionData( $this->params['sessionKey'] ); - $session['result'] = $result; - $session[$dataKey] = $dataValue; - } - - /** - * Initialize the session data. Sets the initial result to 'Queued'. - */ - public function initializeSessionData() { - $session =& self::getSessionData( $this->params['sessionKey'] ); - $session['result'] = 'Queued'; - } - - /** - * @param $key - * @return mixed - */ - public static function &getSessionData( $key ) { - if ( !isset( $_SESSION[self::SESSION_KEYNAME][$key] ) ) { - $_SESSION[self::SESSION_KEYNAME][$key] = array(); - } - return $_SESSION[self::SESSION_KEYNAME][$key]; - } -} -- cgit v1.2.2